Thursday, August 23, 2012

Cassandra APIs : The Laundry List

We had a question come across the mailing list regarding the available Java APIs.  That spawned the following post.  These are the Cassandra APIs I'm aware of.  The commentary below is entirely subjective and based solely on my experiences with the APIs, which in some cases were limited.  If I've missed any, please let me know.  This can be an organic blog entry that we'll eventually move somewhere more official.

So, FWIW...

Cassandra APIs

  • Java
    • Hector (Production-Ready)
      • The most stable of the Java APIs, ready for prime time. (See the minimal sketch just after this list.)
    • Astyanax (The Up-and-Comer)
      • A clean Java API from Netflix.  It isn't as widely used as Hector, but it is solid.  In some of our uses, we've swapped out Hector for Astyanax.
    • Kundera (The NoSQL ORM)
      • JPA compliant, this is handy when you want to interact with Cassandra via objects.  This constrains you somewhat in that you won't be able to have a dynamic number of columns/names, etc.  But it does allow you to port over ORMs, or centralize storage onto Cassandra for more traditional uses.
    • Pelops
      • I've only used Pelops briefly.  It was a straightforward API, but it didn't seem to have much momentum behind it.
    • PlayORM (ORM without the constraints?)
      • I just heard about this one (thanks for the pointer, Dean).  It looks like it is trying to solve the impedance mismatch between traditional JPA-based ORMs and NoSQL by introducing JQL.  It looks promising.
    • Spring Data Cassandra (Entirely Proof of Concept!)
      • We're big Spring-heads.  We use Spring Data elsewhere in our system.  There was a MongoDB implementation, but alas no Cassandra implementation.  To get consistency across our DAO layer, we decided to start work on the Spring Data Cassandra implementation.  Still a work in progress.
    • Thrift (Avoid Me!)
      • This is the "low-level" API.  I almost consider it the internal API/protocol for Cassandra.  Avoid using Thrift directly unless you have cause to do so.
  • REST
    • Virgil (Our Baby)
      • Built on Dropwizard, we use Virgil for loosely coupled integrations between systems.  It is also a simple way to get newbies up and running (firing curl requests at the DB).  We also use it for Ruby/Cucumber integration.
    • restish
      • Not sure if this one is still maintained, but I wanted to include it for completeness (and so as not to appear biased).
  • Python
    • Pycassa (The only game in town?)
      • As far as I know, this is *the* Python API for Cassandra.  Please let me know if there are others worth considering.
  • PHP
    • PHPcassa (Been around the block and back)
      • Another rock-solid library that's been in the game for a while.  This is your best bet in the PHP world.
  • Ruby
    • Ruby Gem
      • I use this to create small scripts for ETL, etc.  I had some trouble when I tried to use it for substantial jobs, which is why we fell back and decided to use the REST API instead.
      • http://www.engineyard.com/blog/2009/cassandra-and-ruby-a-love-affair/
  • Node.js
    • Node-cassandra-client
      • A node.js client released by Rackspace.
      • http://www.rackspace.com/blog/rackspace-contributes-cassandra-cql-driver-for-node-js/
  • Perl
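
To give a flavor of the Java side, here's what a minimal write with Hector looks like.  This is only a sketch from memory against Hector's 2012-era API: the cluster, keyspace, column family, and column names are all made up, and it assumes a local node listening on the default Thrift port.

import me.prettyprint.cassandra.serializers.StringSerializer;
import me.prettyprint.hector.api.Cluster;
import me.prettyprint.hector.api.Keyspace;
import me.prettyprint.hector.api.factory.HFactory;
import me.prettyprint.hector.api.mutation.Mutator;

public class HectorExample {
    public static void main(String[] args) {
        // Connect to a local node over Thrift (names here are illustrative).
        Cluster cluster = HFactory.getOrCreateCluster("test-cluster", "localhost:9160");
        Keyspace keyspace = HFactory.createKeyspace("MyKeyspace", cluster);

        // Insert a single column into one row of "MyColumnFamily".
        Mutator<String> mutator = HFactory.createMutator(keyspace, StringSerializer.get());
        mutator.insert("rowKey", "MyColumnFamily",
                HFactory.createStringColumn("columnName", "columnValue"));
    }
}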

Wednesday, August 8, 2012

Stoked to be selected as a Cassandra MVP!



Thanks, everyone.  I'm honored to be selected as a Cassandra MVP.  At Health Market Science (HMS), we've benefited tremendously from Cassandra, and we're happy to contribute back.

The Cassandra community is one of the strongest, most passionate open-source crews I've had the pleasure of hanging with.  There are fun times ahead as Cassandra gains even more momentum in the industry.

Thanks again.



Saturday, August 4, 2012

A Big Data Trifecta: Storm, Kafka and Cassandra


We're big fans of Cassandra.  We also use Storm as our distributed processing engine, and we've had a lot of success using our Cassandra Bolt to marry the two.  To date, we've been using Storm to integrate with our legacy technologies via our JMS Spout.  Now we're looking to expand its role beyond legacy system integration.

In Storm's new role, the workload is orders of magnitude greater, and although JMS worked well in the previous integration scenarios, we knew it might not be the best solution to accommodate the volume of work we anticipate.  We need to support millions of messages on the queues.  This is not the typical application of JMS, and it is exactly the reason LinkedIn open sourced Kafka:

"We first looked at several existing queuing solutions in the market. The most popular ones are based on JMS. Although JMS offers a rich set of features, it also adds significant overhead in the message representation. Additionally, some JMS implementations are optimized for the case when all messages can be cached in memory and their performance starts to degrade significantly when the in-memory buffer is saturated. Finally, most existing solutions don’t have a clean design for scaling out."

To validate our assumptions, we needed to put Kafka through its paces.  That meant plugging it into our Storm topology.  For those who don't know Storm, think of it as a "Big Data ESB" optimized for processing streams of data that are broken down into discrete packets called tuples.  Spouts emit tuples.  Bolts consume them.  Storm plays the role of message router between the components.
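
To make that concrete, here is roughly what a trivial Bolt looks like.  This is only a sketch against Storm's 0.8-era API (backtype.storm.*); the class and field names are made up for illustration.

import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

// A trivial Bolt: consumes each incoming Tuple and re-emits its first field.
public class PassThroughBolt extends BaseBasicBolt {
    @Override
    public void execute(Tuple tuple, BasicOutputCollector collector) {
        String body = tuple.getString(0); // first field of the incoming tuple
        collector.emit(new Values(body)); // pass it downstream
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("body")); // name of the field this bolt emits
    }
}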

We already had our Cassandra Bolt in place.  All I needed to do was swap out our JMS Spout for a Kafka Spout.  Here is what the topology looked like:

        TopologyBuilder builder = new TopologyBuilder();

        // Kafka broker host(s) to consume from.
        List<String> hosts = new ArrayList<String>();
        hosts.add("localhost");

        // Consume the "test" topic; offsets are tracked in Zookeeper
        // under "/foo" with consumer id "foo".
        SpoutConfig spoutConfig = SpoutConfig.fromHostStrings(hosts, 1, "test", "/foo", "foo");
        spoutConfig.zkServers = ImmutableList.of("localhost");
        spoutConfig.zkPort = 2181;
        spoutConfig.scheme = new StringScheme(); // emit each message as a single string field
        builder.setSpout("spout", new KafkaSpout(spoutConfig));

        // Write each tuple to Cassandra, acking only after a successful write.
        DefaultBatchingCassandraBolt bolt = new DefaultBatchingCassandraBolt(
                new MyColumnFamilyMapper(), new MyRowKeyMapper(), new MyColumnsMapper());
        bolt.setAckStrategy(AckStrategy.ACK_ON_WRITE);
        builder.setBolt("loader", bolt).shuffleGrouping("spout");


This topology simply connects a Kafka Spout to a Cassandra Bolt.

 (WARNING: The above code leverages a change to the Cassandra bolt that is still only in my fork.  It may not work for you. Watch this pull request.)

I then queued 10 million JSON records in Kafka (which took about five minutes running locally on a MacBook Pro) and unleashed the topology.

Now, Kafka is *fast*.  When running the Kafka Spout by itself, I easily reproduced Kafka's claim that you can consume "hundreds of thousands of messages per second".  When I first fired up the topology, things went well for the first minute, but they quickly crashed because the Kafka Spout emitted tuples faster than the Cassandra Bolt could consume them.  Even though Cassandra is fast as well, it is still orders of magnitude slower than Kafka.

Fortunately, since Storm interacts with its Spouts using a pull model, it provides a way to throttle back the messaging.  I added the following parameter to the Config:

config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 5000);

This limits the number of un-acked tuples in the system.  With the AckStrategy set to ACK_ON_WRITE in the Cassandra Bolt, this gives the Bolt a safe way to communicate back to the Spout that it is "ready for some more".
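
Putting it together, the throttled topology can be submitted to a local cluster along these lines.  This is just a sketch: "kafka-cassandra" is an illustrative name, and builder is the TopologyBuilder from the snippet above.

Config config = new Config();
// Cap the number of un-acked tuples in flight so the Kafka Spout
// cannot outrun the Cassandra Bolt.
config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 5000);

// Run locally for testing; on a real cluster you'd use StormSubmitter instead.
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("kafka-cassandra", config, builder.createTopology());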

With this topology, we saw consistent throughput of 5,000 writes per second to Cassandra (running locally on my MBP).  That will work nicely when deployed to the cluster. =)

Kafka has some other nice characteristics that make it well suited for big data applications.  I'll go into the details of those in a future post.

* Kudos to Taylor Goetz.  He has done some great work on the Storm components that made this possible.

Wednesday, August 1, 2012

Example Distributed Queue using Zookeeper (via Curator)


We use Storm for distributed processing.  We've been using JMS as a means of driving work into Storm via the spout we developed:
https://github.com/hmsonline/storm-jms

But since Storm uses Zookeeper under the hood, we thought we could perhaps use Zookeeper instead of JMS for certain low-throughput distributed messaging needs, thereby decreasing the number of technologies we'd need to maintain in our stack.

Fortunately, there is a Distributed Queue recipe for Zookeeper.

Even better, Netflix has bundled an implementation of this recipe into Curator.

The docs for the Curator recipe were adequate, but I didn't see a complete example.  After some muddling around, I was able to get it working.  You can find the code here:
https://github.com/boneill42/zookeeper-distributed-queue-example
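
For context, the core of the recipe boils down to something like the following.  This is a sketch against the 2012-era com.netflix.curator packages; the connect string, queue path, and sleep are illustrative.

import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.framework.recipes.queue.DistributedQueue;
import com.netflix.curator.framework.recipes.queue.QueueBuilder;
import com.netflix.curator.framework.recipes.queue.QueueConsumer;
import com.netflix.curator.framework.recipes.queue.QueueSerializer;
import com.netflix.curator.framework.state.ConnectionState;
import com.netflix.curator.retry.ExponentialBackoffRetry;

public class DistributedQueueExample {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // Called as items are taken off the queue.
        QueueConsumer<String> consumer = new QueueConsumer<String>() {
            public void consumeMessage(String message) {
                System.out.println("consumed: " + message);
            }

            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                // no-op for this example
            }
        };

        // Converts items to/from the bytes stored in Zookeeper.
        QueueSerializer<String> serializer = new QueueSerializer<String>() {
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };

        DistributedQueue<String> queue = QueueBuilder
                .builder(client, consumer, serializer, "/example/queue")
                .buildQueue();
        queue.start();

        queue.put("hello");
        Thread.sleep(2000); // give the consumer a moment to fire
        queue.close();
        client.close();
    }
}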

In the end, the throughput was just too slow.  (You could see the messages scroll by on the screen as they were being consumed.)

Eventually, I punted and decided to take a look at Kafka.  That experiment is going *extremely* well.  Kafka is very impressive.  More on that in a future post. =)