Wednesday, December 12, 2012

Presenting for Datastax C*ollege Credit Webinar Series

I'm going to present as part of the Datastax C*ollege Webinar Series tomorrow, December 13, 2012.

I've taken some liberties with the material.  We are going to venture into uncharted territory and use Astyanax and CQL to build a globally scalable naughty and nice list for Santa.  Hopefully,  it will be a good time.  Please join if you have time!

Tuesday, December 11, 2012

Building CQL Java Driver on Mac OSX (and Cassandra Cluster Manager(ccm): No module named yaml)

I wanted to play around with the new CQL java driver.

After git cloning, I had tests failing because it relied on Cassandra Cluster Manager (ccm), found here:

I quickly grabbed that and ran:
sudo ./ install

But ran into this error when I tried to run:

bone@zen$ ccm
Traceback (most recent call last):
  File "/usr/local/bin/ccm", line 5, in 
    from ccmlib import common
  File "/Library/Python/2.7/site-packages/ccmlib/", line 5, in 
    import os, common, shutil, re, sys, cluster, node, socket
  File "/Library/Python/2.7/site-packages/ccmlib/", line 3, in 
    import common, yaml, os, subprocess, shutil, repository, time, re
ImportError: No module named yaml

After a couple of tries at installing it via easy_install, I had to resort to installing libYAML from source:

Once I did that, the easy_install worked:
sudo easy_install pyyaml

Then, ccm was happy, which allowed me to build CQL java-driver.

bone@zen:~/git/boneill42/java-driver$ mvn clean install 
[INFO] Cassandra Java Driver ............................. SUCCESS [0.496s]
[INFO] Cassandra Java Driver - Core ...................... SUCCESS [4.718s]
[INFO] Cassandra Java Driver Examples .................... SUCCESS [0.008s]
[INFO] Cassandra Java Driver Examples - Stress ........... SUCCESS [4.965s]
[INFO] ------------------------------------------------------------------------


Tuesday, November 27, 2012

Compiling Storm (and jzmq) on Mac OSX

I recently set up a new machine, went to compile Storm, and forgot what a PITA it is.  Here is a blog post so future-me knows how to do it.

The biggest hurdle is jzmq.   To get that installed, you're going to need some make mojo and some libraries.  I use Brew.  If you are still using MacPorts, consider a switch.

Anyway, here is the recipe to get jzmq installed assuming you have Brew.
$ git clone
$ brew install automake
$ brew install libtool
$ brew install zmq
$ cd jzmq
$ ./configure

That resulted in a cryptic message:
cannot find jni.h in /Library/Java/Home/include 

To fix that, I found a suggested fix and created a symbolic link.
$ sudo ln -s /System/Library/Frameworks/JavaVM.framework/Versions/A/Headers/ /Library/Java/Home/include

Then, you hit this:
No rule to make target `classdist_noinst.stamp'

I had to dig through Google cache archives to find a solution for that one:
$ touch src/classdist_noinst.stamp

Then, you hit:
error: cannot access org.zeromq.ZMQ
That's because the ZMQ Java classes haven't been compiled yet!  So:
$ cd src
$ javac -d . org/zeromq/*.java
Wasn't that easy? =)  All that's left is to build and install:
$ cd ..
$ make
$ sudo make install

Now you have jzmq installed.  So we can get on with Storm.  Storm needs lein to build.  Don't go grabbing the latest version of lein either.   You'll need < 2.  There is an explicit check that was added to Storm that will refuse to build with lein >= 2.  You can grab older versions here. (we use 1.7.1)

Unzip that and copy $LEIN_HOME/bin/lein to your bin directory.  Make it executable and put it in your path.  Once you've done that, building Storm isn't so bad.  From the root of the storm source tree:
$ lein deps
$ lein jar 
$ lein install

Happy Storming.

Monday, November 26, 2012

Running Cassandra 1.2-beta on JDK 7 w/ Mac OSX: no snappyjava in java.library.path

The latest and greatest Cassandra (1.2-beta) now uses snappy-java for compression.  Unfortunately, the version of snappy-java that Cassandra currently depends on doesn't play well with JDK 7 on Mac OSX.

There is a known bug:

The fix is in the latest milestone release:

Until that is formally released and Cassandra upgrades its dependency, if you want to run Cassandra under JDK 7 on Mac OSX, follow the instructions at the bottom of this issue.  Basically, unzip the snappy-java jar file and copy the jni library into $CASSANDRA_HOME.  You can see below that I used the jar file from my m2 repo.

bone@zen:~/.m2/repository/org/xerial/snappy/snappy-java/> unzip -l snappy-java- | grep Mac | grep jni
    44036  10-05-11 10:34   org/xerial/snappy/native/Mac/i386/libsnappyjava.jnilib
    49792  10-05-11 10:34   org/xerial/snappy/native/Mac/x86_64/libsnappyjava.jnilib

Copy the libsnappyjava.jnilib file into Cassandra, and you should be good to go.  If you used the version in your m2 repo, that's:
cp ~/.m2/repository/org/xerial/snappy/snappy-java/ .

Alternatively, if you are building Cassandra from source, you can upgrade the snappy-java version yourself (in the build.xml).

BTW, Cassandra is tracking this under issue:

Wednesday, November 21, 2012

Installing JDK 7 on Mac OS X

To get JDK 7 up and running, a little surgery is required.  So, I headed over to /System/Library/Frameworks/JavaVM.framework/Versions.
This is where the system JVMs are stored.  You'll notice a symbolic link for CurrentJDK, which probably points to the old 1.6 JDK.
You're going to want to point that to the new JDK, whose location java_home tells us:
bone@zen:/usr/libexec$ /usr/libexec/java_home
So, the magic commands you need are:
bone@zen:/System/Library/Frameworks/JavaVM.framework/Versions$ sudo rm CurrentJDK
bone@zen:/System/Library/Frameworks/JavaVM.framework/Versions$ sudo ln -s /Library/Java/JavaVirtualMachines/jdk1.7.0_09.jdk/Contents/ CurrentJDK
Then, you should be good:
bone@zen:/System/Library/Frameworks/JavaVM.framework/Versions$ java -version 
java version "1.7.0_09"
Java(TM) SE Runtime Environment (build 1.7.0_09-b05)
Java HotSpot(TM) 64-Bit Server VM (build 23.5-b02, mixed mode)

Thursday, November 8, 2012

Big Data Quadfecta: (Cassandra + Storm + Kafka) + Elastic Search

In my previous post, I discussed our BigData Trifecta, which includes Storm, Kafka and Cassandra. Kafka played the role of our work/data queue.  Storm filled the role of our data intake / flow mechanism, and Cassandra our system of record for all things storage.

Cassandra is fantastic for flexible/scalable storage, but unless you purchase Datastax Enterprise, you're on your own for unstructured search.  Thus, we wanted a mechanism that could index the data we put into Cassandra.

Initially, we went with our Cassandra trigger mechanism connected to a SOLR backend.  That was sufficient, but as we scale our use of Cassandra, we anticipate a much greater load on SOLR, which means the additional burden of managing slave/master relationships.  Trying to get ahead of that, we wanted to look at other alternatives.

We evaluated Elastic Search (ES) before choosing SOLR.  ES was better in almost every aspect: performance, administration, scalability, etc.  BUT, it still felt young.  We did this evaluation back in mid-2011, and finding commercial support for ES was difficult compared to SOLR.

That changed substantially this year, however, when Elastic Search incorporated and pulled in some substantial players.  With our reservations addressed, we decided to re-examine Elastic Search, but from a new angle.

We now have Storm solidly in our technology stack.  With Storm acting as our intake mechanism, we moved away from the trigger-based mechanism and instead orchestrate the data flow between Cassandra and ES using Storm.

That motivated us to write and open-source an Elastic Search bolt for Storm:

We simply tacked that bolt onto the end of our Storm topology and with little effort, we have an index of all the data we write to Cassandra.

For the bolt, we implemented the same "mapper" pattern that we put in place when we refactored the Storm Cassandra bolt.  To use the bolt, you just need to implement TupleMapper, which has the following methods:

    public String mapToDocument(Tuple tuple);
    public String mapToIndex(Tuple tuple);
    public String mapToType(Tuple tuple);
    public String mapToId(Tuple tuple); 

Similar to the Cassandra Bolt, where you map a tuple into a Cassandra row, here you simply map the tuple to a document that can be posted to Elastic Search (ES).  ES needs four pieces of information to index a document: the document itself (JSON), the index to which the document should be added, the id of the document, and the type.

We included a default mapper, which does nothing more than extract the four different pieces of data directly from the tuple based on field name:
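The idea behind a field-name-based mapper can be sketched in plain Java.  This is just an illustrative sketch, not the bolt's actual code: the Tuple interface below is a simplified stand-in for Storm's, and the field names (document, index, type, id) are assumptions for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for Storm's Tuple: just name -> value lookup.
interface Tuple {
    String getStringByField(String field);
}

// Hypothetical default mapper: pulls each piece of the ES document
// directly out of the tuple by field name.
class DefaultTupleMapper {
    public String mapToDocument(Tuple tuple) { return tuple.getStringByField("document"); }
    public String mapToIndex(Tuple tuple)    { return tuple.getStringByField("index"); }
    public String mapToType(Tuple tuple)     { return tuple.getStringByField("type"); }
    public String mapToId(Tuple tuple)       { return tuple.getStringByField("id"); }
}

public class DefaultMapperExample {
    public static void main(String[] args) {
        final Map<String, String> fields = new HashMap<String, String>();
        fields.put("document", "{\"fish\":\"catfish\"}");
        fields.put("index", "fishblogs");
        fields.put("type", "blog");
        fields.put("id", "bigcat-1");
        Tuple tuple = new Tuple() {
            public String getStringByField(String field) { return fields.get(field); }
        };
        DefaultTupleMapper mapper = new DefaultTupleMapper();
        System.out.println(mapper.mapToIndex(tuple)); // fishblogs
        System.out.println(mapper.mapToId(tuple));    // bigcat-1
    }
}
```

Anything fancier (composing the JSON document from several tuple fields, deriving the index name from a timestamp, etc.) is just a different implementation of the same four methods.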

The bolt is still *very* raw.  I have a sample topology that uses it with the Cassandra Bolt.  I'll try to get that out there ASAP.

Sidenote: We are still waiting on a resolution to the classpath issue in Storm before we can do another release.  (See:

As always, let me know if you have any trouble. 

Wednesday, October 31, 2012

CQL: Seismic shift or simply a SQL veneer for Cassandra?

If you have recently come to Cassandra and you are happily using CQL, this blog post will probably only serve to confuse you.  You may want to stop here.

If however, you:
  • Are using Cassandra as a BigTable and care about the underlying persistence layer
  • Started using CQL, but wonder why it has crazy limitations that aren't found in SQL
  • Are accustomed to Java APIs and are trying to reconcile that with the advent of CQL
Then, this blog post is for you.  In fact, first you may want to read Jonathan's excellent post to get up to speed.

Now, I don't know the full history of it all, but Cassandra has struggled with client APIs from the beginning.  There are remnants of that struggle littered throughout the code base, and I believe there were a couple of false starts along the way. (cassandra$ grep -r "avro" *)

But since I've been using Cassandra, I've been able to hang my hat on Thrift, which is the underlying RPC layer on which many of the Java APIs are built.  The Thrift layer is tied intimately to the storage engine in Cassandra, and although it can be a bit cumbersome, it exposes everything we needed to build out app-layer extensions (a REST interface, indexing, and trigger functionality).

That said, Thrift is an albatross.  It provides no abstraction layer between clients and the server-side implementation.  Concepts that you wish would die are still first-class citizens in the Thrift API.  (cassandra$ grep -r "SuperColumn" * | grep thrift)

An abstraction layer was certainly necessary, and I believe this was the primary purpose of CQL: to shield the client/user from the complexities of the underlying storage mechanism, and from changes within that storage layer.  But because we chose a SQL-like abstraction layer, many of the people who came to Cassandra because it is "NO"-SQL are left wondering why a SQL-like language was introduced, mucking up their world.  Isn't the simple BigTable-based data model sufficient?  Why do we have to complicate things and impose schemas on everyone?

First, let me say that I never considered the underlying storage mechanism of Cassandra complex.  To me, it was a lot easier to reason about than many of the complex relational models I've seen over my career.  At the risk of oversimplifying, in Cassandra you really only have one structure to deal with: HashMap<RK, SortedMap<CN, V>>  (RK = RowKey, CN = ColumnName, V = Value).   That is straightforward enough, and the only "complexity" here is that the outer HashMap is spread across machines.  No big deal.  But I'll accept that many people have trouble grok'ing the concept vs. the warm and comfy, familiar concepts of an RDBMS.
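To make that concrete, here's a toy sketch of that structure in plain Java.  The row key and column data are made up for illustration; in real Cassandra the outer map is partitioned across nodes and the inner map is maintained on disk, but the shape is the same.

```java
import java.util.HashMap;
import java.util.SortedMap;
import java.util.TreeMap;

public class StorageModelExample {
    public static void main(String[] args) {
        // The whole "database": row key -> (column name -> value).
        // In Cassandra, the outer HashMap is spread across machines,
        // and the inner map is kept sorted by column name.
        HashMap<String, SortedMap<String, String>> table =
                new HashMap<String, SortedMap<String, String>>();

        SortedMap<String, String> row = new TreeMap<String, String>();
        row.put("zip", "19087");
        row.put("city", "Wayne");
        table.put("user:123", row);

        // Columns come back sorted by name, which is what makes
        // slice queries over column ranges efficient.
        System.out.println(table.get("user:123").firstKey()); // city
    }
}
```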

So, aside from the fact that the native Cassandra data structures might be difficult to grok, why did we add SQL to a NoSQL world?  As I said before in my post on Cassandra terminology,  I think a SQL-like language opens up avenues that weren't possible before. (application integrations with ETL tools, ORM layers, easing the transition for DBAs, etc.)  I truly appreciate why CQL was defined/selected and at the same time I'm sensitive to viewing Cassandra through SQL-colored glasses, and forcing those glasses on others.

Specifically, if it looks, tastes and smells like SQL, people will expect SQL-like behavior.  People (and systems) expect to be able to construct arbitrary WHERE clauses and JOINs.  With those expectations, and without an understanding of the underlying storage model, features, functions and even performance might not align well with users' expectations. (IMHO, never a good thing)  We may find ourselves explaining BigTable concepts anyway, just to explain why JOINs aren't welcome here.
(or we can just point people to Dean Hiller and playORM, so he can explain why they are. =)

Also, I think we want to be careful not to hide the "simple" structures of the BigTable.  If it becomes cumbersome to interact with the BigTable (the Maps), we'll end up alienating the portion of the community that came to Cassandra for simple dynamic/flexible schemas.  That flexibility and simplicity allowed us to accommodate vast Varieties of data, one of the pillars in the 3 V's of BigData.  We don't want to lose it.

For more and discussion on this, follow and chime in on:

With those caveats in mind, I'm on board with CQL.   I intend to embrace it wholeheartedly, especially given the enhancements coming down the pipe.   IMHO, CQL is more than a SQL veneer for Cassandra.  It is the foundation for future feature enhancements.  And although Thrift will be around for a long, long, long, long time, Thrift RPC will begin to fall behind CQL.  There is already evidence of that: CQL is going to provide first-class support for operations on collections in 1.2, with only limited support (via JSON) in Thrift. See:

With that conclusion, we've got our work cut out for us.  All of the enhancements we've developed for Cassandra were built on Thrift (either as AOP against the Thrift API, or directly consuming it to enable embedding).  This includes: cassandra-indexing, cassandra-triggers, and Virgil.  For each, we need to find a path forward that embraces CQL, keeping in mind that CQL is built on an entirely new protocol, listening on an entirely different port.  Additionally,  I'm looking to develop the Spring Data integration layer on CQL.

Anyone looking to get involved, we'd love a hand in migrating Virgil and Cassandra-Triggers forward and creating the initial Spring Data integration!

I'm excited about the future prospects of CQL, but it will take everyone's involvement to ensure that we get the best of all worlds from it, and that we don't lose anything in the transition.

Thursday, October 25, 2012

J2EE is dead: Long-live javascript backed by JSON services

Hello, my name is Brian O'Neill, and I've been J2EE free for almost 8 years now.  

A long time ago in a blog post far away, I asked the question "Is J2EE dead?".  Almost five years later, I'm here to say that, "Yes, J2EE is dead".   In fact, I'd say that it is now generations removed from the emerging technologies, and an ancestor to current enterprise best practices.

Originally, I thought that the MVC and HTTP/CRUD portions of J2EE would be replaced by rapid development frameworks like Rails.  The back-end non-http portions of the system (e.g. JMS) would move outside the container into Enterprise-Service Bus's (ESBs) and other non-J2EE based event-driven systems. 

In large part, I think we've seen this trend.  Increasingly, enterprises are adopting Fuse, Camel and Mule to form an event-driven backbone for the enterprise, and although I don't believe we've seen as wide an adoption of Rails in the enterprise, I think we've seen a strong movement towards light-weight non-J2EE containers for web application deployment.  In fact, I didn't realize just how much the deployment landscape had changed until I saw this survey, which showed that half of enterprise servers are running Tomcat, followed by Jetty at 16%.

We're perhaps on the extreme end, since we've abandoned war files entirely, swapping them out for the Dropwizard framework with an embedded Jetty Server. (and we're loving it)

Now, I think most enterprises are rolling out successful deployments using what I believe has become the canonical enterprise stack: Spring and Hibernate on Tomcat.  The more daring are striving for even higher realms of productivity and are breaking new ground with an even lighter stack: pure javascript backed by light-weight JSON services.

This is where we've landed.  We use straight-up javascript for our front-end.  Although I'm a huge fan of jquery, we've gone with ExtJS.  With it, we deliver a single-page webapp that can be deployed to anything capable of serving up raw HTML.  (e.g. apache web server)   No more servlets, no more containers of any kind.  This enables our front-end developers to be *incredibly* productive.  They don't need to run a server of any kind.  We stub out the JSON services with flat files, and they can plow ahead, cranking out kick-ass look and feel and slick UX.  To be honest, they probably haven't heard the acronym J2EE. (and probably never will)

Meanwhile, back at the (server) farm, our java-heads can crank out light-weight services, which replace the stub JSON files with real services that interact with the back-end systems.  To keep them productive, again we cut the fat, we stay in J2SE and roll simple JAX-RS based services that we *could* deploy to Tomcat, but we choose to deploy as individual processes using DropWizard.  They are simple java classes that anyone can pick up and start coding after only reading the first couple chapters of Java in 21 days.

This is not to say that we don't leverage elements of the J2EE stack (e.g. JMS, and Servlet Filters for security), but even those we are beginning to replace with web-scale equivalents (e.g. Kafka).  I can see the day coming when J2EE will be naught but a distant memory.

"Alas poor J2EE, I knew him (well)."  We'll miss you.

(BTW -- keep your eye on the up-and-coming Apigee.  They hint at even more radical (and productive) architecture strategies that steer your time away from infrastructure entirely, focusing it on only the elements of your application that are unique.  It's a glimpse of things to come.)


Wednesday, October 24, 2012

Solid NoSQL Benchmarks from YCSB w/ a side of HBase Bologna

One of the great appeals of Cassandra is its linear scalability.  Need more speed? Just add water, er... nodes to your ring.    Proving this out, Netflix performed one of the most famous Cassandra benchmarks.

Cassandra's numbers are most impressive, but Netflix didn't perform a side-by-side comparison of the available NoSQL platforms.  Admirably, Yahoo! Cloud Serving Benchmark (YCSB) endeavored to perform just such a comparison. The results of that effort were recently published by Network World.

It's not surprising that Cassandra and HBase lead the pack in many of the metrics, since both are based on Google's BigTable.  It was surprising, however, to see HBase's latency near zero.  That warranted some digging.

Now, side-by-side comparisons are always tough because they often depend highly on system configuration and the specifics of the use case / data model used.   And in NetworkWorld's article, there is a key paragraph:
"During updates, HBase and Cassandra went far ahead from the main group with the average response latency time not exceeding two milliseconds. HBase was even faster. HBase client was configured with AutoFlush turned off. The updates aggregated in the client buffer and pending writes flushed asynchronously, as soon as the buffer became full. To accelerate updates processing on the server, the deferred log flush was enabled and WAL edits were kept in memory during the flush period"

With Autoflush disabled, writes are buffered until flush is called.  See:

If I'm reading that correctly, with autoflush disabled, the durability of a put operation is not guaranteed until the flush occurs.   This really sets up an unfair comparison with the other systems where durability is guaranteed on each write.  When buffering the data locally, nothing is sent over the wire, which naturally results in near-zero latency!
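To illustrate the point, here is a toy sketch of client-side buffering in plain Java.  This is not the HBase client API, just a minimal model of why buffered writes report near-zero latency while giving up per-write durability.

```java
import java.util.ArrayList;
import java.util.List;

// Toy client illustrating autoflush semantics; not the actual HBase API.
class ToyClient {
    private final boolean autoFlush;
    private final List<String> buffer = new ArrayList<String>();
    private final List<String> server = new ArrayList<String>();

    ToyClient(boolean autoFlush) { this.autoFlush = autoFlush; }

    void put(String edit) {
        buffer.add(edit);
        // With autoflush on, every put pays the round trip to the server,
        // and the edit is durable by the time put() returns.
        if (autoFlush) flush();
    }

    void flush() {
        server.addAll(buffer); // the only point at which edits become durable
        buffer.clear();
    }

    int durableCount() { return server.size(); }
}

public class AutoflushExample {
    public static void main(String[] args) {
        ToyClient buffered = new ToyClient(false);
        buffered.put("row1");
        buffered.put("row2");
        // Near-zero "latency": nothing has actually reached the server yet.
        System.out.println(buffered.durableCount()); // 0
        buffered.flush();
        System.out.println(buffered.durableCount()); // 2
    }
}
```

Measuring the cost of put() on the buffered client tells you almost nothing about the server; you're mostly timing an in-memory list append.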

The change required to YCSB to level the playing field can be seen here:

I think it's certainly worth including metrics when autoflush is disabled, because that is a valid use case, but YCSB should also include the results when autoflush is enabled.  Similarly, it would be great to see Cassandra's numbers when using different consistency levels and replication factors. Durability is something we all need, but it isn't one-size-fits-all. (which is why tunable consistency/replication is ideal)

I appreciate all the work the YCSB crew put in to produce the benchmark.  Thank you.  And that's not to say you should take the benchmarks with a grain of salt, but you may need some mustard to go with the HBase autoflush bologna.

Monday, October 8, 2012

CQL, Astyanax and Compound/Composite Keys: Writing Data

In my previous post, I showed how to connect the dots between Astyanax and CQL, but it focused primarily on reading.  Here is the update that connects the dots on write.

I created a sample project and put it out on github.  Let me know if you have any trouble.

Extending the previous example to accommodate writes, you need to use the AnnotatedCompositeSerializer when writing as well.  Here is the code:

    public void writeBlog(String columnFamilyName, String rowKey, FishBlog blog, byte[] value) throws ConnectionException {
        AnnotatedCompositeSerializer<FishBlog> entitySerializer = new AnnotatedCompositeSerializer<FishBlog>(FishBlog.class);
        MutationBatch mutation = keyspace.prepareMutationBatch();
        ColumnFamily<String, FishBlog> columnFamily = new ColumnFamily<String, FishBlog>(columnFamilyName,
                StringSerializer.get(), entitySerializer);
        mutation.withRow(columnFamily, rowKey).putColumn(blog, value, null);
        mutation.execute();
    }

Now, if you recall from the previous post, we had a single object, FishBlog, that represented the compound/composite column name:

public class FishBlog {
    @Component(ordinal = 0)
    public long when;
    @Component(ordinal = 1)
    public String fishtype;
    @Component(ordinal = 2)
    public String field;
}

We mapped this object to the following schema:
    CREATE TABLE fishblogs (
        userid varchar,
        when timestamp,
        fishtype varchar,
        blog varchar,
        image blob,
        PRIMARY KEY (userid, when, fishtype)
    );
We had one member variable in FishBlog, field, that specified which piece of data we were writing: image or blog.  Because of how things work with CQL and the Java API, you actually need *two* mutations to create a row.  Here is the code from the unit test:

       AstyanaxDao dao = new AstyanaxDao("localhost:9160", "examples");
       long now = System.currentTimeMillis();
       FishBlog fishBlog = new FishBlog();
       fishBlog.when = now;
       fishBlog.fishtype = "CATFISH";
       fishBlog.field = "blog";
       dao.writeBlog("fishblogs", "bigcat", fishBlog, "myblog.".getBytes());
       FishBlog image = new FishBlog();
       image.when = now;
       image.fishtype = "CATFISH";
       image.field = "image";
       byte[] buffer = new byte[10];
       buffer[0] = 1;
       dao.writeBlog("fishblogs", "bigcat", image, buffer);

All parts of the primary key need to be the same between the different mutations.  Then, after you perform both mutations, you'll get a row back in CQL that looks like this:

cqlsh:examples> select * from fishblogs;
 userid | when                     | fishtype | blog            | image
 bigcat | 2012-10-08 12:08:10-0400 |  CATFISH | this is myblog. | 01000000000000000000

Hopefully this clears things up for people.

Wednesday, October 3, 2012

"File name too long" in scala/sbt (on ubuntu)

When building Kafka on Mac OSX, I had no problem.  When I moved to Ubuntu, I received an error:

[info] Compiling main sources...
[error] File name too long
[error] one error found

The problem is that you are most likely on an encrypted filesystem.  If you are using Ubuntu, you might not even remember that you chose an encrypted filesystem.  The installer presented an option to encrypt your home directory.  You probably answered yes. ;)

Anyway, to get things compiling, move the source to another filesystem/mount.  You'll be good to go after that.

Tuesday, September 11, 2012

Compound/Composite Keys: Connecting the dots between CQL3, Astyanax and Hector

I still don't know where I stand on the terminology debate, but I know things can get confusing if you are accessing your database from CQL as well as Java.  They are very different views of the world.  Hopefully this article can bridge that gap and explain the different views with respect to composite keys.

Before we get started, we should get our terminology straight.  Cassandra has a CompositeType data type.  This is *not* what we'll be discussing.  Instead, we're discussing compound column names.  CQL3 calls these compound keys, while Astyanax and Hector call them Composites.

Underneath the hood, Cassandra's storage model hasn't changed to accommodate composite keys.  Composite keys are really just a fancy bit-wise concatenation of the key components prior to storage and/or retrieval.  The storage model remains: rowkey -> columnname -> value.

Thus, you have to bend your brain a bit when seeing Cassandra through CQL, since multiple "columns" as viewed by CQL will become a smaller number of columns in the physical storage.  And since the Java APIs provide direct access to the storage model, the data will appear as one column through the Java APIs (at least initially).

Let's consider the following CQL as our example:

CREATE TABLE fishblogs (
  userid varchar,
  when timestamp,
  fishtype varchar,
  blog varchar,
  image blob,
  PRIMARY KEY (userid, when, fishtype)
);
Although it's not apparent from the CQL, the PRIMARY KEY declaration uses the ordering of the parameters to map the schema to Cassandra's storage model.  Specifically, the five columns declared above will result in two columns when an insert statement is run.  Here's how Cassandra maps the schema to storage.
row_key => userid  (since it is the first key declared)
column_name_1 => when + fishtype + blog  (since when and fishtype are keys, and blog is a value it needs to store)
column_name_2 => when + fishtype + image (since when and fishtype are keys and image is a value it needs to store)
If I can paraphrase the mapping... the first primary key becomes the row key.  Subsequent primary keys comprise the prefix components of the composite column name.  One column is generated for each non-primary-key column.   The composite column name combines the common prefix (comprising the primary keys) plus the name of the column holding the value.
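That collapse can be sketched in plain Java.  This is only an illustration: real composites are bit-wise encoded with length prefixes, so ':' string concatenation here is just a readable stand-in.

```java
import java.util.SortedMap;
import java.util.TreeMap;

public class CompositeNameExample {
    public static void main(String[] args) {
        // One logical CQL row for fishblogs: userid is the row key,
        // (when, fishtype) prefix every composite column name, and one
        // physical column is generated per non-key column (blog, image).
        String when = "1343925155443";
        String fishtype = "CATFISH";

        // The physical row: composite column name -> value.
        SortedMap<String, String> row = new TreeMap<String, String>();
        row.put(when + ":" + fishtype + ":" + "blog", "Caught the big one.");
        row.put(when + ":" + fishtype + ":" + "image", "632a726f636b73");

        // Five CQL columns, but only two physical columns in the row.
        System.out.println(row.size()); // 2
    }
}
```

Because the inner map is sorted, all the columns sharing a (when, fishtype) prefix sit next to each other on disk, which is what makes querying by the leading primary key components efficient.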

If we perform the following insert:

cqlsh:hms_data> insert into fishblogs (userid, when, fishtype, blog, image) values ('boneill42', 1343925155443, 'CATFISH', 'Caught the big one.', '632a726f636b73');
cqlsh:hms_data> select * from fishblogs;
 userid    | when                     | fishtype | blog                | image
 boneill42 | 2012-08-02 12:32:35-0400 |  CATFISH | Caught the big one. | 632a726f636b73

CQL presents this as five columns, but under the hood there are really only two (column name -> value):
[8?.vsCATFISHblog]->[Caught the big one.]

In the above, you can see the concatenated keys in the column name.  This is the raw output you receive from a query using Astyanax when you don't have composite keys in place.  This is the abbreviated code:
ColumnFamily<String, String> columnFamily = new ColumnFamily<String, String>(columnFamilyName, StringSerializer.get(), StringSerializer.get());
OperationResult<ColumnList<String>> result = this.keyspace.prepareQuery(columnFamily).getKey(rowKey).execute();

In order to have the same view as CQL, we need to map the composite key to a class and map the fields of that class.  For example,
public class FishBlog {
    @Component(ordinal = 0)
    public long when;
    @Component(ordinal = 1)
    public String type;
    @Component(ordinal = 2)
    public String field;

    public FishBlog() {
    }
}
What seems a little unnatural about this mapping is the "field" member variable.  Ideally, we'd prefer two member variables: "image" and "blog".  But since the data is spread across multiple columns, each containing the value for a separate field, the class has to be flexible enough to carry either (but not both).

We can then use the following Astyanax code to retrieve data:
AnnotatedCompositeSerializer<FishBlog> entitySerializer = new AnnotatedCompositeSerializer<FishBlog>(FishBlog.class);
ColumnFamily<String, FishBlog> columnFamily = new ColumnFamily<String, FishBlog>(columnFamilyName, StringSerializer.get(), entitySerializer);
OperationResult<ColumnList<FishBlog>> result = this.keyspace.prepareQuery(columnFamily).getKey(rowKey).execute();
return result.getResult();

This results in the following output when accessing the result set, which contains FishBlogs:
fishBlog.when=>[Thu Aug 02 12:32:35 EDT 2012]
Clear as mud?  Now, to complete the picture I'll show the same code for Hector.  Here is the fetch:
ColumnFamilyTemplate<String, Composite> template = new ThriftColumnFamilyTemplate<String, Composite>(this.keyspace, columnFamilyName, new StringSerializer(), new CompositeSerializer());
return template.queryColumns(rowKey);
Then, what may feel a little weird is that the Composite object, which contains the component keys for your object, comes from the column name:
for (Composite columnName : columns.getColumnNames()) {
   FishBlog fishBlog = new FishBlog(columnName);
   LOG.debug("fishBlog.when=>[" + new Date(fishBlog.getWhen()) + "]");
   LOG.debug("fishBlog.type=>[" + fishBlog.getType() + "]");
   LOG.debug("fishBlog.field=>[" + fishBlog.getField() + "]");
   LOG.debug("fishBlog.value=>[" + columns.getString(columnName) + "]");
}

As you can see from above, I recommend wrapping the composite in an object that provides easy accessors. (which makes it feel like Astyanax)  Here is my FishBlog object for Hector:
public class FishBlog {
    private Composite composite;

    public FishBlog(Composite composite) {
        this.composite = composite;
    }

    public long getWhen() {
        return composite.get(0, new LongSerializer());
    }

    public String getType() {
        return composite.get(1, new StringSerializer());
    }

    public String getField() {
        return composite.get(2, new StringSerializer());
    }
}
OK -- hopefully that demonstrates how the five columns from the CQL perspective translate into two physical columns that can be accessed via Hector or Astyanax, resulting in two entries in the result set mapped to Java objects that you can use to get at the values.  I'll try to get the code for these examples up on github.

Thursday, August 23, 2012

Cassandra APIs : The Laundry List

We had a question come across the mailing list regarding the available Java APIs.  That spawned the following post.  These are the Cassandra APIs I'm aware of.  The commentary below is entirely subjective and based solely on my experiences with the APIs, which in some cases were limited.  If I've missed any, please let me know.  This can be an organic blog entry that we'll eventually move to some place more official.

So, FWIW...

Cassandra APIs

  • Java

  • Hector (Production-Ready)
    • The most stable of the Java APIs, ready for prime-time.
  • Astyanax (The Up and Comer)
    • A clean Java API from Netflix.  It isn't as widely used as Hector, but it is solid.  In some of our uses, we've swapped out Hector for Astyanax.
  • Kundera (The NoSQL ORM)
    • JPA compliant, this is handy when you want to interact with Cassandra via objects.  This constrains you somewhat in that you won't be able to have a dynamic number of columns/names, etc.  But it does allow you to port over ORMs, or centralize storage onto Cassandra for more traditional uses.
  • Pelops 
    • I've only used Pelops briefly.  It was a straightforward API, but it didn't seem to have much momentum behind it.
  • PlayORM (ORM without the constraints?)
    • I just heard about this one (Thanks for the pointer Dean).  It  looks like it is trying to solve the impedance mismatch between traditional JPA-based ORMs and NoSQL by introducing JQL.  It looks promising.
  • Spring Data Cassandra (Entirely Proof of Concept!)
    • We're big Spring-heads.   We use Spring Data elsewhere in our system.  There was a MongoDB implementation, but alas no Cassandra implementation.  To get consistency across our DAO layer, we decided to start work on the Spring Data Cassandra implementation.  Still a work in progress.
  • Thrift (Avoid Me!)
    • This is the "low-level" API.  I almost consider it the internal API/protocol for Cassandra.  Avoid using thrift directly unless you have cause to do so.
  • REST
  • Virgil  (Our Baby)
    • Built on Dropwizard, we use Virgil for loosely coupled integrations between systems.  It is also a simple way to get newbies up and running (firing curls at the DB).  We also use it for Ruby/Cucumber integration.
  • restish
    • Not sure if this one is still maintained, but I wanted to include it for completeness (and so as not to appear biased).
  • Python
  • Pycassa (The only game in town?)
    • As far as I know, this is *the* python API for Cassandra.  Please let me know if there are others worth considering.
  • PHP
  • PHPcassa (Been around the block and back)
    • Another rock solid library that's been in the game for a while. This is your best bet in the PHP world.
  • Ruby
  • Ruby Gem 
    • I use this to create small scripts for ETL, etc.  I had some trouble when I tried to use it for substantial jobs, which is why we fell back and decided to use the REST API instead.
  • Node.js
    • Node-cassandra-client
      • A node.js client released by Rackspace.
  • Perl 

Wednesday, August 8, 2012

Stoked to be selected as a Cassandra MVP!

Thanks everyone.  I'm honored to be selected as a Cassandra MVP.   At Health Market Science (HMS), we've benefited tremendously from Cassandra, and we're happy to contribute back.

The Cassandra community is one of the strongest, most passionate open-source crews I've had the pleasure of hanging with.   There are fun times ahead as Cassandra gains even more momentum in the industry.

Thanks again.

Saturday, August 4, 2012

A Big Data Trifecta: Storm, Kafka and Cassandra

We're big fans of Cassandra.  We also use Storm as our distributed processing engine.  We've had a lot of success using our Cassandra Bolt to create a successful marriage between the two.  To date, we've been using Storm to integrate with our legacy technologies via our JMS Spout.  Now we're looking to expand its role beyond legacy system integration.

In Storm's new role, the work load is orders of magnitude greater and although JMS worked well in the previous integration scenarios, we knew it might not be the best solution to accommodate the volume of work we anticipate. We need to support millions of messages on the queues.   This is not the typical application of JMS and is exactly the reason LinkedIn open sourced Kafka:

"We first looked at several existing queuing solutions in the market. The most popular ones are based on JMS. Although JMS offers a rich set of features, it also adds significant overhead in the message representation. Additionally, some JMS implementations are optimized for the case when all messages can be cached in memory and their performance starts to degrade significantly when the in-memory buffer is saturated. Finally, most existing solutions don’t have a clean design for scaling out."

To validate our assumptions, we needed to put Kafka through its paces.  That meant plugging it into our Storm topology.  For those that don't know Storm, think of it as a "Big Data ESB" optimized for processing streams of data that are broken down into discrete packets called Tuples.  Spouts emit tuples.  Bolts consume them. Storm plays the role of message router between the components.

We already had our Cassandra Bolt in place.  All I needed to do was swap out our JMS Spout, with a Kafka Spout.   Here is what the topology looked like:

        TopologyBuilder builder = new TopologyBuilder();
        List<String> hosts = new ArrayList<String>();

        SpoutConfig spoutConfig = SpoutConfig.fromHostStrings(hosts, 1, "test", "/foo", "foo");
        spoutConfig.zkServers = ImmutableList.of("localhost");
        spoutConfig.zkPort = 2181;
        spoutConfig.scheme = new StringScheme();
        builder.setSpout("spout", new KafkaSpout(spoutConfig));

        DefaultBatchingCassandraBolt bolt = new DefaultBatchingCassandraBolt(new MyColumnFamilyMapper(), new MyRowKeyMapper(), new MyColumnsMapper());
        builder.setBolt("loader", bolt).shuffleGrouping("spout");

This topology simply connects a Kafka Spout to a Cassandra Bolt.

 (WARNING: The above code leverages a change to the Cassandra bolt that is still only in my fork.  It may not work for you. Watch this pull request.)

I then queued 10 million JSON records in Kafka (which took about 5 minutes running locally on a MacBook Pro), and unleashed the topology.

Now, Kafka is *fast*.  When running the Kafka Spout by itself, I easily reproduced Kafka's claim that you can consume "hundreds of thousands of messages per second".  When I first fired up the topology, things went well for the first minute, but then it quickly crashed as the Kafka Spout emitted tuples too fast for the Cassandra Bolt to keep up.  Even though Cassandra is fast as well, it is still orders of magnitude slower than Kafka.

Fortunately, since Storm interacts with its Spouts using a pull model, it provides a way to throttle back the messaging.  I added the following parameter to the Config.

config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 5000);

This limits the number of un-acked tuples in the system.  With the AckStrategy set to ACK_ON_WRITE within the Cassandra Bolt, this established a safe way for the Bolt to communicate back to the Spout that it is "ready for some more".
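Conceptually, the pending cap behaves like a bounded pool of permits shared between the Spout's emits and the Bolt's acks.  Here is a toy, stdlib-only sketch of that behavior (an illustration of the idea, not Storm code; all class and method names are invented):

```java
import java.util.concurrent.Semaphore;

// Toy illustration (invented names; not Storm's implementation): a spout that
// may only have a bounded number of un-acked tuples in flight, mimicking
// TOPOLOGY_MAX_SPOUT_PENDING.
public class ThrottledSpoutSketch {
    private final int maxPending;
    private final Semaphore slots;

    public ThrottledSpoutSketch(int maxPending) {
        this.maxPending = maxPending;
        this.slots = new Semaphore(maxPending);
    }

    // The framework pulls the next tuple; this blocks (throttles) once
    // maxPending tuples are un-acked.
    public void nextTuple(Runnable emit) throws InterruptedException {
        slots.acquire();
        emit.run();
    }

    // A downstream ack (e.g. ACK_ON_WRITE from the bolt) frees a slot.
    public void ack() {
        slots.release();
    }

    public int inFlight() {
        return maxPending - slots.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        ThrottledSpoutSketch spout = new ThrottledSpoutSketch(2);
        spout.nextTuple(() -> {});
        spout.nextTuple(() -> {});
        System.out.println(spout.inFlight()); // two tuples un-acked: the spout would now block
        spout.ack();
        System.out.println(spout.inFlight()); // an ack frees a slot
    }
}
```

The key point is that the spout's emit rate is bounded by the bolt's ack rate, which is exactly how the slow Cassandra Bolt pushes back on the fast Kafka Spout.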

With this topology, we saw consistent throughput of 5000 writes per second to Cassandra. (running locally on my MBP).  That will work nicely when deployed to the cluster. =)

Kafka has some other nice characteristics that make it well suited for big data applications.  I'll go into the details of those in a future post.

* Kudos to Taylor Goetz.  He has done some great work on the storm components that's made this possible.

Wednesday, August 1, 2012

Example Distributed Queue using Zookeeper (via Curator)

We use Storm for distributed processing.  We've been using JMS as a means of driving work into Storm via the spout we developed:

But since Storm uses Zookeeper under the hood, we thought we could perhaps use Zookeeper instead of JMS for certain low-throughput distributed messaging needs, thereby decreasing the number of technologies we needed to maintain in our stack for certain applications.

Fortunately, there is a Distributed Queue recipe for Zookeeper.

Even better, Netflix has bundled an implementation of this recipe into Curator.

The docs for the Curator recipe were adequate, but I didn't see a complete example.  After some muddling around, I was able to get it working.  You can find the code here:

In the end, the throughput was just too slow. (You could see the messages scroll by on the screen as they were being consumed)

Eventually, I punted and decided to take a look at Kafka.  That experiment is going *extremely* well.  Kafka is very impressive.  More on that in a future post. =)

Monday, July 23, 2012

Polyglotism. Losing your (rubyist) religion in Philly.

An interesting email came across the Philly Ruby list posing the question, "Is Ruby dead in Philadelphia?", noting that there appear to be more Ruby jobs in SF and NYC than Philly.  It provoked a good conversation.  Here's my take... (echoing a bit of Andrew Libby's sentiment)

Universally, people need their problems solved.   The way they solve their problems depends on their priorities.  Small budgets and tight timeframes prioritize time-to-market above all else.  RoR typically expedites time-to-market. (IMHO)  Since startups often find themselves in this situation, they often pickup RoR because it aligns with their priorities.  It follows that since NYC and SF both have stronger startup communities than Philadelphia, there is a greater demand for Ruby in those areas.

Outside of that however, I'd suggest that Ruby is alive and well in Philadelphia, but maybe not as visible.   Its use may not manifest itself in job postings.  More and more often, solutions comprise an assortment of technologies.  Even the larger enterprises in Philly (Comcast, Pharma, etc.) are beginning to take a polyglot approach to development.  They are using the "best tool for the job" to solve their problem.  The days of endless debates arguing over which language is better are waning.

Why compare apples to oranges, when really -- "Sometimes you feel like a nut, sometimes you don't"?

Losing your religious affiliation to a language opens up a world of possibilities.

We've got no religion at Health Market Science.   We use Ruby, Perl, Java, Clojure, etc.  We use Ruby not for its rapid development characteristics, but because it is dynamic.  There are pieces of our system that need to be flexible.  Some of our code even needs to change at runtime.  Ruby fit the bill.  But our services platform is written in Java, where we get warm fuzzies from type-safety, etc.

Now, what's that mean for job descriptions?  Well, I'm not sure.  We still tend to put specific technologies in the descriptions, but the expectation is otherwise.   We look to hire good software engineers.  I'd expect any good software engineer could be productive in any language in short order.  That's not to say that knowing the idiosyncrasies of languages isn't useful, it's just not of paramount importance in today's multi-language, multi-technology environments.

So... if you are looking for a Ruby job in Philly, you may want to look for a Java job in an environment that requires ruby. =)

Wednesday, July 18, 2012

Spring Data w/ Cassandra using JPA

We recently adopted the use of Spring Data.  Spring Data provides a nice pattern/API that you can layer on top of JPA to eliminate boiler-plate code.

With that adoption, we started looking at the DAO layer we use against Cassandra for some of our operations.  Some of the data we store in Cassandra is simple.  It does *not* leverage the flexible nature of NoSQL.  In other words, we know all the table names, the column names ahead of time, and we don't anticipate them changing all that often.

We could have stored this data in an RDBMs, using hibernate to access it, but standing up another persistence mechanism seemed like overkill.  For simplicity's sake, we preferred storing this data in Cassandra.  That said, we want the flexibility to move this to an RDBMs if we need to.

Enter JPA.

JPA would provide us a nice layer of abstraction away from the underlying storage mechanism.  Wouldn't it be great if we could annotate the objects with JPA annotations, and persist them to Cassandra?

Enter Kundera.

Kundera is a JPA implementation that supports Cassandra (among other storage mechanisms).  OK -- so JPA is great, and would get us what we want, but we had just adopted the use of Spring Data.  Could we use both?

The answer is "sort of".

I forked off SpringSource's spring-data-cassandra:

And I started hacking on it.  I managed to get an implementation of the PagingAndSortingRepository for which I wrote unit tests that worked, but I was duplicating a lot of what should have come for free in the SimpleJpaRepository.  When I tried to substitute my CassandraJpaRepository for the SimpleJpaRepository, I ran into some trouble w/ Kundera.  Specifically, the MetaModel implementation appeared to be incomplete.  MetaModelImpl was returning null for all managedTypes().  SimpleJpa wasn't too happy with this.

Instead of wrangling with Kundera, we punted.  We can achieve enough of the value leveraging JPA directly. 

Perhaps more importantly, there is still an impedance mismatch between JPA and NoSQL.  In our case, it would have been nice to get at Cassandra through Spring Data using JPA for a few cases in our app, but for the vast majority of the application, a straight up ORM layer whereby we know the tables, rows and column names ahead of time is insufficient. 

For those cases where we don't know the schema ahead of time, we're going to need to leverage the converters pattern in Spring Data.  So, I started hacking on a proper Spring Data layer using Astyanax as the client.  Follow along here:

More to come on that....

Saturday, July 7, 2012

Cassandra-Triggers upgraded to support Cassandra 1.1.2

We just released version 1.0.1 of Cassandra Triggers, upgraded to support Cassandra 1.1.2.

With Cassandra 1.1's Schema Management Renaissance, we felt comfortable with run-time schema creation.  Now, Cassandra Triggers automatically creates the requisite column families for you.  The system creates the Configuration column family and a pair of column families per host to maintain the event log. 

This makes it easier than ever to get triggers setup.  The GettingStarted page should be all you need to get up and running with the TestTrigger bundled into the release.

As always, let us know if you have any trouble.

Thanks to Andrew Swan for his help uncovering issues and motivating the upgrade.

Tuesday, July 3, 2012

NoSQL/Cassandra Terminology : Risks and Rewards

Recently, there's been growing support to change the terminology we use to describe the data model of Cassandra.  This has people somewhat divided, and although I've gone on record as supporting the decision, I too am a bit torn.  I can appreciate both perspectives, and there are both risks and rewards associated with the switch.

The two controversial terms are Keyspace and Column Family.  The terms roughly correlate to the more familiar relational equivalents: Schema and Table. I think that it is a fairly easy transition to change from Keyspace to Schema.  Logically speaking, in relational databases, a schema is a collection of tables.  Likewise, in Cassandra, a Keyspace is a collection of Column Families. 

The sticky point is Column Family.   Conceptually, everyone can visualize a table as an nxm matrix of data.  Although you can mentally map a Column Family into that same logical construct, buyer beware.

The Risks:

A data model for a column-oriented database is typically *much* different from an analogous model designed for an RDBMS.  To achieve the same capabilities that a relational database provides on tables, you need to model your data differently to support "standard" relational queries.   Assuming a column family has the same capabilities as a table will lead you to all sorts of headaches. (e.g. consider Range Queries and Indexing)

When data modeling, I don't relate column families to tables at all.  For me, it's easier to think of column families as a map of maps.  Then just remember that the top-level map can be distributed across a set of machines.  Using that mental model, you are more likely to create a data model that is compatible with a column-oriented database.  Think of column families as tables, and you may get yourself into trouble that will require significant refactoring.

The Rewards:

With a strong movement towards polyglot persistence architectures, and tools that need to span the different persistence mechanisms, I can see a strong motivation to align terminology.  (Consider ETL tools (e.g. Talend), design tools (e.g. Erwin), even SQL clients (e.g. good old Toad)) 

The popularity of Cassandra's CQL is further evidence that people want to interact with NoSQL databases using tried-and-true SQL (ironically).  And maybe we should "give the people what they want", especially if it simultaneously eases the transition for newcomers.

The Big Picture:

Theologically, and in an ideal world, I agree with Jonathan's point:
"The point is that thinking in terms of the storage engine is difficult and unnecessary. You can represent that data relationally, which is the Right Thing to do both because people are familiar with that world and because it decouples model from representation, which lets us change the latter if necessary"

Pragmatically, I've found that it is often necessary to consider the storage engine at least until that engine has all the features and functions that allow me to ignore it.

Realistically, any terminology change is going to take a long time.  The client APIs (Hector, Astyanax, etc.) probably aren't changing anytime soon, and the documentation still reflects the "legacy" terminology.  It's only on my radar because we decided to evolve the terminology in the RefCard that we just released. 

Only time will tell what will come of "The Great Cassandra Terminology Debates of 2012", but guaranteed there will be people on both sides of the fence -- as I find myself occasionally straddling it. =)

Thursday, June 21, 2012

Indexing JSON in Cassandra

Cassandra has native indexing capabilities, but those capabilities only work if the values stored in your columns are the values you want indexed.  If the data is structured in some way (e.g. JSON or Protobuf), it is impossible to leverage Cassandra's indexing capabilities out of the box.

A while back we had a discussion on the dev list regarding indexing attributes stored in columns in Cassandra.  I think Jeremiah Jordan had the best description of the problem.  Lately, we found ourselves with the same problem, so we ended up building it into our cassandra-indexing extension.

Our cassandra-indexing module is an AOP-based extension to Cassandra.  Drop the jar file in the Cassandra lib directory, update the start script to include the extension, and the aspect will take care of indexing information as you mutate the data in Cassandra.

To support indexing a specific field within the column value, we added the ability for a user to specify a field to index (not just a column name).  We then parse the JSON document as it is written in the mutation, extract the values and create entries in the wide-row index for that column family.
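As a rough illustration of the idea (this is not the actual cassandra-indexing code; the field name, rowkey, and index layout here are invented), the write path boils down to: extract the configured field from the JSON column value and add a "value_rowkey" column to the wide-row index:

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Toy sketch of JSON field indexing (invented names, not the real aspect):
// as a JSON column value is written, pull out one configured field and add an
// entry to a wide-row index of the form "<fieldValue>_<rowkey>".
public class JsonIndexSketch {
    // Naive extraction for flat JSON string fields; a real implementation
    // would use a proper JSON parser.
    static String extractField(String json, String field) {
        Matcher m = Pattern.compile("\"" + field + "\"\\s*:\\s*\"([^\"]*)\"").matcher(json);
        return m.find() ? m.group(1) : null;
    }

    public static void main(String[] args) {
        // Columns of the single index row, kept in sorted order like Cassandra columns.
        Map<String, String> indexRow = new TreeMap<String, String>();
        String rowkey = "user123";
        String json = "{\"name\":\"Santa\",\"zip\":\"18964\"}";
        String value = extractField(json, "zip");
        indexRow.put(value + "_" + rowkey, rowkey);
        System.out.println(indexRow); // {18964_user123=user123}
    }
}
```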

Specifics for the new JSON-based indexing configuration can be found on the wiki.  Admittedly, we are still light on documentation.  If you have any trouble using the new capability, let us know.

Tuesday, June 19, 2012

Cassandra as an integral part of a Big Data platform

There is more to a successful Big Data platform than simply deploying a NoSQL database.  That database needs to integrate with the rest of your architecture, and nowadays that often requires a polyglot approach to persistence. 

As part of the release of the new Apache Cassandra RefCard, we did an interview on Cassandra's role in a complete Big Data platform.  The interview covers the challenges of and solutions for integrating NoSQL into a Big Data enterprise solution.

Monday, June 18, 2012

Apache Cassandra Cheat Sheet / RefCard

Months in the making, we just published the Apache Cassandra RefCard!

Thanks to all that helped out.

Specific shout outs to:
David Strauss @ Pantheon
Kris Hahn @ Datastax
and Jonathan Ellis @ Datastax

thanks guys.

Monday, May 28, 2012

Dropwizard and Spring (sort of)

We're big Dropwizard fans.  If ever I have to, it's going to be tough for me to go back to Tomcat and war files.   Dropwizard is lean and mean.

But in all of Dropwizard's beautiful simplicity, sometimes you need a little bit more.  You might need a JMS connection, JPA access to a database, or a scheduled job.  Dropwizard has simple facilities for some of the same functions (e.g. JDBI), but if you need capabilities beyond that, it is sometimes best to fall back on tried and true (and incredibly popular) frameworks like Spring.

For JMS, Spring JMS is simple and rock solid.
For JPA, Spring Hibernate takes the pain out of ORM. (or at least most of it)
For scheduling, Spring + Quartz is great.
For workflow, I've even become a fan of Activiti, which also has Spring integration.

OK -- so we're motivated to build the bridge between these lovely frameworks, but what does that even mean?

The heaviest point of intersection between the two is configuration.  Dropwizard loves yaml.  Spring loves xml.  And both want to control the load process.  Although we debated (and continue to debate) it, in the end we decided that it was best to let Spring drive, because we believe it's necessary to take full advantage of Spring's ability to autowire an application.

Thus, in our com.yammer.dropwizard.Service.initialize() method, we load our ApplicationContext off of the classpath.

protected void initialize(CirrusServiceConfiguration conf, Environment env) throws Exception {
   applicationContext = new ClassPathXmlApplicationContext("beans.xml");
   RestResource restResource = (RestResource) applicationContext.getBean("restResource");
   env.addResource(restResource); // register the Spring-managed resource with Dropwizard
}

We declare each of our Dropwizard JAX-RS resources as a Spring component (with @Component), which is then picked up during a component scan.  As shown above, we can then pull them out of the application context and register them with Dropwizard.

Now, that's fairly clean.  Dropwizard controls the main(), while Spring controls the configuration load.  The challenge comes if you want to maintain only one single environment specific configuration file.   Dropwizard will want that environment configuration in yaml, and will parse the config using Dropwizard conventions.   But without further work, you'd have to maintain another set of xml or properties files to keep Spring happy.

We found an "acceptable" solution by integrating snakeyaml into the Spring config.  Fortunately, Mea came up with a pretty nice piece of code for this.  Follow that blog entry, pointing the placeholder at the Dropwizard configuration.

Doing that allowed us to use properties defined in the Dropwizard yaml in our spring configuration.  For example, you could refer to ${fooDb.user} in your Spring config, if there were a hierarchical property in your dropwizard yaml:

fooDb:
  user: ora_foo_readonly
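Putting it together, a hypothetical Spring bean definition could then consume that value (the bean id, class, and property names here are invented for illustration):

```xml
<bean id="fooDataSource" class="org.apache.commons.dbcp.BasicDataSource">
    <!-- resolved from the Dropwizard yaml via the snakeyaml-backed placeholder -->
    <property name="username" value="${fooDb.user}"/>
</bean>
```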

This works and centralizes the configuration, but the keen eye will notice that we are parsing the configuration twice.  We first allow Dropwizard to parse it, then we parse it again with Snakeyaml for the Spring-specific configuration.  This doesn't necessarily make for a happy environment, because Dropwizard will encounter properties it doesn't understand.  And it will complain, sometimes loudly, if it doesn't have configuration classes that correspond to those values.  Alas, presently that is the price you must pay.

I know others are working this problem, and a solution may be forthcoming, but for now -- this allows us to move forward with both Spring and Dropwizard in a single application with a single centralized configuration.

Please let us know if people have alternative solutions for integrating these stellar frameworks.

Wednesday, May 2, 2012

Dumping Data from Cassandra (like mysqldump, to create a seed script)

To manage our database migrations for Cassandra, we've taken to creating scripts that create/modify the schema and populate configuration data. In my previous post, I showed how to dump and load the schema (keyspaces and column families).

Here is a code snippet that lets you dump the data into set statements that you can then load via the CLI.

     public void dumpColumnFamily() {  
         String columnFamily = "YOUR_COLUMN_FAMILY";  
         Cluster cluster = HFactory.getOrCreateCluster("dev", "localhost:9160");  
         Keyspace keyspace = HFactory.createKeyspace("YOUR_KEYSPACE", cluster);  
         RangeSlicesQuery<String, String, String> rangeSlicesQuery = HFactory  
                 .createRangeSlicesQuery(keyspace, StringSerializer.get(),  
                         StringSerializer.get(), StringSerializer.get());  
         rangeSlicesQuery.setKeys("", "");  
         rangeSlicesQuery.setRange("", "", false, 2000); // MAX_COLUMNS  
         rangeSlicesQuery.setRowCount(2000); // MAX_ROWS  
         QueryResult<OrderedRows<String, String, String>> result = rangeSlicesQuery.execute();  
         OrderedRows<String, String, String> orderedRows = result.get();  
         for (Row<String, String, String> r : orderedRows) {  
             ColumnSlice<String, String> slice = r.getColumnSlice();  
             for (HColumn<String, String> column : slice.getColumns()) {  
                 System.out.println("set " + columnFamily + "['" + r.getKey()  
                         + "']['" + column.getName() + "'] = '"  
                         + column.getValue() + "';");  
             }  
         }  
     }  
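Each line of output is a cassandra-cli set statement.  A minimal stdlib sketch of that formatting (the column family, rowkey, and values below are made up for illustration):

```java
// Minimal sketch of the `set` statement formatting used in the dump above
// (column family, row key, and values here are invented).
public class SetStatementFormatter {
    static String setStatement(String columnFamily, String rowKey, String columnName, String value) {
        return "set " + columnFamily + "['" + rowKey + "']['" + columnName + "'] = '" + value + "';";
    }

    public static void main(String[] args) {
        System.out.println(setStatement("users", "bone", "email", "bone@example.com"));
        // prints: set users['bone']['email'] = 'bone@example.com';
    }
}
```

The resulting statements can be replayed through the CLI to seed another cluster.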

Friday, April 13, 2012

Dumping/Loading schema in Cassandra

This handy command line will dump a schema from Cassandra:
echo -e "use your_keyspace;\r\n show schema;\n" | bin/cassandra-cli -h localhost > mySchema.cdl

I always forget the "-e" on echo, which is why I thought I would blog this.  We've started using ".cdl" as the extension, short for "Cassandra DDL".

Coincidentally, you can then load it with:
bin/cassandra-cli -h localhost -f mySchema.cdl

Hope people find this useful.

Monday, April 2, 2012

Cassandra vs. (CouchDB | MongoDB | Riak | HBase)

Here is why in "Cassandra vs.", it's Cassandra FTW!

Our organization processes thousands of data sources continuously to produce a single consolidated view of the healthcare space.  There are two aspects of this problem that are challenging.  The first is schema management, and the second is processing time.

Creating a flexible RDBMS model to accommodate thousands of disparate data sources is difficult, especially as those schemas change over time.  Even given a flexible relational model, properly accessing and manipulating data in that model is complicated.  That complexity bleeds into application code and hampers analytics.

Given the volume of data and the frequency of updates, standardizing, indexing, analyzing and processing that data takes days of time across dozens and dozens of machines.   And even with round the clock processing, the business and customer appetites for additional and more current analytics are insatiable.

Trying to scale the RDBMS system vertically through hardware eventually has its limits.  Scaling horizontally through sharding becomes a challenge.  Operations and Maintenance (O&M) is difficult and requires a lot of custom coding to accommodate the partitioning.

We needed a distributed data system that provided:
  • Flexible Schema Management
  • Distributed Processing
  • Easy Administration (to lower O&M costs)
Driven by the need for flexible schemas, we turned to NoSQL.  We considered: MongoDB, CouchDB, HBase, and Riak.   Immediately we set out to see what support each of these had for "real" map/reduce.  Given the processing we do, we knew we would eventually need support for all of Hadoop's goodness.  This includes extensions like Pig, Hive, and Cascading.

CouchDB dropped out here.  It supports map/reduce, but little or no notable support for Hadoop proper.  MongoDB scored "acceptable", but the Hadoop support was not nearly as evolved as the support in Cassandra.  Datastax actually distributes an enterprise version of Cassandra that fully integrates the Hadoop runtime.   Thus, we left MongoDB for another day and scored HBase's Hadoop support off the charts.

Riak is interesting in that they provide very slick native support for map/reduce via REST, while they also provide a nice bridge from Hadoop.  I must admit, we were *very* attracted to the REST interface. (which is why we eventually went on to create Virgil for Cassandra)

Left with Riak, HBase and Cassandra, we layered in some non-functional requirements.  First, we needed to be able to get solid third-party support.   Unfortunately, this is where Riak fell out.   Basho provides support for Riak, but Datastax and Cloudera were names we were familiar with.  

NOW -- Down to HBase and Cassandra.  For this comparison, I won't bother re-iterating all the great points from Dominic Williams's great post.   Given that post and a few others, we decided on Cassandra.

Now, since choosing Cassandra, I can say there are a few other *really* important less tangible considerations.  The first, is the code base.  Cassandra has an extremely clean and well maintained code base.  Jonathan and team do a fantastic job managing the community and the code.  As we adopted NoSQL, the ability to extend the code-base and incorporate our own features has proven invaluable. (e.g. triggers, a REST interface, and server-side wide-row indexing)  

Secondly, the community is phenomenal. That results in timely support, and solid releases on a regular schedule.   They do a great job prioritizing features, accepting contributions, and cranking out features. (They are now releasing ~quarterly)   We've all probably been part of other open source projects where the leadership is lacking, and features and releases are unpredictable, which makes your own release planning difficult.  Kudos to the Cassandra team.

Wednesday, March 21, 2012

Cassandra Indexing: The good, the bad and the ugly

Within NoSQL, the operations of indexing, fetching and searching for information are intimately tied to the physical storage mechanisms.  It is important to remember that rows are distributed across hosts, but a single row is stored on a single host (with replicas).  Columns within a row are stored in sorted order, which makes querying a set of columns efficient (provided you are not spanning rows).

The Bad : Partitioning

One of the tough things to get used to at first is that, without any indexes, queries that span rows can be (very) bad.  Thinking back to our storage model, however, that isn't surprising.  The strategy that Cassandra uses to distribute the rows across hosts is called Partitioning. 

Partitioning is the act of carving the rowkey range up into segments along a "token ring", assigning responsibility for a segment (i.e. a partition) of the rowkey range to each host.  You've probably seen this when you initialized your cluster with a "token".  The token gives the host a location along the token ring, and with it responsibility for a section of the token range. Partitioning, then, is the act of mapping a rowkey into the token range.  

There are two primary partitioners, and they are appropriately named: Random and Order Preserving.  The RandomPartitioner hashes rowkeys into tokens.  This does a good job of evenly distributing your data across a set of nodes, but it makes querying a range of the rowkey space incredibly difficult. From only a "start rowkey" value and an "end rowkey" value, Cassandra can't determine what range of the token space you need.  It essentially needs to perform a "table scan" to answer the query, and a "table scan" in Cassandra is bad because it needs to go to each machine (most likely ALL machines, if you have a good hash function) to answer the query.
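A toy sketch of what random partitioning does (this is not Cassandra's actual token math; it just illustrates why nearby rowkeys scatter across nodes, defeating range queries):

```java
import java.math.BigInteger;
import java.security.MessageDigest;

// Toy illustration of random partitioning (invented; not Cassandra's real
// implementation): hash the rowkey, map the hash onto a token ring, and pick
// the owning node.
public class RandomPartitionSketch {
    static int ownerNode(String rowkey, int numNodes) throws Exception {
        byte[] digest = MessageDigest.getInstance("MD5").digest(rowkey.getBytes("UTF-8"));
        BigInteger token = new BigInteger(1, digest); // non-negative token
        // Evenly sized ranges around the ring, one per node.
        return token.mod(BigInteger.valueOf(numNodes)).intValue();
    }

    public static void main(String[] args) throws Exception {
        // Adjacent rowkeys land on (potentially) different nodes, which is
        // why a rowkey range query must consult every node.
        for (String key : new String[] { "user1", "user2", "user3" }) {
            System.out.println(key + " -> node " + ownerNode(key, 4));
        }
    }
}
```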

Now, at the great cost of even data distribution, you can employ the OrderPreservingPartitioner (OPP).  I am *not* down with OPP.  The OPP preserves order as it translates rowkeys into tokens.  Now, given a start rowkey value and an end rowkey value, Cassandra *can* determine exactly which hosts have the data you are looking for.  It maps the start value to a token and the end value to a token, and simply selects and returns everything in between. BUT, by preserving order, unless your rowkeys are evenly distributed across the space, your tokens won't be either, and you'll get a lopsided cluster, which greatly increases the cost of configuring and administering the cluster. (not worth it)

The Good : Secondary Indexes

Cassandra does provide a native indexing mechanism in Secondary Indexes, which work off of column values.   You declare a secondary index on a Column Family.  Datastax has good documentation on the usage.  Under the hood, Cassandra maintains a "hidden column family" as the index. (See Ed Anuff's presentation for specifics)  Since Cassandra doesn't maintain column value information on any one node, and secondary indexes are on column values (rather than rowkeys), a query still needs to be sent to all nodes.  Additionally, secondary indexes are not recommended for high-cardinality sets.  I haven't looked yet, but I'm assuming this is because of the data model used within the "hidden column family".  If the hidden column family stores a row per unique value (with rowkeys as columns), then it would mean scanning the rows to determine if they are within the range in the query.

From Ed's presentation:
  • Not recommended for high-cardinality values (i.e. timestamps, birthdates, keywords, etc.)
  • Requires at least one equality comparison in a query--not great for less-than/greater-than/range queries
  • Unsorted--results are in token order, not query value order
  • Limited to searching on datatypes Cassandra natively understands
With all that said, secondary indexes work out of the box and we've had good success using them on simple values.

The Ugly : Do-It-Yourself (DIY) / Wide-Rows

Now, beauty is in the eye of the beholder.  One of the beautiful things about NoSQL is its simplicity.  The constructs are simple: Keyspaces, Column Families, Rows and Columns.  Keeping it simple, however, means sometimes you need to take things into your own hands.

This is the case with wide-row indexes.  Utilizing Cassandra's storage model, it's easy to build your own indexes where each rowkey becomes a column in the index.  This is sometimes hard to get your head around, but let's imagine we have a case whereby we want to select all users in a zip code.  The main users Column Family is keyed on userid, with zip code as a column on each user row.  We could use secondary indexes, but there are quite a few zip codes.  Instead, we could maintain a Column Family with a single row called "idx_zipcode".  We could then write columns into this row of the form "zipcode_userid".  Since the columns are stored in sorted order, it is fast to query for all columns that start with "18964" (e.g. we could use "18964_" and "18964_ZZZZZZ" as the start and end values).
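The zip code example above can be sketched with a sorted set standing in for the columns of the index row (the userids are made up, and I use "~" as the end sentinel so that lowercase userids fall inside the slice; the "18964_ZZZZZZ" end value from the text works the same way when userids are uppercase):

```java
import java.util.SortedSet;
import java.util.TreeSet;

public class WideRowIndexSketch {
    // A slice query over the columns of a single row, which Cassandra
    // keeps in sorted order by column name.
    static SortedSet<String> slice(TreeSet<String> row, String start, String end) {
        return row.subSet(start, end);
    }

    public static void main(String[] args) {
        // The single index row "idx_zipcode": column names are "zipcode_userid".
        TreeSet<String> idxZipcode = new TreeSet<>();
        idxZipcode.add("18964_user42");
        idxZipcode.add("18964_user7");
        idxZipcode.add("19002_user13");

        // All users in zip code 18964, in one contiguous slice:
        for (String col : slice(idxZipcode, "18964_", "18964_~")) {
            String userid = col.substring(col.indexOf('_') + 1);
            System.out.println(userid);
        }
    }
}
```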

One obvious downside of this approach is that each row is self-contained on a single host. (again, except for replicas)  This means that all queries against the index are going to hit a single node.  I haven't yet found a good answer for this.

Additionally, and IMHO, the ugliest part of DIY wide-row indexing is the client perspective.  In our implementation, we've done our best to remain language agnostic on the client-side, allowing people to pick the best tool for the job to interact with the data in Cassandra.  With that mentality, the DIY indexes present some trouble.  Wide rows often use composite keys (imagine an idx_state_zip index, which would allow you to query by state and then zip).  Although there is "native" support for composite keys, each of the client libraries (Hector, Astyanax, and Thrift) implements its own version of them.   This means any client that needs to query the data must contain the added logic to first query the index, and additionally all clients need to construct the composite key in the same manner.
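The byte-identical requirement is the crux.  As a sketch of what every client has to agree on, here is the CompositeType layout as commonly described (two-byte big-endian length, component bytes, then an end-of-component byte); verify against your client library before relying on it:

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

public class CompositeKeySketch {
    // Sketch of the CompositeType wire layout: for each component,
    // a two-byte big-endian length, the component bytes, then a single
    // end-of-component byte (0).  Every client must emit identical bytes,
    // or index reads and writes will silently miss each other.
    static byte[] encode(String... components) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String c : components) {
            byte[] b = c.getBytes(StandardCharsets.UTF_8);
            out.write((b.length >> 8) & 0xFF);  // length, high byte
            out.write(b.length & 0xFF);         // length, low byte
            out.write(b, 0, b.length);          // component bytes
            out.write(0);                       // end-of-component
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        byte[] key = encode("PA", "18964");  // state, then zip
        System.out.println(key.length);      // (2+2+1) + (2+5+1) = 13 bytes
    }
}
```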

Making It Better...
For this very reason, we've decided to release two open source projects that help push this logic to the server-side.  The first project is Cassandra-Triggers.  This allows you to attach asynchronous activities to writes in Cassandra.  (one such activity could be indexing)  We've also released Cassandra-Indexing.  This is hot off the presses and is still in its infancy (e.g. it only supports UTF8Type in the index), but the intent is to provide a generic server-side mechanism that indexes data as it's written to Cassandra.  Employing the same server-side technique we used in Cassandra-Triggers, you simply configure the columns you want indexed, and the AOP code does the rest as you write to the target CF.  As always, questions, comments and thoughts are welcome. (especially if I'm off-base somewhere)