Tuesday, April 14, 2015

Tuning Hadoop & Cassandra : Beware of vNodes, Splits and Pages

When running Hadoop jobs against Cassandra, you will want to be careful about a few parameters.

Specifically, pay special attention to vNodes, Splits and Page Sizes.

vNodes were introduced in Cassandra 1.2.  vNodes allow a host to own multiple portions of the token range, which distributes data more evenly.  That means the burden of a node rebuild is shared across a number of nodes instead of falling entirely on one.  (way cool feature)

BUT, vNodes made Hadoop jobs a little trickier...

The first time I went to run a Hadoop job using CQL, I was running a local Hadoop cluster (only one container) against a Cassandra with lots of vNodes (~250).  I was running a simple job (probably a flavor of word count) over a few thousand records.  I expected the job to complete in seconds.  To my dismay, it was taking *forever*.

When you run a Hadoop job, the minimum number of splits is the number of vNodes.

In Hadoop, each split is consumed by a single container, and a container runs against only one split at a time.  Effectively, a Hadoop container is a JVM that is spun up to process a specific split of data.

In any Hadoop cluster, there is a maximum number of available containers.  In my case, I had one available container, which meant the splits had to be processed sequentially with spin-up and tear-down overhead for each split.  boooooo...

So, before you do anything, check how many vNodes you are running.  This is controlled by the num_tokens parameter in your cassandra.yaml file.  Below is an excerpt from cassandra.yaml.

# This defines the number of tokens randomly assigned to this node on the ring
# The more tokens, relative to other nodes, the larger the proportion of data
# that this node will store. You probably want all nodes to have the same number
# of tokens assuming they have equal hardware capability.
# If you already have a cluster with 1 token per node, and wish to migrate to
# multiple tokens per node, see http://wiki.apache.org/cassandra/Operations
num_tokens: 25

To see how many vNodes you have on an existing cluster, you can use nodetool:

➜  cassandra  bin/nodetool ring
Datacenter: datacenter1
Address    Rack        Status State   Load            Owns                Token
                                                                          8743874685407455894
           rack1       Up     Normal  41.27 KB        100.00%             -8851282698028303387
           rack1       Up     Normal  41.27 KB        100.00%             -8032384077572986919
           rack1       Up     Normal  41.27 KB        100.00%             -7279609033604637725
...
           rack1       Up     Normal  41.27 KB        100.00%             8107926707118065773
           rack1       Up     Normal  41.27 KB        100.00%             8743874685407455894

You will notice that the number of token ranges equals the number you set in the configuration file(s):

➜  cassandra  bin/nodetool ring | grep rack1 | wc -l

For more information on vNodes and configuration, please see Patrick's most excellent deck:

Because the minimum number of splits is the number of vNodes, you may want to run a separate Cassandra ring, with a smaller number of vNodes, that you can use for analytics.

OK, so what does this mean and how do you fix it?

Well, if you have a small number of containers and a high number of vNodes, split overhead may become a problem (as it was in my local Hadoop cluster).  Perhaps more importantly, if you run against a large number of records with the default split size, you will end up with an insane number of splits.  For example, we recently ran against a table that had 5 billion records.  The default split size is 64K rows. [1]  Doing the math:

# of rows / split size = # of splits
5B / 64K = 78,125 splits!

That is an awful lot of splits, and you are going to pay the spin-up/tear-down penalty for each of those splits.  Even assuming only 5 seconds of overhead per split, that is a ton of time:

78,125 splits * 5 seconds / 60 / 60 = ~108 hours of overhead (NOT GOOD!)
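To sanity-check that arithmetic, here it is as a tiny Java program (the round numbers are the ones from the example above):

```java
public class SplitMath {

    // # of rows / split size = # of splits
    static long splitCount(long totalRows, long rowsPerSplit) {
        return totalRows / rowsPerSplit;
    }

    // splits * per-split overhead (seconds), converted to hours
    static double overheadHours(long splits, long secondsPerSplit) {
        return splits * secondsPerSplit / 3600.0;
    }

    public static void main(String[] args) {
        long splits = splitCount(5_000_000_000L, 64_000L);
        System.out.println(splits + " splits");            // 78125 splits
        System.out.printf("~%.0f hours of overhead%n", overheadHours(splits, 5L));
    }
}
```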

Optimally, the number of splits would equal the number of containers available. That typically won't be possible, but we can get as close as possible by setting the split size on the Hadoop job with the following line:

org.apache.cassandra.hadoop.ConfigHelper.setInputSplitSize(job.getConfiguration(), 10000000); // splitSize = 10M

Let's redo the math with this value:

5B / 10M = 500 splits (MUCH BETTER!)

If we are running on a Hadoop cluster with 250 available containers, we'll finish in two passes on the splits, each container processing two splits.
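The pass count follows directly from the one-split-per-container rule described earlier; a quick sketch:

```java
public class PassCount {

    // Each container chews through one split at a time, so a job needs
    // ceil(splits / containers) sequential waves of containers.
    static long passes(long splits, long containers) {
        return (splits + containers - 1) / containers;
    }

    public static void main(String[] args) {
        System.out.println(passes(500, 250));    // 2   (with 10M-row splits)
        System.out.println(passes(78_125, 250)); // 313 (with the default 64K-row splits)
    }
}
```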

With the split sizes worked out, you will want to mind the page size parameter.  The page size tells the CQL driver how many rows to pull back at a time.  It is the equivalent of the LIMIT clause on the CQL statement.  For this one, you will want to pull back as many rows as possible without blowing up memory (which I've done ;).  The default page size is 1000.  To configure the page size, use the following line:

CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), Integer.toString(100));
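Putting both knobs together, the relevant part of the job setup looks roughly like this (a sketch: the class and job names are placeholders I made up; the two Helper calls are the ones shown above, with the values discussed in this post):

```java
import org.apache.cassandra.hadoop.ConfigHelper;
import org.apache.cassandra.hadoop.cql3.CqlConfigHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class TunedJobSetup {
    public static Job buildJob() throws Exception {
        Job job = Job.getInstance(new Configuration(), "cql-word-count"); // hypothetical job name
        // Aim for #splits close to #containers: 5B rows / 10M rows per split = 500 splits.
        ConfigHelper.setInputSplitSize(job.getConfiguration(), 10_000_000);
        // Rows the CQL driver pulls back per page; raise it until memory complains.
        CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), Integer.toString(100));
        return job;
    }
}
```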

Hopefully this saves people some time. Happy tuning. (shout out to gnana ;)

Monday, April 6, 2015

Holy momentum Batman! Spark and Cassandra (circa 2015) w/ Datastax Connector and Java

Over a year ago, I did a post on Spark and Cassandra.  At the time, Calliope was your best bet.  Since then, Spark has exploded in popularity.

Check out this Google Trends chart.  That's quite a hockey stick for Spark.
Also notice Spark's github project, which has almost 500 contributors and 3000 forks!!

Datastax is riding that curve.  And just in the time since my last post, Datastax developed and released their own spark-cassandra connector.  

I am putting together some materials for Philly Tech Week, where I'll be presenting on Cassandra development, so I thought I would give it a spin to see how things have evolved.  Here is a follow-along preview of the Spark piece of what I'll be presenting.

First, get Spark running:

Download Spark 1.2.1 (this is the version that works with the latest Datastax connector)

Next, unpack it and build it.  (mvn install)

And that is the first thing I noticed...

[INFO] Spark Project Parent POM
[INFO] Spark Project Networking
[INFO] Spark Project Shuffle Streaming Service
[INFO] Spark Project Core
[INFO] Spark Project Bagel
[INFO] Spark Project GraphX
[INFO] Spark Project Streaming
[INFO] Spark Project Catalyst
[INFO] Spark Project SQL
[INFO] Spark Project ML Library
[INFO] Spark Project Tools
[INFO] Spark Project Hive
[INFO] Spark Project REPL
[INFO] Spark Project Assembly
[INFO] Spark Project External Twitter
[INFO] Spark Project External Flume Sink
[INFO] Spark Project External Flume
[INFO] Spark Project External MQTT
[INFO] Spark Project External ZeroMQ
[INFO] Spark Project External Kafka
[INFO] Spark Project Examples

Look at all these new cool toys!  GraphX?  SQL?  Kafka?  In my last post, I was using Spark 0.8.1.  I took a look at the 0.8 branch on github and sure enough, all of this stuff was built just in the last year!  It is crazy what momentum can do.

After you've built spark, go into the conf directory and copy the template environment file.

cp spark-env.sh.template spark-env.sh

Then, edit that file and add a line to configure the master IP/bind interface (in Spark 1.2 this is the SPARK_MASTER_IP variable):


(If you don't set the IP, the master may bind to the wrong interface, and your application won't be able to connect, which is what happened to me initially)

Next, launch the master with sbin/start-master.sh.  (It gives you the path to the logs, which I recommend tailing.)


In the logs, you should see:

15/04/06 12:46:39 INFO Master: Starting Spark master at spark://
15/04/06 12:46:39 INFO Utils: Successfully started service 'MasterUI' on port 8080.
15/04/06 12:46:39 INFO MasterWebUI: Started MasterWebUI at http://localhost:8080

Go hit that WebUI at http://localhost:8080.

Second,  get yourself some workers:

Spark has its own concept of workers.  To start one, run the following command:

bin/spark-class org.apache.spark.deploy.worker.Worker spark://

After a few seconds, you should see the following:
15/04/06 13:54:23 INFO Utils: Successfully started service 'WorkerUI' on port 8081.
15/04/06 13:54:23 INFO WorkerWebUI: Started WorkerWebUI at http://localhost:8081
15/04/06 13:54:23 INFO Worker: Connecting to master spark://
15/04/06 13:54:25 INFO Worker: Successfully registered with master spark://

You can refresh the MasterWebUI and you should see the worker.

Third, sling some code:

This go around, I wanted to use Java instead of Scala.  (Sorry, but I'm still not on the Scala bandwagon; it feels like Java 8 gives me what I need with respect to functions and lambdas.)

I found this Datastax post:

Which led me to this code:

Kudos to Jacek, but that gist is directed at an old version of Spark.  It also rolls everything into a single class, which means it doesn't help you in a real-world situation where you have lots of classes and dependencies.  In the end, I decided to update my quick start project so everyone can get up and running quickly.

Go clone this:

Build with maven:

mvn clean install

Then, have a look at the run.sh.  This actually submits the job to the Spark cluster (single node in our case).  The contents of that script are as follows:

spark-submit --class com.github.boneill42.JavaDemo --master spark:// target/spark-on-cassandra-0.0.1-SNAPSHOT-jar-with-dependencies.jar spark://

The --class parameter tells Spark which class to execute.  The --master parameter is the URL you see at the top of MasterWebUI; it tells Spark which master to submit the job to.  The jar file is the result of the build: a fat jar that includes the job (courtesy of the maven assembly plugin).  The last two parameters are the args for the program, which Spark passes into the JavaDemo class.  After you run this, you should see the job process...

15/04/06 17:16:57 INFO DAGScheduler: Stage 8 (toArray at JavaDemo.java:185) finished in 0.028 s
15/04/06 17:16:57 INFO DAGScheduler: Job 3 finished: toArray at JavaDemo.java:185, took 0.125340 s
(Product{id=4, name='Product A1', parents=[0, 1]},Optional.of(Summary{product=4, summary=505.178}))
(Product{id=7, name='Product B2', parents=[0, 2]},Optional.of(Summary{product=7, summary=494.177}))
(Product{id=5, name='Product A2', parents=[0, 1]},Optional.of(Summary{product=5, summary=500.635}))
(Product{id=2, name='Product B', parents=[0]},Optional.of(Summary{product=2, summary=994.037}))
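The output above comes from the JavaDemo driver.  For context, the connection setup inside a driver class like that typically looks as follows (a sketch under assumptions: the class and app names are placeholders, and spark.cassandra.connection.host is the connector's property for the Cassandra contact point):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class DriverSetup {
    public static JavaSparkContext connect(String masterUrl, String cassandraHost) {
        SparkConf conf = new SparkConf()
                .setAppName("spark-on-cassandra-demo")                  // placeholder app name
                .setMaster(masterUrl)                                   // the spark:// URL from MasterWebUI
                .set("spark.cassandra.connection.host", cassandraHost); // where the connector finds Cassandra
        return new JavaSparkContext(conf);
    }
}
```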

I'll go into the details of the example in my next post.
Or you can just come to my presentation at Philly Tech Week. =)

Friday, April 3, 2015

High-Performance Computing Clusters (HPCC) and Cassandra on OS X

Our new parent company, LexisNexis, has one of the world's largest public records databases:

"...our comprehensive collection of more than 46 billion records from more than 10,000 diverse sources—including public, private, regulated, and derived data. You get comprehensive information on approximately 269 million individuals and 277 million unique businesses."

And they've been managing, analyzing and searching this database for decades.  Over that time period, they've built up quite an assortment of "Big Data" technologies.  Collectively, LexisNexis refers to those technologies as their High-Performance Computing Cluster (HPCC) platform.

HPCC is entirely open source:

Naturally, we are working through the marriage of HPCC with our real-time data management and analytics stack.  The potential is really exciting.  Specifically, HPCC has sophisticated machine learning and statistics libraries, and a query engine (Roxie) capable of serving up those statistics.

Lo and behold, HPCC can use Cassandra as a backend storage mechanism! (FTW!)

The HPCC platform isn't technically supported on a Mac, but here is what I did to get it running:

HPCC Install

  • Install the dependencies with brew:
      brew install icu4c
      brew install boost
      brew install libarchive
      brew install bison27
      brew install openldap
      brew install nodejs
  • Make a build directory, and run cmake from there:
      export CC=/usr/bin/clang
      export CXX=/usr/bin/clang++
      cmake ../ -DICU_LIBRARIES=/usr/local/opt/icu4c/lib/libicuuc.dylib -DICU_INCLUDE_DIR=/usr/local/opt/icu4c/include -DLIBARCHIVE_INCLUDE_DIR=/usr/local/opt/libarchive/include -DLIBARCHIVE_LIBRARIES=/usr/local/opt/libarchive/lib/libarchive.dylib -DBOOST_REGEX_LIBRARIES=/usr/local/opt/boost/lib -DBOOST_REGEX_INCLUDE_DIR=/usr/local/opt/boost/include  -DUSE_OPENLDAP=true -DOPENLDAP_INCLUDE_DIR=/usr/local/opt/openldap/include -DOPENLDAP_LIBRARIES=/usr/local/opt/openldap/lib/libldap_r.dylib -DCLIENTTOOLS_ONLY=false -DPLATFORM=true
  • Then, compile and install with (sudo make install)
  • After that, you'll need to muck with the permissions a bit:
      chmod -R a+rwx /opt/HPCCSystems/
      chmod -R a+rwx /var/lock/HPCCSystems
      chmod -R a+rwx /var/log/HPCCSystems
  • Now, ordinarily you would run hpcc-init to get the system configured, but that script fails on OS X, so I used linux to generate config files that work and posted those to a repository here:
  • Clone this repository and replace /var/lib/HPCCSystems with the content of var_lib_hpccsystems.zip
      sudo rm -fr /var/lib/HPCCSystems
      sudo unzip var_lib_hpccsystems.zip -d /var/lib
      chmod -R a+rwx /var/lib/HPCCSystems
  • Then, from the directory containing the xml files in this repository, you can run:
      daserver (Runs the Dali server, which is the persistence mechanism for HPCC)
      esp (Runs the ESP server, which is the web services and UI layer for HPCC)
      eclccserver (Runs the ECL compile server, which compiles ECL down to C++ and then a dynamic library)
      roxie (Runs the Roxie server, which is capable of responding to queries)
  • Kick off each one of those, then go to http://localhost:8010 in a browser.  You are ready to run some ECL!

Running ECL

Like Pig with Hadoop, HPCC runs a DSL called ECL.  More information on ECL can be found here:
  • As a simple smoke test, go into your HPCC-Platform repository, and go under: ./testing/regress/ecl.  
  • Then, run the following:
      ecl run hello.ecl --target roxie --server=localhost:8010
  • You should see the following:
        <dataset name="Result 1">
        <row><result_1>Hello world</result_1></row>
        </dataset>

Cassandra Plugin

With HPCC up and running, we are ready to have some fun with Cassandra.  HPCC has plugins.  Those plugins reside in /opt/HPCCSystems/plugins.  For me, I had to copy those libraries into /opt/HPCCSystems/lib to get HPCC to recognize them.

Go back to the /opt/HPCCSystems/testing/regress/ecl directory and have a look at cassandra-simple.ecl. A snippet is shown below:


childrec := RECORD
   string name,
   integer4 value { default(99999) },
   boolean boolval { default(true) },
   real8 r8 {default(99.99)},
   real4 r4 {default(999.99)},
   DATA d {default (D'999999')},
   DECIMAL10_2 ddd {default(9.99)},
   UTF8 u1 {default(U'9999 ß')},
   UNICODE u2 {default(U'9999 ßßßß')},
   STRING a,
   SET OF STRING set1,
   SET OF INTEGER4 list1,
   LINKCOUNTED DICTIONARY(maprec) map1{linkcounted};  // maprec is a RECORD defined earlier in the full example file
END;

init := DATASET([{'name1', 1, true, 1.2, 3.4, D'aa55aa55', 1234567.89, U'Straße', U'Straße','Ascii',['one','two','two','three'],[5,4,4,3],[{'a'=>'apple'},{'b'=>'banana'}]},
                 {'name2', 2, false, 5.6, 7.8, D'00', -1234567.89, U'là', U'là','Ascii', [],[],[]}], childrec);

load(dataset(childrec) values) := EMBED(cassandra : user('boneill'),keyspace('test'),batch('unlogged'))
  INSERT INTO tbl1 (name, value, boolval, r8, r4,d,ddd,u1,u2,a,set1,list1,map1) values (?,?,?,?,?,?,?,?,?,?,?,?,?);
ENDEMBED;


In this example, we define childrec as a RECORD with a set of fields. We then create a DATASET of type childrec. Then we define a method that takes a dataset of type childrec and runs the Cassandra insert command for each of the records in the dataset.

Start up Cassandra locally.  (Download Cassandra, unzip it, then run bin/cassandra -f to keep it in the foreground.)

Once Cassandra is up, simply run the ECL like you did the hello program.

ecl run cassandra-simple.ecl --target roxie --server=localhost:8010

You can then go over to cqlsh and validate that all the data made it back into Cassandra:

➜  cassandra  bin/cqlsh
Connected to Test Cluster at localhost:9160.
[cqlsh 4.1.1 | Cassandra 2.0.7 | CQL spec 3.1.1 | Thrift protocol 19.39.0]
Use HELP for help.
cqlsh> select * from test.tbl1 limit 5;

 name      | a | boolval | d              | ddd  | list1 | map1 | r4     | r8     | set1 | u1     | u2        | value
-----------+---+---------+----------------+------+-------+------+--------+--------+------+--------+-----------+-------
  name1575 |   |    True | 0x393939393939 | 9.99 |  null | null | 1576.6 |   1575 | null | 9999 ß | 9999 ßßßß |  1575
  name3859 |   |    True | 0x393939393939 | 9.99 |  null | null | 3862.9 |   3859 | null | 9999 ß | 9999 ßßßß |  3859
 name11043 |   |    True | 0x393939393939 | 9.99 |  null | null |  11054 |  11043 | null | 9999 ß | 9999 ßßßß | 11043
  name3215 |   |    True | 0x393939393939 | 9.99 |  null | null | 3218.2 |   3215 | null | 9999 ß | 9999 ßßßß |  3215
  name7608 |   |   False | 0x393939393939 | 9.99 |  null | null | 7615.6 | 7608.1 | null | 9999 ß | 9999 ßßßß |  7608

OK -- that should give you a little taste of ECL and HPCC.  It is a powerful platform.
As always, let me know if you run into any trouble.