Friday, September 25, 2015

Druid : Vagrant Up (and Tranquility!)


We've been shown the light.  

After a great meeting with Fangjin and Gian, we are headed down the Tranquility route for real-time ingestion from Kinesis to Druid.  For me, that means getting comfortable with the expanded set of node types in the Druid cluster.

There is no better way to get comfortable with the cluster and the new node types than to spin up a complete environment and kick the tires a bit.  And there is no better way to do that than "vagrant up"!

So, this morning I set out to create a complete environment suitable for Tranquility, inclusive of middle managers (which are typically omitted from a standalone "simple" cluster).  I started with Quantily's vagrant config for Druid 0.6.160, forked the repo, and went to work.

If you are impatient, you can find my fork here and get going.  If you have the patience...

It's important to understand the anatomy of a Druid cluster.  First off, Druid relies on Zookeeper and MySQL.  The install.sh script installs vanilla versions of these, and creates a druid database and user in MySQL.  (Druid itself populates the schema at startup.)
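For reference, the MySQL portion of that bootstrap boils down to something like the following.  (A sketch: the actual database name, user, and password depend on the repo's install.sh; 'diurd' is the default password from the Druid docs of this era.)

```sql
-- Create the metadata database and user that Druid expects.
-- Credentials here are illustrative; match them to your runtime.properties.
CREATE DATABASE druid CHARACTER SET utf8;
GRANT ALL PRIVILEGES ON druid.* TO 'druid'@'localhost' IDENTIFIED BY 'diurd';
```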

The following is a list of the server types in a Druid cluster.  On our vagrant server, each server occupies a different port and has its own configuration (detailed below).

Overlord: (port 8080)

The overlord is responsible for task management on the Druid cluster.  Since Tranquility is largely an orchestration engine, doling out tasks to create segments as they form in real-time, the overlord is the entry point into the Druid cluster for Tranquility. 


Coordinator: (port 8081)

The coordinator is responsible for segment management, telling nodes to load/drop segments.


Broker: (port 8090)

The Broker is the query proxy in the system.  It receives the query, knows which nodes have which segments, and proxies the query to the respective nodes.
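For example, once data is flowing, you can POST a query to the broker (port 8090 here) at its /druid/v2/ endpoint.  A minimal timeseries query looks like this; the dataSource and metric names are made up for illustration:

```json
{
  "queryType": "timeseries",
  "dataSource": "events",
  "granularity": "hour",
  "aggregations": [
    { "type": "longSum", "name": "clicks", "fieldName": "clicks" }
  ],
  "intervals": [ "2015-09-25T00:00:00Z/2015-09-26T00:00:00Z" ]
}
```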


Historical: (port 8082)

Historical nodes are the beasts that load the segments and respond to queries for "static" data.  Once a segment has been committed to deep storage, it is loaded by a historical node, which responds to queries.


MiddleManager: (port 8100)

MiddleManagers are exactly that. =)  They push tasks to Peons, which they spawn on the same node.  Right now, one Peon works on one task, which produces one segment.  (that may change)


Special Notes about Real-Time nodes:
Per my previous blog, we were debating the use of a real-time (RT) node.   But with Tranquility, you don't need RT nodes.  They are replaced with transient peon instances that run a temporary firehose ingesting the events for that segment directly from the Tranquility client.  

Druid is targeting highly-available, fully-replicated, transactional real-time ingestion.  Since RT nodes share nothing, they are unable to coordinate and unable to replicate data, which means they don't fit well into the strategic vision for the project.  Tranquility, and its coordinated ingestion model, may eventually obsolesce/replace RT nodes entirely. (See #1642 for more information)

Getting Started

OK -- down to business.

To fire up your own cluster, simply clone the repository and "vagrant up".  Once things are up and running, you can:

Hit the Overlord console on port 8080, and the Coordinator console on port 8081.

In my next blog, I'll detail the client-side of integrating Tranquility using the direct Finagle API.




Thursday, September 24, 2015

Kinesis -> Druid : Options Analysis (to Push? to Pull? to Firehose? to Nay Nay?)


Druid is one of the best slicers and dicers on the planet (except maybe for the Vegematic-2 ;).  And I know there are people out there that might argue that Elasticsearch can do the job (Shay & Isaac, I'm looking at you), but Metamarkets and Yahoo have proved that Druid can scale to some pretty insane numbers, with query times an order of magnitude better than Spark.  And because of that, we've come to rely on Druid as our main analytics database.

The challenge is figuring out how best to pipe events into Druid.  As I pointed out in my previous post, Kinesis is now a contender.  Alas, Druid has no first class support for Kinesis.  Thus, we need to figure out how to plumb Kinesis to Druid. (druidish pun intended)
First, let me send out some Kudos to Cheddar, Fangjin, Gian and the crew...
They've made *tremendous* progress since I last looked at Druid. (0.5.X days)
That progress has given us the additional options below.

Here are the options as I see them:


Option 1: Traditional KinesisFirehose Approach

In the good ole' days of 0.5.X, real-time data flowed via Firehoses.  To introduce a new ingest mechanism, one simply implemented the Firehose interface, fired up a real-time node, and voila: data flowed like spice. (this is what I detailed in the Storm book)

And since 0.5, Druid has made some updates to the Firehose API, to specifically address the issues that we saw before (around check-pointing, and making sure events are processed only once, in a highly-available manner).  For the update on the Firehose API, check out this thread.

After reading that thread, I was excited to crank out a KinesisFirehose.  And we *nearly* have a KinesisFirehose up and running.  As with most Java projects these days, you spend 15 minutes on the code, and then two hours working out dependencies.  Ironically, Amazon's Kinesis Client Library (KCL) uses an *older* version of the aws-java-sdk than Druid, and because the firehose runs in the same JVM as the Druid code, you have to work out the classpath kinks.  When I hit this wall, it got me thinking -- hmmm, maybe some separation might be good here. ;)

To summarize what I did: I took the KCL's push model (implemented IRecordProcessor), put a simple little BlockingQueue in place, and wired it to Druid's pull model (Firehose.nextRow).  It worked like a charm in unit tests. ;)
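That bridge can be sketched roughly like this.  (A simplified stand-in: the class and method names are illustrative, not the real KCL or Druid interfaces; the real thing would implement IRecordProcessor on one side and Firehose on the other.)

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// A minimal sketch of wiring a push model to a pull model with a BlockingQueue.
public class KinesisEventBridge {

    // Bounded buffer between the two models; blocks the push side when full.
    private final BlockingQueue<Map<String, Object>> queue =
            new LinkedBlockingQueue<>(10_000);

    // Push side: analogous to IRecordProcessor.processRecords()
    public void onRecord(Map<String, Object> event) throws InterruptedException {
        queue.put(event); // back-pressure: blocks when the buffer is full
    }

    // Pull side: analogous to Firehose.nextRow()
    public Map<String, Object> nextRow() throws InterruptedException {
        // Returns null if no event arrives within the timeout.
        return queue.poll(100, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        KinesisEventBridge bridge = new KinesisEventBridge();
        bridge.onRecord(Map.<String, Object>of("metric", "clicks", "value", 1));
        System.out.println(bridge.nextRow()); // the event we just pushed
    }
}
```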

(ping me, if you want to collaborate on the firehose implementation)

Option 2: The EventReceiver Firehose

Of course, the whole time I'm implementing the Firehose -- I'm on the consuming end of a push model from the KCL and on the producing end of a pull model from Druid, and forced to dance between the two.  It made me wonder, "Isn't there a way to push data into Druid?"  If there were such a thing, then I could just push data into Druid as I receive it from the KCL. (that sounds nice/simple, doesn't it?)

This ends up being the crux of the issue, and something that many people have been wrestling with.  It turns out that there is a firehose specifically for this!  Boo yah!  If you have a look at the available firehoses, you'll see an Event Receiver firehose.  The Event Receiver firehose creates a REST endpoint to which you can POST events.  (have a look at the addAll() method's javax.ws.rs.POST annotation)

Beautiful.  Now I could receive messages from the KCL and POST them to Druid.  Simple: I move the checkpoint in Kinesis after I safely receive 200s for all of my events.  But what happens to those events on the Druid side if the node that received them fails?  And how do I scale that ingest mechanism?  Do multiple REST endpoints sit behind a load balancer?
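The receive-and-POST idea is simple enough to sketch.  (The endpoint URL, event fields, and class name are illustrative; the JSON serialization here is deliberately naive, handling only String and Number values.)

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// A rough sketch of batching events and POSTing them as a JSON array.
public class EventPoster {

    // Serialize a batch of events as a JSON array of flat objects.
    static String buildBody(List<Map<String, Object>> events) {
        return events.stream()
                .map(e -> e.entrySet().stream()
                        .map(kv -> "\"" + kv.getKey() + "\":"
                                + (kv.getValue() instanceof Number
                                        ? kv.getValue()
                                        : "\"" + kv.getValue() + "\""))
                        .collect(Collectors.joining(",", "{", "}")))
                .collect(Collectors.joining(",", "[", "]"));
    }

    // POST the batch; a 200 response means it is safe to advance the checkpoint.
    static int post(String url, String body) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(body.getBytes(StandardCharsets.UTF_8));
        }
        return conn.getResponseCode();
    }

    public static void main(String[] args) {
        String body = buildBody(List.of(
                Map.<String, Object>of("timestamp", "2015-09-24T00:00:00Z")));
        System.out.println(body); // → [{"timestamp":"2015-09-24T00:00:00Z"}]
        // post(endpointUrl, body) once an event-receiving node is up
    }
}
```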

I started up a real-time node with a spec file instructing Druid to fire up an EventReceiver.  The node came up, but the endpoint answered with a 404, and the documentation pointed to nothing.  Grep'ing the code (another druidish pun) turned up a URL in the WorkerResource:

http://&lt;host&gt;:&lt;port&gt;/druid/worker/v1/chat/&lt;handler-id&gt;/push-events/

Well, that WorkerResource was over in the indexing-service... hmmm, what is an IndexingService?

We tried to answer all the above questions around availability and scalability, and did some further research on the IndexingService.  That led us to Tranquility. (hey, that sounds so nice and pleasant, maybe we want to use that!?)


Option 3: Tranquility

In doing some due-diligence on Tranquility, I discovered that while I was away, Druid implemented its own task management system!!  (See the Indexing-Service documentation for more information)  Honestly, my first reaction was to run for the hills.  Why would Druid implement its own task management system?  MiddleManagers and Peons... sounds an awful lot like YARN.  I thought the rest of the industry was moving to YARN (Spark, Samza, storm-yarn, etc.).  YARN was appropriately named, Yet Another Resource Negotiator, because everyone was building their own!  What happened to the simplicity of Druid, and the real-time node?

Despite that visceral reaction, I decided to give Tranquility a whirl.  I fired up the simple cluster from the documentation, incorporated the Finagle-based API integration, and was able to get data flowing through the system.  I watched the overlord's console as the tasks moved from pending to running to complete.  It worked!  I was happy... sort of.  I was left with a "these are small Hadoop jobs" taste in my mouth.  It didn't feel like events were "streaming".

Now, this might just be me.  In reality, many/all of the streaming frameworks are actually just doing micro-batching.  And with Druid's segment-based architecture, the system really needs to wait for a windowPeriod/segmentGranularity to pass before a segment can be published.  I get it.

I'm just scared.  For me, the more nouns involved, the more brittle the system: tasks, overlords, middle managers, peons, etc.  Maybe I should just get myself some cojones, and plow ahead.


Next Step:

Regardless, we plan to meet up with Fangjin Yang and Gian Merlino, who are presently coming out of stealth mode with a new company.  I have a tremendous amount of faith in these two.  I'm confident they will steer us in the right direction, and when they do -- I'll report back. =)








Monday, September 21, 2015

Amazon Kinesis : 20x cheaper when used with the Kinesis Producer Library (KPL)!

We've been looking at the tradeoffs between Kafka and Kinesis, and just recently.... Kinesis won!

Kinesis pricing is based on two dimensions: hourly price for each shard (which dictates overall concurrency/throughput), and a unit price per 1M messages.  As I alluded to in my previous post, we process a *ton* of events, and if we simply pushed those events onto Kinesis streams 1:1, we would quickly hand over all of our money to Amazon and go out of business.

However, with the recent release of the Kinesis Producer Library (KPL), Amazon exposed a native capability to aggregate multiple messages/events into a single PUT unit.  The maximum size of a PUT unit is 25 KB.  If you have a message size of 150 bytes, you can cram about 170 messages into a single PUT unit! (and save some serious dough!)

Here is the math, straight from the Kinesis Pricing Calculator:

Without the KPL,
100K m / s * (150 bytes / m) = 100 shards and 263,520M PUT units = $4,787.28 / month

* Note: Each shard can only ingest 1K records/s, which is why we end up with 100 shards.

Now with KPL, let's assume we can only fit 100 messages in each PUT unit.
We would reduce our required throughput (messages / second) down to 1K / s.
With 100 messages in each PUT unit,  each unit would be 15Kb in size. (100 * 150 bytes)

This gives us:
1K m / s * (15 Kb / m) = 15 shards and 2,635.2M PUT units = $201.60 / month

That is a savings of ~20x!!
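The math above can be double-checked with a few lines.  (A sketch of the pricing model, not the official calculator: $0.015 per shard-hour, $0.014 per million PUT units, a 30.5-day month.  It reproduces the figures above to within a penny.)

```java
// Back-of-the-envelope model behind the Kinesis cost figures above.
public class KinesisCost {

    static final double SHARD_HOUR_PRICE = 0.015;    // $ per shard per hour
    static final double PER_MILLION_PUTS = 0.014;    // $ per 1M PUT units
    static final double HOURS_PER_MONTH = 732;       // 30.5 days
    static final double SECONDS_PER_MONTH = 30.5 * 86400;

    // Monthly cost for a stream ingesting recordsPerSec records of the given
    // size.  Assumes each record fits in one 25 KB PUT unit.
    static double monthlyCost(double recordsPerSec, double bytesPerRecord) {
        // A shard accepts 1,000 records/s or 1 MB/s, whichever limit binds.
        int shards = (int) Math.ceil(Math.max(
                recordsPerSec / 1_000.0,
                recordsPerSec * bytesPerRecord / 1_000_000.0));
        double millionsOfPuts = recordsPerSec * SECONDS_PER_MONTH / 1_000_000.0;
        return shards * SHARD_HOUR_PRICE * HOURS_PER_MONTH
                + millionsOfPuts * PER_MILLION_PUTS;
    }

    public static void main(String[] args) {
        // 100K raw messages/s at 150 bytes each -> 100 shards (record limit binds)
        System.out.printf("raw:        $%.2f%n", monthlyCost(100_000, 150));
        // 1K aggregated records/s at 15 KB each -> 15 shards (bandwidth binds)
        System.out.printf("aggregated: $%.2f%n", monthlyCost(1_000, 15_000));
    }
}
```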

So, let's look at what that means architecturally...

First, you are going to want to look at the KPL recommended usage matrix:
http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-kpl-integration.html

My naive interpretation of that chart is: Use Java!

Ironically, the producer is actually a natively built micro-service/binary.  And the KPL is a Java wrapper around that native binary that uses interprocess communication to delegate work to the micro-service.

In Java, it is straightforward to integrate the KPL.  Here are the key components.

First, you need to configure the producer:

KinesisProducerConfiguration config = new KinesisProducerConfiguration();
config.setRegion(region);
config.setCredentialsProvider(new ProfileCredentialsProvider());
config.setMaxConnections(1);
config.setRequestTimeout(60000);
config.setRecordMaxBufferedTime(15000);
config.setNativeExecutable(
  "/Users/brianoneill/git/amazon-kinesis-producer/bin/darwin-4.2.1/debug/kinesis_producer");
producer = new KinesisProducer(config);


Notice that the KPL actually sends the messages asynchronously, buffering/aggregating those messages internally.  (which makes for challenging guarantees around message delivery -- but we'll defer that problem for another day)

Also notice that I built my own native binary, and specified its location.  You don't necessarily need to do that; the binary is actually bundled into the JAR file and extracted into a temporary location at runtime.  (yeah, crazy eh?)

Next, you need to send some data:

This is done simply, one line of code:

// streamName is the target Kinesis stream; the second argument is the
// partition key; data is a ByteBuffer containing the serialized event.
ListenableFuture<UserRecordResult> f = producer.addUserRecord(streamName,
  Long.toString(System.currentTimeMillis()), data);


Finally, you need to find out what happened to your message:

For this one, we'll add a callback to the future:

Futures.addCallback(f, callback);

Where callback is a FutureCallback that will be invoked when the record is processed.  Here is an example:


final FutureCallback<UserRecordResult> callback = new FutureCallback<UserRecordResult>() {
 @Override
 public void onFailure(Throwable t) {
  if (t instanceof UserRecordFailedException) {
   Attempt last = Iterables.getLast(((UserRecordFailedException) t).getResult().getAttempts());
   LOGGER.error(String.format("Record failed to put - %s : %s", last.getErrorCode(),
     last.getErrorMessage()));
  }
  LOGGER.error("Exception during put", t);
  System.exit(1);
 }

 @Override
 public void onSuccess(UserRecordResult result) {
  // LOGGER.info("Successfully wrote [" + result.toString() + "]");
  completed.getAndIncrement(); // completed is an AtomicLong counting acknowledged records
 }
};


And there you have it, KPL in a nutshell.

For us, it was a huge win, and actually made Kinesis a viable alternative.
Now, we just need to figure out how to plumb it into everything else. =)

Kinesis Firehose to the rescue!  (stay tuned)




Friday, September 18, 2015

Gone Monetate : Personalizing Marketing at 100K events/second


Internet advertising has gone the way of junk mail.  There are tons of impersonal advertisements sucking up screen real-estate and mailbox space.   The majority of the advertising industry leverages only inane intelligence that assumes, "because I clicked on a rubber ducky once, I must want to buy a rubber ducky".  In fact, I probably want to see rubber duckies on every site I visit, which is the internet equivalent of a mailbox full of refinance offers the second I have my credit checked.  I'd much rather be shown things I care about.  Even better, show me stuff I didn't know I cared about!

That is the reason I joined Monetate.  Well, it was that combined with the opportunity to join a rock-star team of passionate geniuses, working on an incredibly challenging real-time data and analytics problem, hell-bent on changing the world.

I've only been on the team for a week, and I've been blown away.  Directed by Jeff Persch, the development team has architected a phenomenal platform: scalable and resilient, with simplicity as a core tenet.  With Monetate at the center of many of the e-commerce interactions, influencing almost 1/3 of the dollars spent on Black Friday, that is no small feat.  We are gearing up for another Black Friday, for which peak traffic may hit 50-100K events / second!

Doing some math on that, the numbers get big really fast:

100K / s = 6M / minute = 360M / hour = ~8.6B events / day

Yep, that ended in the B word... per day.  And like everyone else, we are pushing to deliver better, faster, and more insightful analytics on those user activity streams... in near real-time.

And because of that, I hope to have many useful/fun blog posts coming.

Right now, we're in the process of rolling out Druid, a favorite technology of mine. (and something to which we dedicated a whole chapter in our Storm book -- (shameless plug ;))  With Druid in place, we'll be able to slice and dice the real-time events.

And we already have the ability to ingest user context in real-time. When you combine that user context with stream processing that allows us to identify behavioral trends, our decision engine can make contextual decisions, adapt on the fly, and deliver relevant/interesting content to the end-user!

Hooray!

Imagine if we could only do that for the real world too.  Instead of coming home to a mailbox filled with junk mail that I immediately throw away, I'd have updates on the latest gadgetry and fishing paraphernalia.  Hmm, maybe I can get that on the roadmap. ;)

Anyway, I couldn't be happier with my choice to join Monetate.  The people, culture and technology are astounding.

Stay tuned -- likely my next blog post will show how to leverage Amazon's Kinesis for 1/100th the cost.  (by aggregating events into a smaller number of Kinesis records, decreasing the overall message rate, you can save mondo dough =)