Tuesday, April 14, 2015

Tuning Hadoop & Cassandra : Beware of vNodes, Splits and Pages

When running Hadoop jobs against Cassandra, you will want to be careful about a few parameters.

Specifically, pay special attention to vNodes, Splits and Page Sizes.

vNodes were introduced in Cassandra 1.2.  vNodes allow a host to have multiple portions of the token range.  This allows for more evenly distributed data, which means nodes can share the burden of a node rebuild (and it doesn't fall all on one node).  Instead the rebuild is distributed across a number of nodes.  (way cool feature)

BUT, vNodes made Hadoop jobs a little trickier...

The first time I went to run a Hadoop job using CQL, I was running a local Hadoop cluster (only one container) against a Cassandra with lots of vNodes (~250).  I was running a simple job (probably a flavor of word count) over a few thousand records.  I expected the job to complete in seconds.  To my dismay, it was taking *forever*.

When you run a Hadoop job, the minimum number of splits is the number of vNodes.

In Hadoop, each split is consumed by a single container, and a container runs against only one split at a time.  Effectively, a Hadoop container is a JVM, which is spun-up to process a specific split of data.

In any Hadoop cluster, there is a maximum number of available containers.  In my case, I had one available container, which meant the splits had to be processed sequentially with spin-up and tear-down overhead for each split.  boooooo...
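To make that cost concrete, here is a rough back-of-the-envelope model in Java. It is only a sketch: the class and method names are made up for illustration (this is not a Cassandra or Hadoop API), it assumes rows are spread evenly across the token ranges, and it treats every token range as producing at least one split. The 5,000 rows and the 64,000-row split size are assumed example values.

```java
/** Back-of-the-envelope model of splits and container passes (hypothetical helper). */
final class SplitModel {

    /** Estimated splits: every token range yields at least one split, assuming evenly spread rows. */
    static long estimateSplits(long rows, long splitSize, long tokenRanges) {
        long rowsPerRange = ceilDiv(rows, tokenRanges);
        long splitsPerRange = Math.max(1, ceilDiv(rowsPerRange, splitSize));
        return tokenRanges * splitsPerRange;
    }

    /** Sequential passes needed when only `containers` splits can run at the same time. */
    static long waves(long splits, long containers) {
        return ceilDiv(splits, containers);
    }

    private static long ceilDiv(long a, long b) {
        return (a + b - 1) / b;
    }

    public static void main(String[] args) {
        // A few thousand rows, but 250 token ranges and a single container.
        long splits = estimateSplits(5_000, 64_000, 250);
        System.out.println("splits = " + splits);
        System.out.println("passes with 1 container = " + waves(splits, 1));
    }
}
```

Under those assumptions, a tiny table still produces 250 splits, and a single container has to work through all of them one after another.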

So, before you do anything, check to see how many vNodes you are running.  This is controlled by your Cassandra yaml file, via the num_tokens parameter.  Below is an excerpt from cassandra.yaml.

# This defines the number of tokens randomly assigned to this node on the ring
# The more tokens, relative to other nodes, the larger the proportion of data
# that this node will store. You probably want all nodes to have the same number
# of tokens assuming they have equal hardware capability.
# If you already have a cluster with 1 token per node, and wish to migrate to
# multiple tokens per node, see http://wiki.apache.org/cassandra/Operations
num_tokens: 25

To see how many vNodes you have on an existing cluster, you can use nodetool:

➜  cassandra  bin/nodetool ring
Datacenter: datacenter1
Address    Rack        Status State   Load            Owns                Token
                                                                          8743874685407455894
           rack1       Up     Normal  41.27 KB        100.00%             -8851282698028303387
           rack1       Up     Normal  41.27 KB        100.00%             -8032384077572986919
           rack1       Up     Normal  41.27 KB        100.00%             -7279609033604637725
...
           rack1       Up     Normal  41.27 KB        100.00%             8107926707118065773
           rack1       Up     Normal  41.27 KB        100.00%             8743874685407455894

You will notice that the number of token ranges equals the number you set in the configuration file(s):

➜  cassandra  bin/nodetool ring | grep rack1 | wc -l

For more information on vNodes and configuration, please see Patrick's most excellent deck:

Because the minimum number of splits is the number of vNodes, you may want to run a separate Cassandra ring, with a smaller number of vNodes, that you can use for analytics.

OK, so what does this mean and how do you fix it?

Well, if you have a small number of containers and a high number of vNodes, split overhead may become a problem (as it was in my local Hadoop cluster).  But perhaps more importantly, if you run against a large number of records and use the default split size, you will end up with an insane number of splits.  For example, we recently ran against a table that had 5 billion records.  The default split size is 64K rows. [1]  Doing the math:

# of rows / split size = # of splits
5B / 64K = 78,125 splits!

That is an awful lot of splits, and you are going to take an overhead spin-up/tear-down penalty for each of those splits.  Even assuming only 5 seconds of overhead for each split, that is a ton of time:

78,125 * 5 seconds  / 60  / 60  = ~108 hours of overhead (NOT GOOD!)
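If you want to plug in your own numbers, the arithmetic above can be sketched in a few lines of Java. The class and method names are hypothetical, and the 5 seconds of per-split overhead is the same rough assumption used above, not a measured value.

```java
/** Reproduces the split-overhead arithmetic (hypothetical helper, for illustration only). */
final class SplitOverhead {

    /** Number of splits for a table, rounding up so a partial split still counts. */
    static long splitCount(long rows, long splitSize) {
        return (rows + splitSize - 1) / splitSize;
    }

    /** Total spin-up/tear-down overhead in hours, summed over all splits. */
    static double overheadHours(long splits, double secondsPerSplit) {
        return splits * secondsPerSplit / 3600.0;
    }

    public static void main(String[] args) {
        long splits = splitCount(5_000_000_000L, 64_000L); // 5B rows, 64K rows per split
        System.out.printf("splits = %d%n", splits);
        System.out.printf("overhead = %.1f hours%n", overheadHours(splits, 5.0));
    }
}
```

Keep in mind that this is the total overhead summed over every split.  On a real cluster it is spread across however many containers run in parallel, but it is still pure waste.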

Optimally, the number of splits would equal the number of containers available. That typically won't be possible, but we can get as close as possible by setting the split size on the Hadoop job with the following line:

org.apache.cassandra.hadoop.ConfigHelper.setInputSplitSize(job.getConfiguration(), 10000000); // splitSize = 10M

Let's redo the math with this value:

5B / 10M = 500 splits (MUCH BETTER!)

If we are running on a Hadoop cluster with 250 available containers, we'll finish in two passes on the splits, each container processing two splits.
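You can also work this backwards: pick the number of passes you can live with, and derive the split size from the row count and the number of containers.  Here is a minimal Java sketch of that idea.  The helper is hypothetical (it is not part of Cassandra or Hadoop), and it assumes rows divide evenly across splits.

```java
/** Derives a split size from a target number of passes (hypothetical helper). */
final class SplitSizePlanner {

    /** Split size, in rows, that yields roughly containers * passes splits. */
    static long splitSizeFor(long rows, long containers, long passes) {
        long targetSplits = containers * passes;
        return (rows + targetSplits - 1) / targetSplits; // round up
    }

    public static void main(String[] args) {
        // 5B rows, 250 containers, two passes over the splits.
        long splitSize = splitSizeFor(5_000_000_000L, 250, 2);
        // The split size is passed to the job as an int in the line above,
        // so fail loudly here if the value would not fit.
        int asInt = Math.toIntExact(splitSize);
        System.out.println("split size = " + asInt);
    }
}
```

With the numbers from this post, that works out to the same 10M split size used above.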

With the split size worked out, you will want to mind the page size parameter.  The page size tells the CQL driver how many rows you want back at a time.  It is roughly the equivalent of a LIMIT clause on the CQL statement.  For this one, you will want to pull back as many rows as possible without blowing up memory (which I've done ;).  The default page size is 1000.  To configure it, use the following line:

CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), Integer.toString(100));
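To get a feel for the trade-off, here is a small Java sketch that counts how many page fetches a single split needs at different page sizes.  It is a simplification with a made-up helper name: it assumes a split holds exactly the configured number of rows and that each page costs one fetch from Cassandra.

```java
/** Counts page fetches per split for a given page size (hypothetical helper). */
final class PageMath {

    /** Fetches needed to read one split, rounding up for a final partial page. */
    static long fetchesPerSplit(long splitSize, long pageSize) {
        return (splitSize + pageSize - 1) / pageSize;
    }

    public static void main(String[] args) {
        long splitSize = 10_000_000L; // the 10M split size from above
        for (long pageSize : new long[] {100, 1_000, 10_000}) {
            System.out.println("page size " + pageSize + " -> "
                    + fetchesPerSplit(splitSize, pageSize) + " fetches per split");
        }
    }
}
```

Bigger pages mean fewer fetches, but every page has to fit in the container's memory, so the right value depends on how wide your rows are.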

Hopefully this saves people some time. Happy tuning. (shout out to gnana ;)

Monday, April 6, 2015

Holy momentum Batman! Spark and Cassandra (circa 2015) w/ Datastax Connector and Java

Over a year ago, I did a post on Spark and Cassandra.  At the time, Calliope was your best bet.  Since then, Spark has exploded in popularity.

Check out this Google Trends chart.  That's quite a hockey stick for Spark.
Also notice their github project, which has almost 500 contributors and 3000 forks!!

Datastax is riding that curve.  And just in the time since my last post, Datastax developed and released their own spark-cassandra connector.  

I am putting together some materials for Philly Tech Week, where I'll be presenting on Cassandra development, so I thought I would give it a spin to see how things have evolved.  Here is a follow-along preview of the Spark piece of what I'll be presenting.

First, get Spark running:

Download Spark 1.2.1 (this is the version that works with the latest Datastax connector)

Next, unpack it and build it.  (mvn install)

And that is the first thing I noticed...

[INFO] Spark Project Parent POM
[INFO] Spark Project Networking
[INFO] Spark Project Shuffle Streaming Service
[INFO] Spark Project Core
[INFO] Spark Project Bagel
[INFO] Spark Project GraphX
[INFO] Spark Project Streaming
[INFO] Spark Project Catalyst
[INFO] Spark Project SQL
[INFO] Spark Project ML Library
[INFO] Spark Project Tools
[INFO] Spark Project Hive
[INFO] Spark Project REPL
[INFO] Spark Project Assembly
[INFO] Spark Project External Twitter
[INFO] Spark Project External Flume Sink
[INFO] Spark Project External Flume
[INFO] Spark Project External MQTT
[INFO] Spark Project External ZeroMQ
[INFO] Spark Project External Kafka
[INFO] Spark Project Examples

Look at all these cool new toys!  GraphX?  SQL?  Kafka?  In my last post, I was using Spark 0.8.1.  I took a look at the 0.8 branch on github and sure enough, all of this stuff was built just in the last year!  It is crazy what momentum can do.

After you've built spark, go into the conf directory and copy the template environment file.

cp spark-env.sh.template spark-env.sh

Then, edit that file and add a line to configure the master IP/bind interface:


(If you don't set the IP, the master may bind to the wrong interface, and your application won't be able to connect, which is what happened to me initially)

Next, launch the master:  (It gives you the path to the logs, which I recommend tailing)


In the logs, you should see:

15/04/06 12:46:39 INFO Master: Starting Spark master at spark://
15/04/06 12:46:39 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/04/06 12:46:39 INFO MasterWebUI: Started MasterWebUI at http://localhost:8080

Go hit that WebUI at http://localhost:8080.

Second,  get yourself some workers:

Spark has its own concept of workers.  To start one, run the following command:

bin/spark-class org.apache.spark.deploy.worker.Worker spark://

After a few seconds, you should see the following:
15/04/06 13:54:23 INFO Utils: Successfully started service 'WorkerUI' on port 8081.
15/04/06 13:54:23 INFO WorkerWebUI: Started WorkerWebUI at http://localhost:8081
15/04/06 13:54:23 INFO Worker: Connecting to master spark://
15/04/06 13:54:25 INFO Worker: Successfully registered with master spark://

You can refresh the MasterWebUI and you should see the worker.

Third, sling some code:

This go-around, I wanted to use Java instead of Scala.  (Sorry, but I'm still not on the Scala bandwagon.  It feels like Java 8 is giving me what I need with respect to functions and lambdas.)

I found this Datastax post:

Which led me to this code:

Kudos to Jacek, but that gist is directed at an old version of Spark.  It also rolls everything into a single class, which means it doesn't help you in a real-world situation where you have lots of classes and dependencies.  In the end, I decided to update my quick start project so everyone can get up and running quickly.

Go clone this:

Build with maven:

mvn clean install

Then, have a look at the run.sh.  This actually submits the job to the Spark cluster (single node in our case).  The contents of that script are as follows:

spark-submit --class com.github.boneill42.JavaDemo --master spark:// target/spark-on-cassandra-0.0.1-SNAPSHOT-jar-with-dependencies.jar spark://

The --class parameter tells Spark which class to execute.  The --master parameter is the URL that you see at the top of the MasterWebUI, and tells Spark which master it should submit the job to.  The jar file is the result of the build, which is a fat jar that includes the job (courtesy of the maven assembly plugin).  The last two parameters are the args for the program.  Spark passes those into the JavaDemo class.  After you run this, you should see the job process...

15/04/06 17:16:57 INFO DAGScheduler: Stage 8 (toArray at JavaDemo.java:185) finished in 0.028 s
15/04/06 17:16:57 INFO DAGScheduler: Job 3 finished: toArray at JavaDemo.java:185, took 0.125340 s
(Product{id=4, name='Product A1', parents=[0, 1]},Optional.of(Summary{product=4, summary=505.178}))
(Product{id=7, name='Product B2', parents=[0, 2]},Optional.of(Summary{product=7, summary=494.177}))
(Product{id=5, name='Product A2', parents=[0, 1]},Optional.of(Summary{product=5, summary=500.635}))
(Product{id=2, name='Product B', parents=[0]},Optional.of(Summary{product=2, summary=994.037}))

I'll go into the details of the example in my next post.
Or you can just come to my presentation at Philly Tech Week. =)