Tuesday, March 25, 2014

Shark on Cassandra (w/ Cash) : Interrogating cached data from C* using HiveQL

As promised, here is part deux of the Spark/Shark on Cassandra series.

In the previous post, we got up and running with Spark on Cassandra.   Spark gave us a way to report off of data stored in Cassandra.  It was an improvement over MR/Hadoop, but we were still left articulating our operations in Java code.  Shark provides an integration layer between Hive and Spark, which allows us to articulate operations in HiveQL at the shark prompt.  This enables a non-developer community to explore and analyze data in Cassandra.

Setup a Spark Cluster

Before we jump to Shark, let's get a Spark cluster going.  To start a spark cluster, first start the master server with:
$SPARK_HOME> bin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /Users/bone/tools/spark-0.8.1-incubating-bin-hadoop2/bin/../logs/spark-bone-org.apache.spark.deploy.master.Master-1-zen.local.out

To ensure the master started properly, tail the logs:
14/03/25 19:54:42 INFO ActorSystemImpl: RemoteServerStarted@akka://sparkMaster@zen.local:7077
14/03/25 19:54:42 INFO Master: Starting Spark master at spark://zen.local:7077
14/03/25 19:54:42 INFO MasterWebUI: Started Master web UI at

In the log output, you will see the master Spark URL (e.g. spark://zen.local:7077).  You will also see the URL for the web UI.  Cut and paste that URL into a browser and have a look at the UI. You'll notice that no workers are available.  So, let's start one:
$SPARK_HOME> bin/start-slaves.sh
localhost: starting org.apache.spark.deploy.worker.Worker, logging to /Users/bone/tools/spark-0.8.1-incubating-bin-hadoop2/bin/../logs/spark-bone-org.apache.spark.deploy.worker.Worker-1-zen.local.out

Again, tail the logs.  You should see the worker successfully register with the master.  You should also see the worker show up in the web UI.  And now, we are ready to get moving with Shark.

Setup Shark

First, download Shark and Hive.  I used shark-0.8.1-bin-hadoop1.tgz and hive-0.9.0-bin.tgz. Untar each of those.  In the $SHARK_HOME/conf directory, copy the shark-env.sh.template file to shark-env.sh and edit the file.  Ensure the settings are configured properly.  For example:
export SPARK_MEM=4g
export SCALA_HOME="/Users/bone/tools/scala-2.9.3"
export HIVE_HOME="/Users/bone/tools/hive-0.9.0-bin"
export SPARK_HOME="/Users/bone/tools/spark-0.8.1-incubating-bin-hadoop2"
export MASTER="spark://zen.local:7077"
Note that the MASTER variable is set to the master URL from the spark cluster.  Make sure that the HADOOP_HOME variable is *NOT* set.  Shark can operate directly on Spark. (you need not have Hadoop deployed)

As with Spark, we are going to use an integration layer developed by TupleJump.   The integration layer is called Cash:

To get started, clone the cash repo and follow the instructions here.   In summary, build the project and copy target/*.jar and target/dependency/cassandra-*.jar into $HIVE_HOME/lib.

Play with Shark

Fun time.  Start shark with the following:
bone@zen:~/tools/shark-> bin/shark

Note that there are two other versions of this command (bin/shark-withinfo and bin/shark-withdebug).  Both are *incredibly* useful if you run into trouble. 

Once you see the shark prompt, you should be able to refresh the Spark Web UI and see Shark under Running Applications.  To get started, first create a database.  Using the schema from our previous example/post, let's call our database "northpole":
shark> create database northpole;
Time taken: 0.264 seconds

Next, you'll want to create an external table that maps to your cassandra table with:
shark> CREATE EXTERNAL TABLE northpole.children(child_id string, country string, first_name string, last_name string, state string, zip string)
     >    STORED BY 'org.apache.hadoop.hive.cassandra.cql.CqlStorageHandler'
     >    WITH SERDEPROPERTIES ("cql.primarykey"="child_id", "comment"="check", "read_repair_chance"="0.1", "cassandra.host"="localhost", "cassandra.port"="9160", "dclocal_read_repair_chance"="0.0", "gc_grace_seconds"="864000", "bloom_filter_fp_chance"="0.1", "cassandra.ks.repfactor"="1", "compaction"="{'class' : 'SizeTieredCompactionStrategy'}", "replicate_on_write"="false", "caching"="all");
Time taken: 0.419 seconds

At this point, you are free to execute some HiveQL queries!  Let's do a simple select:
shark> select * from northpole.children;
977.668: [Full GC 672003K->30415K(4054528K), 0.2639420 secs]
michael.myers USA Michael Myers PA 18964
bart.simpson USA Bart Simpson CA 94111
johny.b.good USA Johny Good CA 94333
owen.oneill IRL Owen O'Neill D EI33
richie.rich USA Richie Rich CA 94333
collin.oneill IRL Collin O'Neill D EI33
dennis.menace USA Dennis Menace CA 94222
Time taken: 13.251 seconds


How cool is that? Now, let's create a cached table!
shark> CREATE TABLE child_cache TBLPROPERTIES ("shark.cache" = "true") AS SELECT * FROM northpole.children;
Moving data to: file:/user/hive/warehouse/child_cache
Time taken: 10.294 seconds

And finally, let's try that select again, this time against the cached table:
shark> select * from child_cache;
owen.oneill IRL Owen O'Neill D EI33
bart.simpson USA Bart Simpson CA 94111
michael.myers USA Michael Myers PA 18964
dennis.menace USA Dennis Menace CA 94222
richie.rich USA Richie Rich CA 94333
johny.b.good USA Johny Good CA 94333
collin.oneill IRL Collin O'Neill D EI33
Time taken: 6.511 seconds


Alrighty, that should get you started.
Again -- kudos to TupleJump for all their work on the Spark/Shark -> C* bridge.

No comments: