When running Hadoop jobs against Cassandra, you will want to be careful about a few parameters.
Specifically, pay special attention to vNodes, Splits and Page Sizes.
vNodes were introduced in Cassandra 1.2. They allow a host to own multiple portions of the token range. This distributes data more evenly, which means the burden of a node rebuild is shared across a number of nodes instead of falling on just one. (way cool feature)
BUT, vNodes made Hadoop jobs a little trickier...
The first time I went to run a Hadoop job using CQL, I was running a local Hadoop cluster (only one container) against a Cassandra with lots of vNodes (~250). I was running a simple job (probably a flavor of word count) over a few thousand records. I expected the job to complete in seconds. To my dismay, it was taking *forever*.
When you run a Hadoop job against Cassandra, the minimum number of splits is the number of vNodes, because every token range becomes at least one split.
In Hadoop, each split is consumed by a single container, and a container runs against only one split at a time. Effectively, a Hadoop container is a JVM, which is spun-up to process a specific split of data.
In any Hadoop cluster, there is a maximum number of available containers. In my case, I had one available container, which meant the splits had to be processed sequentially with spin-up and tear-down overhead for each split. boooooo...
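To make the cost concrete, here is a minimal sketch of the arithmetic, assuming each container handles exactly one split at a time as described above. The class and method names are hypothetical helpers, not part of Hadoop or Cassandra.

```java
// Hypothetical helper (not a Hadoop or Cassandra API): estimates how many
// sequential passes a job needs when each container handles one split at a time.
final class SplitWaves {

    // Ceiling of splits / containers.
    static long passes(long splits, long containers) {
        if (splits < 0 || containers <= 0) {
            throw new IllegalArgumentException("splits must be >= 0 and containers must be > 0");
        }
        return (splits + containers - 1) / containers;
    }

    public static void main(String[] args) {
        // Roughly my situation: ~250 vNodes (so at least 250 splits), one container.
        System.out.println("passes with 1 container:   " + passes(250, 1));
        // The same job with 50 containers available.
        System.out.println("passes with 50 containers: " + passes(250, 50));
    }
}
```

With one container, every split is its own pass, so the spin-up and tear-down cost is paid once per vNode at a minimum.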
So, before you do anything, check to see how many vNodes you are running. This is controlled by your Cassandra yaml file, via the num_tokens parameter. Below is an excerpt from cassandra.yaml.
# This defines the number of tokens randomly assigned to this node on the ring
# The more tokens, relative to other nodes, the larger the proportion of data
# that this node will store. You probably want all nodes to have the same number
# of tokens assuming they have equal hardware capability.
...
# If you already have a cluster with 1 token per node, and wish to migrate to
# multiple tokens per node, see http://wiki.apache.org/cassandra/Operations
num_tokens: 25
To see how many vNodes you have on an existing cluster, you can use nodetool:
➜ cassandra bin/nodetool ring

Datacenter: datacenter1
==========
Address    Rack   Status  State   Load      Owns     Token
                                                     8743874685407455894
127.0.0.1  rack1  Up      Normal  41.27 KB  100.00%  -8851282698028303387
127.0.0.1  rack1  Up      Normal  41.27 KB  100.00%  -8032384077572986919
127.0.0.1  rack1  Up      Normal  41.27 KB  100.00%  -7279609033604637725
...
127.0.0.1  rack1  Up      Normal  41.27 KB  100.00%  8107926707118065773
127.0.0.1  rack1  Up      Normal  41.27 KB  100.00%  8743874685407455894
You will notice that the number of token ranges equals the number you set in the configuration file(s):
➜ cassandra bin/nodetool ring | grep rack1 | wc -l
25
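On a ring with more than one host, a per-address count is more useful than a single total. The following is a rough sketch that does the same counting in Java over captured `nodetool ring` output. It assumes the column layout shown above and IPv4 addresses in the first column; `RingTokenCounter` is a hypothetical name, not something that ships with Cassandra.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical helper: counts token lines per address in `nodetool ring` output.
// Assumes each token row starts with an IPv4 address, as in the listing above.
final class RingTokenCounter {

    private static final Pattern IPV4 = Pattern.compile("\\d{1,3}(\\.\\d{1,3}){3}");

    static Map<String, Integer> tokensPerAddress(List<String> ringLines) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String line : ringLines) {
            String[] fields = line.trim().split("\\s+");
            // Skip headers, separators, and the lone wrap-around token line.
            if (fields.length > 1 && IPV4.matcher(fields[0]).matches()) {
                counts.merge(fields[0], 1, Integer::sum);
            }
        }
        return counts;
    }
}
```

Feeding it the lines of the listing above would give one entry for 127.0.0.1 whose count matches num_tokens.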
For more information on vNodes and configuration, please see Patrick's most excellent deck:
http://www.slideshare.net/patrickmcfadin/cassandra-virtual-node-talk
SIDENOTE:
Because the minimum number of splits is the number of vNodes, you may want to run a separate Cassandra ring, with a smaller number of vNodes, that you can use for analytics.
OK, so what does this mean and how do you fix it?
Well, if you have a small number of containers and a high number of vNodes, split overhead may become a problem (as it was in my local Hadoop cluster). But perhaps more importantly, if you run against a large number of records, and use the default split size, you will end up with an insane number of splits. For example, we recently ran against a table that had 5 billion records. The default split size is 64K. [1] Doing the math:
# of rows / split size = # of splits
5B / 64K = 78,125 splits!
That is an awful lot of splits, and you are going to take an overhead spin-up/tear-down penalty for each of those splits. Even assuming only 5 seconds of overhead for each split, that is a ton of time:
78125 * 5 seconds / 60 / 60 = ~108 hours of overhead (NOT GOOD!)
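The same back-of-the-envelope math as a small Java sketch, in case you want to plug in your own numbers. It treats 64K as 64,000 rows to match the arithmetic above and uses the 5 seconds of per-split overhead assumed in this post; both are illustrative figures, and `SplitOverhead` is a hypothetical helper rather than any Hadoop or Cassandra API.

```java
// Hypothetical helper: reproduces the split-count and overhead arithmetic.
final class SplitOverhead {

    // Number of splits for a table, rounding up so a partial split still counts.
    static long splitCount(long rows, long splitSize) {
        if (rows < 0 || splitSize <= 0) {
            throw new IllegalArgumentException("rows must be >= 0 and splitSize must be > 0");
        }
        return (rows + splitSize - 1) / splitSize;
    }

    // Total spin-up/tear-down overhead in hours, summed over all splits.
    static double overheadHours(long splits, double secondsPerSplit) {
        return splits * secondsPerSplit / 3600.0;
    }

    public static void main(String[] args) {
        long splits = splitCount(5_000_000_000L, 64_000L);
        System.out.println("splits: " + splits);
        System.out.println("overhead hours: " + overheadHours(splits, 5.0));
    }
}
```

Note that this is the total overhead summed over all splits; with many containers it is paid in parallel, but it is still wasted work.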
Optimally, the number of splits would equal the number of containers available. That typically won't be possible, but we can get as close as possible by setting the split size on the Hadoop job with the following line:
org.apache.cassandra.hadoop.ConfigHelper.setInputSplitSize(job.getConfiguration(), 10000000); // splitSize = 10M
Let's redo the math with this value:
5B / 10M = 500 splits (MUCH BETTER!)
If we are running on a Hadoop cluster with 250 available containers, we'll finish in two passes on the splits, each container processing two splits.
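You can also work backwards from the cluster to the split size. Here is a minimal sketch, assuming you know roughly how many rows the table holds and how many passes you are willing to make; `SplitSizing` is a hypothetical helper of my own. Keep in mind that the number of splits can never drop below the number of vNodes, and the real split count may differ somewhat from this estimate.

```java
// Hypothetical helper: picks a split size that yields about containers * passes splits.
final class SplitSizing {

    static long splitSizeFor(long rows, long containers, long passes) {
        if (rows < 0 || containers <= 0 || passes <= 0) {
            throw new IllegalArgumentException("rows must be >= 0; containers and passes must be > 0");
        }
        long targetSplits = containers * passes;
        // Round up so the split count does not exceed the target.
        return Math.max(1, (rows + targetSplits - 1) / targetSplits);
    }

    // The split size is passed to setInputSplitSize as an int, so fail loudly on overflow.
    static int asIntArgument(long splitSize) {
        return Math.toIntExact(splitSize);
    }

    public static void main(String[] args) {
        long size = splitSizeFor(5_000_000_000L, 250, 2);
        System.out.println("split size: " + asIntArgument(size));
    }
}
```

For 5 billion rows, 250 containers and two passes, this lands on the same 10M split size used above.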
With the split sizes worked out, you will want to mind the page size parameter. The page size parameter tells the CQL driver how many rows you want back at a time. It is the equivalent of the LIMIT clause on the CQL statement. For this one, you will want to pull back as many rows as possible without blowing up memory (which I've done ;). The default page size is 1000. To configure the page size, use the following line:
CqlConfigHelper.setInputCQLPageRowSize(job.getConfiguration(), Integer.toString(100));
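How big is "as many as possible"? One rough way to pick a starting value is to divide a memory budget by an average row size. This is purely my own rule of thumb, not something Cassandra or the driver computes for you; the budget, the row size, and the `PageSizing` helper are all assumptions for illustration, so measure before trusting it.

```java
// Hypothetical rule of thumb: rows per page = memory budget / average row size.
final class PageSizing {

    static int pageRowSize(long memoryBudgetBytes, long avgRowBytes) {
        if (memoryBudgetBytes <= 0 || avgRowBytes <= 0) {
            throw new IllegalArgumentException("budget and row size must be > 0");
        }
        long rows = memoryBudgetBytes / avgRowBytes;
        // Always fetch at least one row, and stay within int range.
        return (int) Math.max(1L, Math.min(rows, Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        // Example: 10 MB budget per page, rows averaging about 2 KB.
        int pageSize = pageRowSize(10_000_000L, 2_000L);
        // The page size setter shown above takes a String, hence the conversion.
        System.out.println("page size argument: " + Integer.toString(pageSize));
    }
}
```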
Hopefully this saves people some time. Happy tuning. (shout out to gnana ;)