Saturday, August 4, 2012

A Big Data Trifecta: Storm, Kafka and Cassandra


We're big fans of Cassandra.  We also use Storm as our distributed processing engine, and our Cassandra Bolt has let us marry the two successfully.  To date, we've been using Storm to integrate with our legacy technologies via our JMS Spout.  Now we're looking to expand its role beyond legacy system integration.

In Storm's new role, the workload is orders of magnitude greater. Although JMS worked well in the previous integration scenarios, we knew it might not be the best solution for the volume of work we anticipate: we need to support millions of messages on the queues.  This is not the typical application of JMS, and it is exactly the reason LinkedIn open sourced Kafka:

"We first looked at several existing queuing solutions in the market. The most popular ones are based on JMS. Although JMS offers a rich set of features, it also adds significant overhead in the message representation. Additionally, some JMS implementations are optimized for the case when all messages can be cached in memory and their performance starts to degrade significantly when the in-memory buffer is saturated. Finally, most existing solutions don’t have a clean design for scaling out."

To validate our assumptions, we needed to put Kafka through its paces.  That meant plugging it into our Storm topology.  For those who don't know Storm, think of it as a "Big Data ESB" optimized for processing streams of data that are broken down into discrete packets called Tuples.  Spouts emit tuples.  Bolts consume them.  Storm plays the role of message router between the components.

We already had our Cassandra Bolt in place.  All I needed to do was swap out our JMS Spout for a Kafka Spout.  Here is what the topology looked like:

        TopologyBuilder builder = new TopologyBuilder();

        // Kafka broker hosts the spout reads from
        List<String> hosts = new ArrayList<String>();
        hosts.add("localhost");

        // One partition per host, topic "test"; "/foo" and "foo" are the ZooKeeper root and consumer id
        SpoutConfig spoutConfig = SpoutConfig.fromHostStrings(hosts, 1, "test", "/foo", "foo");
        spoutConfig.zkServers = ImmutableList.of("localhost");
        spoutConfig.zkPort = 2181;
        spoutConfig.scheme = new StringScheme(); // deserialize each message as a String
        builder.setSpout("spout", new KafkaSpout(spoutConfig));

        // The bolt acks a tuple only once it has been written to Cassandra
        DefaultBatchingCassandraBolt bolt = new DefaultBatchingCassandraBolt(new MyColumnFamilyMapper(), new MyRowKeyMapper(), new MyColumnsMapper());
        bolt.setAckStrategy(AckStrategy.ACK_ON_WRITE);
        builder.setBolt("loader", bolt).shuffleGrouping("spout");



This topology simply connects a Kafka Spout to a Cassandra Bolt.

(WARNING: The above code leverages a change to the Cassandra bolt that is still only in my fork.  It may not work for you.  Watch this pull request.)

I then queued 10 million JSON records in Kafka (which took about 5 minutes running locally on a MacBook Pro) and unleashed the topology.

Now, Kafka is *fast*.  When running the Kafka Spout by itself, I easily reproduced Kafka's claim that you can consume "hundreds of thousands of messages per second".  When I first fired up the topology, things went well for the first minute, but then the topology quickly crashed because the Kafka Spout emitted tuples too fast for the Cassandra Bolt to keep up.  Even though Cassandra is fast as well, it is still orders of magnitude slower than Kafka.

Fortunately, since Storm interacts with its spouts using a pull model, it provides a way to throttle back the messaging.  I added the following parameter to the Config:

config.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 5000);

This limits the number of un-acked tuples in the system.  With the AckStrategy set to ACK_ON_WRITE within the Cassandra Bolt, this established a safe way for the Bolt to communicate back to the Spout that it is "ready for some more".
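To make the throttling idea concrete, here is a toy model in plain Java.  It is only a sketch of the concept, not Storm's actual implementation: the class ThrottledSpoutSketch and its methods are hypothetical names invented for illustration.  The "spout" may emit only while fewer than maxPending tuples are un-acked, and each ack from the slower "bolt" frees one slot.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of a spout throttled by a max-pending limit (hypothetical, not Storm code). */
class ThrottledSpoutSketch {
    private final int maxPending;   // plays the role of TOPOLOGY_MAX_SPOUT_PENDING
    private final Deque<Long> pending = new ArrayDeque<Long>(); // emitted but not yet acked
    private long nextId = 0;

    ThrottledSpoutSketch(int maxPending) {
        this.maxPending = maxPending;
    }

    /** Emits one tuple if the pending window has room; returns false when throttled. */
    boolean tryEmit() {
        if (pending.size() >= maxPending) {
            return false;
        }
        pending.addLast(nextId++);
        return true;
    }

    /** Acks the oldest outstanding tuple, e.g. after the bolt's write has succeeded. */
    void ack() {
        if (!pending.isEmpty()) {
            pending.removeFirst();
        }
    }

    int pendingCount() {
        return pending.size();
    }

    /**
     * Fast producer, slow consumer: the spout attempts 10 emits per step while
     * the bolt acks only one tuple per step. Returns the largest pending count seen.
     */
    static int simulate(int maxPending, int totalTuples) {
        ThrottledSpoutSketch spout = new ThrottledSpoutSketch(maxPending);
        int emitted = 0;
        int maxSeen = 0;
        while (emitted < totalTuples || spout.pendingCount() > 0) {
            for (int i = 0; i < 10 && emitted < totalTuples; i++) {
                if (spout.tryEmit()) {
                    emitted++;
                }
            }
            maxSeen = Math.max(maxSeen, spout.pendingCount());
            spout.ack();
        }
        return maxSeen;
    }
}
```

However eager the producer is, the number of in-flight tuples in this model never exceeds the limit, so the consumer's ack rate ends up setting the pace.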

With this topology, we saw consistent throughput of 5000 writes per second to Cassandra (running locally on my MBP).  That will work nicely when deployed to the cluster. =)

Kafka has some other nice characteristics that make it well suited for big data applications.  I'll go into the details of those in a future post.

* Kudos to Taylor Goetz.  He has done some great work on the Storm components that has made this possible.

6 comments:

Sonali Parthasarathy said...

This is great! We're working on a similar architecture as well. I had a quick question. Is it possible to write a Cassandra Bolt that queries Cassandra, converts the results into tuples and re-emit the tuples via Kafka? Basically, for data enrichment purposes, we were thinking about loading say a dictionary of sorts into Cassandra and querying against this when processing the actual tuples/events.

Alp Şehiç said...

Hi Brian,

for that part;

"We first looked at several existing queuing solutions in the market. The most popular ones are based on JMS. Although JMS offers a rich set of features, it also adds significant overhead in the message representation. Additionally, some JMS implementations are optimized for the case when all messages can be cached in memory and their performance starts to degrade significantly when the in-memory buffer is saturated. Finally, most existing solutions don’t have a clean design for scaling out."

Have you evaluated for instance ZeroMQ? Or to be more specific, which messaging solutions have you gone through?...

Thanks in advance...

Brian O'Neill said...

Sonali,
Sorry -- just seeing this reply. Yes, we have a bolt that reads from Cassandra. Take a look at the CassandraLookupBolt.

Brian O'Neill said...

@Alp, we have various flavors of JMS in house, but because we must handle very large batches (millions of records on the queue), we've decided to go with Kafka.

Opes said...

Hi Brian
I am looking at processing a continuous stream of data using storm and write the tuples to Titan graph database. Is there a storm-titan bolt available that can do the job ?
Cheryan

Jeryl Cook said...

How many Storm instances did you use?