Thursday, February 13, 2014

Storm and Cassandra : A Three Year Retrospective


We are doing our final edits on our Storm book due out in April.  In reviewing the chapters, I got to thinking through the evolution of our architecture with Storm.  I thought I would capture some of our journey.  Maybe people out there can skip a few epochs of evolution. =)

Kudos again to +P. Taylor Goetz for introducing Storm at Health Market Science.  When Taylor joined the team, we were well on our way to selecting Cassandra as our persistence mechanism, but we had yet to solve the distributed processing problem.  We had varying levels of JMS/JVM sprawl and we were dealing with all the challenges of transactional processing against those queues. (exactly the situation Nathan Marz typically depicts when motivating Storm)

To accompany the JMS/JVM sprawl, we also had Hadoop deployments against Cassandra that we were fairly frustrated with.  The Map/Reduce paradigm for analysis seemed very restrictive, and we were spending a lot of time tweaking jobs to balance work across phases (map, shuffle, reduce).   It felt like we were shoe-horning our problems into M/R.  If you then add on the overhead of spinning up a job and waiting for the results, we wanted better.  Enter Storm.

Amoebas
We had made the decision to go forward with Cassandra, but we didn't see any bridge between Storm and Cassandra -- so we built one.  By December 2011, we had made enough progress on storm-cassandra that it made it into Nathan's talk at the Cassandra Summit, and we started building out our first topologies.

Back in those days, there was no such thing as Trident in Storm.   And given the pain that we first encountered, I'd guess that most of the production uses of Storm did not demand transactional integrity.  I'm guessing that many of those uses only needed "good enough" real-time answers, and likely leveraged Hadoop, lagging somewhat, to correct issues offline.

We didn't like that approach.  Our systems are used to make health-care related decisions in many large operational systems.   We wanted the data to be immediately vendable, and guaranteed. Enter Transactional Topologies.

We started using transactional topologies, getting our feet wet connecting Storm to our JMS queues, and birthing storm-jms in the process.  In this phase, we felt some of the pain of being on the bleeding edge.  APIs felt like they were shifting quite a bit with the coup de grâce coming when Trident was introduced.

Trident added to Storm what we needed: better support for transactional processing and state management.  But what was that?  transactional topologies are now deprecated?  Ouch.   Quick -- everyone over to Trident!  (we could have used a heads up here!)

Vertebrates
We rapidly ported all of our transactional topologies over to Trident and got acquainted with the new concepts of State.   At the same time, we were advancing our understanding of Cassandra data modeling.

We learned the following, which should be tattooed on everyone working in distributed computing:

  1. Eliminate read-before-write scenarios (never fetch+update, just insert)
  2. Ensure all operations are idempotent (when you insert, overwrite)
  3. When possible, avoid shuffling data around (partitionPersist is your friend)

Make sure your processing flow and data model support the above tenants.  With our tattoos, we continued to build out our use of Storm throughout 2013.

Walking Upright
Many people tend to compare Storm and Hadoop, with Storm portrayed as a "real-time Hadoop". I believe this short changes Storm a bit.   Hadoop (v1) runs a specific algorithm across a pre-defined set of data.  Many times to accomplish something useful, you need to string many M/R jobs together.  Eventually, you find yourself in want of a higher-level language like Pig, Hive or Cascading.  Storm operates at that this higher level, and although it is often cast as a framework for "real-time analytics", it is a general data processing layer capable of accommodating fairly sophisticated data flows.

In fact, Storm excels as data flow and orchestration infrastructure.  We use it as our data ingestion infrastructure, orchestrating writes across multiple data storage mechanisms.  (see trident-elasticsearch)  It provides the backbone for a solid platform that avails of polyglot persistence.

Future Evolution
The best is yet to come.  Cassandra is churning out new features that make it even more useful for distributed processing.  See my previous post on CQL enhancements.  We created a new project to take advantage of those features. (see storm-cassandra-cql)  It's already getting some traction. (shout out to Robert Lee for contributing the BackingMap implementation, which should be merged shortly)

Also, with Storm's incorporation into Apache and HortonWorks commitment, we should see a more stable API, more frequent releases, and better synergy with other apache projects.  (yarn anyone?!)

Conclusion
So, if you are a gambling type, I'd push your chips to the center of the table.  Bet on Storm and Cassandra being a powerful pair as demands for bigger, better and faster continue to push those of us at the edge of the envelope.  Its anyone's guess what the powerful pair will evolve into, but one can imagine great things.


4 comments:

Baron Schwartz said...

The link to your book is broken.

Brian O'Neill said...

Tnx. (fixed)

David Gildea said...

Hi Brian,

Great article, and having spend the past year implementing a realtime analytics system on Cassandra using counters I got worried reading you rules, especially #2 about the data always being idempotent which counters are not.
In using storm you are effectively getting around that problem but having the counters in state in Storm/Trident ?

Thanks
Dave

Brian O'Neill said...

David,

Exactly right. In fact, my next blog post will address that. If you have a look at the storm-cassandra-cql implementation, I've added what I call an "Incremental" version of the Trident State object that has an in-built aggregation. Combining that with conditional updates, you can "count" (aggregate) in Storm, then safely transition the state in C*. (with counters) -- the conditional update is used to make things idempotent.