Friday, September 25, 2015

Druid : Vagrant Up (and Tranquility!)

We've been shown the light.  

After a great meeting with Fangjin and Gian, we are headed down the Tranquility route for real-time ingestion from Kinesis to Druid.  For me, that means getting comfortable with the expanded set of node types in the Druid cluster.

There is no better way to get comfortable with the cluster and the new node types than to spin up a complete environment and kick the tires a bit.  And there is no better way to do that than "vagrant up"!

So, this morning I set out to create a complete environment suitable for Tranquility, inclusive of MiddleManagers (which are typically omitted from a standalone "simple" cluster).  I started with Quantily's vagrant config for Druid 0.6.160, forked the repo, and went to work.

If you are impatient, you can find my fork here and get going.  If you have the patience...

It's important to understand the anatomy of a Druid cluster.  First off, Druid relies on ZooKeeper and MySQL.  The script installs vanilla versions of both, and creates a druid database and user in MySQL.  (Druid itself populates the schema at startup.)

The following is a list of the server types in a Druid cluster.  On our vagrant server, each server listens on its own port and has its own configuration (both detailed below).

Overlord: (port 8080)

The overlord is responsible for task management on the Druid cluster.  Since Tranquility is largely an orchestration engine, doling out tasks to create segments as they form in real-time, the overlord is the entry point into the Druid cluster for Tranquility. 

Coordinator: (port 8081)

The coordinator is responsible for segment management, telling nodes to load/drop segments.

Broker: (port 8090)

The Broker is the query proxy in the system.  It receives the query, knows which nodes have which segments, and proxies the query to the respective nodes.

Historical: (port 8082)

Historical nodes are the beasts of the cluster: once a segment has been committed to deep storage, a Historical node loads it and responds to queries against that "static" data.

MiddleManager: (port 8100)

MiddleManagers are exactly that. =)  They push tasks to Peons, which they spawn on the same node.  Right now, one Peon works on one task, which produces one segment.  (That may change.)

Special Notes about Real-Time nodes:
Per my previous blog, we were debating the use of a real-time (RT) node.  But with Tranquility, you don't need RT nodes.  They are replaced with transient Peon instances that run a temporary firehose, ingesting the events for that segment directly from the Tranquility client.

Druid is targeting highly-available, fully-replicated, transactional real-time ingestion.  Since RT nodes share nothing, they are unable to coordinate and unable to replicate data, which means they don't fit well into the strategic vision for the project.  Tranquility, and its coordinated ingestion model, may eventually obsolesce/replace RT nodes entirely. (See #1642 for more information.)
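To keep the topology straight, here's a quick Python sketch that probes each node type to see whether something is listening.  The port assignments are just the ones from this vagrant config, and the probe helper is illustrative, not part of any Druid tooling:

```python
import socket

# Ports as configured on this vagrant box -- adjust to your own setup.
DRUID_NODES = {
    "overlord": 8080,
    "coordinator": 8081,
    "historical": 8082,
    "broker": 8090,
    "middlemanager": 8100,
}

def is_listening(host, port, timeout=2.0):
    """Return True if something accepts a TCP connection on host:port."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False
```

Looping over `DRUID_NODES` with `is_listening("localhost", port)` gives you a quick sanity check that the whole cluster came up after "vagrant up".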

Getting Started

OK -- down to business.

To fire up your own cluster, simply clone the repository and "vagrant up".  Once things are up and running, you can:

Hit the Overlord at port 8080.

Hit the Coordinator at port 8081.

In my next blog, I'll detail the client-side of integrating Tranquility using the direct Finagle API.

Thursday, September 24, 2015

Kinesis -> Druid : Options Analysis (to Push? to Pull? to Firehose? to Nay Nay?)

Druid is one of the best slicers and dicers on the planet (except maybe for the Vegematic-2 ;).  And I know there are people out there who might argue that Elasticsearch can do the job (Shay & Isaac, I'm looking at you), but MetaMarkets and Yahoo have proved that Druid can scale to some pretty insane numbers, with query times an order of magnitude better than Spark.  And because of that, we've come to rely on Druid as our main analytics database.

The challenge is figuring out how best to pipe events into Druid.  As I pointed out in my previous post, Kinesis is now a contender.  Alas, Druid has no first-class support for Kinesis.  Thus, we need to figure out how to plumb Kinesis to Druid.  (<-- Druidish pun intended)
First, let me send out some Kudos to Cheddar, Fangjin, Gian and the crew...
They've made *tremendous* progress since I last looked at Druid. (0.5.X days)
That progress has given us the additional options below.

Here are the options as I see them:

Option 1: Traditional KinesisFirehose Approach

In the good ole' days of 0.5.X, real-time data flowed via Firehoses.  To introduce a new ingest mechanism, one simply implemented the Firehose interface, fired up a real-time node, and voila: data flowed like spice.  (This is what I detailed in the Storm book.)

And since 0.5, Druid has made some updates to the Firehose API, specifically to address the issues that we saw before (around checkpointing, and making sure events are processed only once, in a highly-available manner).  For the update on the Firehose API, check out this thread.

After reading that thread, I was excited to crank out a KinesisFirehose.  And we *nearly* have a KinesisFirehose up and running.  As with most Java projects these days, you spend 15 minutes on the code, and then two hours working out dependencies.  Ironically, Amazon's Kinesis Client Library (KCL) uses an *older* version of the aws-java-sdk than Druid, and because the firehose runs in the same JVM as the Druid code, you have to work out the classpath kinks.  When I hit this wall, it got me thinking -- hmmm, maybe some separation might be good here. ;)

To summarize what I did: I took the KCL's push model (implemented IRecordProcessor), put a simple little BlockingQueue in place, and wired it to Druid's pull model (Firehose.nextRow).  It worked like a charm in unit tests. ;)
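The actual firehose is Java, but the push-to-pull bridge is easy to sketch in Python: the consumer callback pushes records onto a bounded blocking queue, and the firehose side pulls them off.  The class and method names here are illustrative stand-ins, not the real KCL or Druid interfaces:

```python
import queue

class PushPullBridge:
    """Bridge a push-style consumer callback to a pull-style firehose.

    process_records() plays the role of the KCL's IRecordProcessor
    callback; next_row() plays the role of Firehose.nextRow().
    """

    def __init__(self, capacity=1000):
        # A bounded queue applies back-pressure to the push side
        # whenever the pull side falls behind.
        self._rows = queue.Queue(maxsize=capacity)

    def process_records(self, records):
        # Called by the consumer library as batches arrive (push side).
        for record in records:
            self._rows.put(record)  # blocks when the queue is full

    def has_more(self):
        return not self._rows.empty()

    def next_row(self, timeout=5.0):
        # Called by the ingestion loop (pull side); None on timeout.
        try:
            return self._rows.get(timeout=timeout)
        except queue.Empty:
            return None
```

In the real implementation the two sides run on different threads; queue.Queue (like Java's BlockingQueue) is thread-safe, so no extra locking is needed.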

(ping me, if you want to collaborate on the firehose implementation)

Option 2: The EventReceiver Firehose

Of course, the whole time I'm implementing the Firehose, I'm on the consuming end of a push model from the KCL and the producing end of a pull model from Druid, forced to dance between the two.  It made me wonder, "Isn't there a way to push data into Druid?"  If there were such a thing, then I could just push data into Druid as I receive it from the KCL.  (That sounds nice/simple, doesn't it?)

This ends up being the crux of the issue, and something that many people have been wrestling with.  It turns out that there is a firehose specifically for this!  Boo yah!  If you have a look at the available firehoses, you'll see an Event Receiver firehose.  The Event Receiver firehose creates a REST endpoint to which you can POST events.  (have a look at the addAll() method's annotation)

Beautiful.  Now I could receive messages from the KCL and POST them to Druid.  Simple: I move the checkpoint in Kinesis only after I safely receive 200s for all of my events.  But what happens to those events on the Druid side if the node that receives them fails?  Also, how do I scale that ingest mechanism?  Are there multiple REST endpoints behind a load balancer?
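The "POST, then checkpoint" flow is worth pinning down, because the ordering is the whole point: you must not advance the Kinesis checkpoint until Druid has acknowledged the batch.  A Python sketch of that logic, with the actual HTTP call injected so the wire details (endpoint path, auth, retries) stay out of the picture -- everything here is illustrative, not a real Druid or Kinesis API:

```python
import json

class CheckpointingPoster:
    """Sketch of the 'POST to the EventReceiver, then checkpoint' flow.

    post_fn is whatever actually does the HTTP POST (e.g. urllib against
    the firehose's REST endpoint) and returns the HTTP status code.
    """

    def __init__(self, post_fn):
        self._post = post_fn
        self.checkpoint = None  # last safely-delivered sequence number

    def deliver(self, batch):
        # batch: list of (sequence_number, event_dict) in shard order.
        events = [event for _, event in batch]
        status = self._post(json.dumps(events))
        if status == 200:
            # Only move the Kinesis checkpoint once Druid has the events.
            self.checkpoint = batch[-1][0]
            return True
        # On failure, leave the checkpoint alone so the shard replays.
        return False
```

Note the failure path: leaving the checkpoint where it was means the events get replayed, which is exactly why the availability question above matters -- replay is only safe if the downstream handles duplicates.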

I started up a real-time node with a spec file instructing Druid to fire up an EventReceiver.  An endpoint URL appeared in the startup output, but hitting it came back with a 404, and the documentation wasn't complete.  grep'ing the code, I found that the endpoint pointed to another Druidish pun: the WorkerResource.


Well, that WorkerResource was over in the indexing-service... hmmm, what is an IndexingService?

We tried to answer all the above questions around availability and scalability, and did some further research on the IndexingService.  That led us to Tranquility.  (Hey, that sounds so nice and pleasant -- maybe we want to use that!?)

Option 3: Tranquility

In doing some due diligence on Tranquility, I discovered that while I was away, Druid implemented its own task management system!!  (See the Indexing-Service documentation for more information.)  Honestly, my first reaction was to run for the hills.  Why would Druid implement its own task management system?  MiddleManagers and Peons... sounds an awful lot like YARN.  I thought the rest of the industry was moving to YARN (Spark, Samza, storm-yarn, etc.).  YARN was appropriately named, Yet Another Resource Negotiator, because everyone was building their own!  What happened to the simplicity of Druid, and the real-time node?

Despite that visceral reaction, I decided to give Tranquility a whirl.  I fired up the simple cluster from the documentation, incorporated the Finagle-based API integration, and got data flowing through the system.  I watched the Overlord's console as tasks moved from pending to running to complete.  It worked!  I was happy... sort of.  I was left with a "these are small Hadoop jobs" taste in my mouth.  It didn't feel like events were "streaming".

Now, this might just be me.  In reality, many/all of the streaming frameworks are actually just doing micro-batching.  And with Druid's segment-based architecture, the system really needs to wait for a windowPeriod/segmentGranularity to pass before a segment can be published.  I get it.
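That wait is easy to quantify.  Here is a back-of-the-envelope Python model of it -- not Druid's actual logic, and the HOUR/PT10M values are just example settings: an event's timestamp has to land inside the window around "now", and a segment can't hand off until its interval plus the window has elapsed.

```python
from datetime import datetime, timedelta

SEGMENT_GRANULARITY = timedelta(hours=1)   # e.g. segmentGranularity: HOUR
WINDOW_PERIOD = timedelta(minutes=10)      # e.g. windowPeriod: PT10M

def accepts(event_time, now):
    """True if an event timestamp falls inside the ingestion window."""
    return now - WINDOW_PERIOD <= event_time <= now + WINDOW_PERIOD

def earliest_handoff(event_time):
    """Roughly the earliest a segment containing this event can publish:
    the end of its hourly interval plus the window period."""
    hour_start = event_time.replace(minute=0, second=0, microsecond=0)
    return hour_start + SEGMENT_GRANULARITY + WINDOW_PERIOD
```

So with these settings, an event at 12:30 can't show up in a published segment until at least 13:10 -- which is why real-time ingestion here feels more like micro-batching than streaming.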

I'm just scared.  For me, the more nouns involved, the more brittle the system: tasks, overlords, middle managers, peons, etc.  Maybe I should just get myself some cojones, and plow ahead.

Next Step:

Regardless, we plan to meet up with Fangjin Yang and Gian Merlino, who are presently coming out of stealth mode with a new company.  I have a tremendous amount of faith in these two.  I'm confident they will steer us in the right direction, and when they do -- I'll report back. =)