Monday, March 14, 2016

PySpark on Amazon EMR w/ Kinesis

Spark is fantastic.  And its streaming framework has proven to be a perfect fit, functioning as the real-time leg of a lambda architecture.

In our initial use of Spark, we decided to go with Java, since Spark runs natively on the JVM (and sorry Scala, still not a fan).  That worked well, but we knew we would have to support other languages.  These days Python is the lingua franca of the data science community.  And since we want to deploy ever more sophisticated analytics to our real-time pipe, we want to include support for Python.

Enter PySpark.  This blog should get you up and running with PySpark on EMR, connected to Kinesis.

Get an EMR Cluster up and running!

First, you need an EMR cluster.   At Monetate, we treat infrastructure as code and use CloudFormation extensively (via troposphere) to accomplish that.  However, to my dismay I found that EMR support in troposphere is still WIP.

No big deal, I thought.  I can work from pure JSON against CloudFormation.  I looked up the docs for EMR resources in CloudFormation and was able to WIP up (see what I did there?) the following JSON:

{
  "Resources": {
      "Type" : "AWS::EMR::Cluster",
      "Properties" : {
        "ReleaseLabel" : "emr-4.3.0",
        "Instances" : {
          "CoreInstanceGroup" : {
            "InstanceType" : "m4.large",
            "InstanceCount" : "2"
          },
          "MasterInstanceGroup" : {
            "InstanceType" : "m4.large",
            "InstanceCount" : "1"
          }
        },
        "JobFlowRole" : "DevPolicy1-Role-1FOOFV2IV",
        "Name" : "MyEmr1",
        "ServiceRole" : "DevPolicy1-Role-1FOOFV2IV",
        "VisibleToAllUsers" : true
      }
  }
}

Make sure you specify 'ReleaseLabel' even though the CloudFormation (CF) documentation says that it is optional.  If you don't specify it, CF assumes an outdated amiVersion (1.0.0), and the cluster will fail to start.
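If you would rather generate that template than hand-edit it, here is a minimal sketch in plain Python (no troposphere).  One assumption to flag loudly: CloudFormation normally expects each resource to sit under a logical ID inside "Resources", and the name "EmrCluster" below is hypothetical, not something from our template.  The function name build_emr_template is likewise made up for illustration.

```python
import json


def instance_group(count):
    """Instance group settings, using the instance type from the template above."""
    return {"InstanceType": "m4.large", "InstanceCount": count}


def build_emr_template(logical_id, release_label, role_name, cluster_name):
    """Return a CloudFormation template dict holding one AWS::EMR::Cluster."""
    return {
        "Resources": {
            # logical_id is a hypothetical name chosen by the caller.
            logical_id: {
                "Type": "AWS::EMR::Cluster",
                "Properties": {
                    # Set ReleaseLabel explicitly, even though it is documented as optional.
                    "ReleaseLabel": release_label,
                    "Instances": {
                        "CoreInstanceGroup": instance_group("2"),
                        "MasterInstanceGroup": instance_group("1"),
                    },
                    "JobFlowRole": role_name,
                    "Name": cluster_name,
                    "ServiceRole": role_name,
                    "VisibleToAllUsers": True,
                },
            }
        }
    }


template = build_emr_template(
    "EmrCluster", "emr-4.3.0", "DevPolicy1-Role-1FOOFV2IV", "MyEmr1"
)
template_json = json.dumps(template, indent=2)
```

Writing template_json to a file gives you something you can hand to CF, with the same caveats about roles described next.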

So, I slung the above JSON at CF and it sorta worked, but balked on the security groups.  After much digging, I uncovered the following *important* note in the EMR documentation regarding custom IAM roles.

The IAM role name and the instance profile name must match exactly when you use either the Amazon EMR console or CLI.
In some situations, you might need to work with an IAM role whose associated instance profile does not have the same name as the role. This can occur if you use AWS CloudFormation to manage IAM roles for you, because AWS CloudFormation adds a suffix to the role name to create the instance profile name.  In this case, you can use the Amazon EMR API or CLI to specify the instance profile name. 

Ugh.  This was exactly our case.  We use CF for our roles, and thus the instance profile name and the IAM role name do NOT match exactly, which would have been fine if we were using the CLI or the EMR API.  But again, we wanted to use CF.

So -- in the end -- PUNT on CF for EMR!  Back to the good old CLI.  The following AWS CLI voodoo did the trick:

aws emr create-cluster \
  --release-label emr-4.3.0 \
  --instance-type m3.xlarge \
  --instance-count 3 \
  --no-auto-terminate \
  --service-role DevPolicy1-Role-1FOOFV2IV \
  --applications Name=Spark \
  --name MyEmr1 \
  --ec2-attributes InstanceProfile=DevPolicy1-InstanceProfile-1FOOFV2IV,KeyName=dev-foo \
  --log-uri s3://log-foo-dev/spark/ \
  --enable-debugging

Notice that the CLI lets you specify ec2-attributes, including the InstanceProfile, which is different from the role if you use CloudFormation to construct your roles.
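If you end up scripting cluster creation, it helps to keep the role and the instance profile as two separate inputs so they can't be confused.  Here is a rough sketch that only assembles the argument list for the command above; the function name create_cluster_command is hypothetical, and actually running it assumes the AWS CLI is installed and configured.

```python
def create_cluster_command(service_role, instance_profile, key_name, log_uri,
                           name="MyEmr1", release_label="emr-4.3.0"):
    """Build the argv list for `aws emr create-cluster` as used in this post."""
    # The instance profile name is passed separately from the service role,
    # because CloudFormation-managed roles get a differently named profile.
    ec2_attributes = "InstanceProfile={},KeyName={}".format(instance_profile, key_name)
    return [
        "aws", "emr", "create-cluster",
        "--release-label", release_label,
        "--instance-type", "m3.xlarge",
        "--instance-count", "3",
        "--no-auto-terminate",
        "--service-role", service_role,
        "--applications", "Name=Spark",
        "--name", name,
        "--ec2-attributes", ec2_attributes,
        "--log-uri", log_uri,
        "--enable-debugging",
    ]


command = create_cluster_command(
    service_role="DevPolicy1-Role-1FOOFV2IV",
    instance_profile="DevPolicy1-InstanceProfile-1FOOFV2IV",
    key_name="dev-foo",
    log_uri="s3://log-foo-dev/spark/",
)
# To launch for real (needs the AWS CLI and credentials):
#   import subprocess; subprocess.check_call(command)
```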

With that voodoo, you should see your EMR cluster in the AWS console.  Take note of the cluster id; you will need it in later steps.

Sling some code...

First, you will want to be able to get onto your EMR master node.  In the AWS CLI command above, we specified a KeyName.  That is the ssh key you will use to connect to the box.  You can find the host name in the AWS Console by clicking on your EMR cluster.  Look under "Master public DNS".  If you click "SSH" next to that, it will give you specific instructions.

Hints: Be sure that the -i param points to your local private key, and make sure that you are connecting as the 'hadoop' user. 

I was able to connect with the following:

ssh -i ~/.ssh/dev-foo.pem

To test out the configuration of your machine, give the PySpark console a try by typing:

[hadoop@ip-10-171-73-185 ~]$ pyspark
Python 2.7.10 (default, Dec  8 2015, 18:25:23)
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2

Note that EMR is running version 2.7.10 of Python!!!  The example code from Spark assumes version 3.  We'll need to make a couple of edits to get that sample code to work on our EMR instance.

Now that we are connected and have a working PySpark shell, we can use the same credentials to transfer code to our cluster.  Specifically, let's transfer the Spark Kinesis example code to our EMR cluster.

First, download that sample code to your local machine.  Next, let's edit the code to make it 2.7 friendly.  Specifically, add the following line to the top of the file:

from __future__ import print_function

This will import the print function from Python 3 and make it available in Python 2.  With that edit in place, transfer the file to the master node:
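To see what that import buys you, here is a tiny standalone sketch (not part of the Spark sample; the report function is made up for illustration).  Keyword arguments like sep and file only work when print is a function, so this runs the same way under 2.7 and 3.

```python
from __future__ import print_function  # must come before any other statement

import sys


def report(word, count, stream=sys.stdout):
    """Write one tab-separated word/count pair using print as a function."""
    # Without the __future__ import, Python 2.7 rejects this line as a syntax error.
    print(word, count, sep="\t", file=stream)


report("hello", 1)
```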

scp -i ~/.ssh/dev-foo.pem

That will place the python code up on the master node in the temp directory.

Launch a job...

Finally, we need to launch the job.  Since we are going to use Kinesis as our data source, let's first create a stream for our job:

aws kinesis create-stream --stream-name foo --shard-count 1

In a few minutes, you should see a stream provisioned in the AWS console, and you are ready to connect a job to it.
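If you would rather not stare at the console, you can poll for the stream instead.  This is a sketch that assumes boto3 (not used elsewhere in this post) and its Kinesis describe_stream call; the function name wait_for_stream and the polling defaults are my own choices.

```python
import time


def wait_for_stream(client, stream_name, poll_seconds=5, max_attempts=60):
    """Poll a Kinesis client until the stream reports ACTIVE.

    `client` is expected to behave like boto3.client("kinesis").
    Returns True once the stream is ACTIVE, False if we give up first.
    """
    for _ in range(max_attempts):
        description = client.describe_stream(StreamName=stream_name)
        if description["StreamDescription"]["StreamStatus"] == "ACTIVE":
            return True
        time.sleep(poll_seconds)
    return False


# Usage, assuming boto3 is installed and credentials are configured:
#   import boto3
#   wait_for_stream(boto3.client("kinesis", region_name="us-east-1"), "foo")
```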

To connect our job to that stream, we must launch the job.  In EMR terminology, we launch a job by "adding a step".  Let's stick with the CLI for that.  Again, you need a special incantation:

aws emr add-steps --cluster-id j-8S4BHCR3UV7G --steps Name=Spark,Jar=s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/usr/bin/spark-submit,--packages,org.apache.spark:spark-streaming-kinesis-asl-assembly_2.10:1.6.0,--deploy-mode,client,/tmp/,myapp,foo,,us-east-1],ActionOnFailure=CONTINUE

The cluster-id should match the id you captured when you created the cluster.  You can also find it in the AWS console.  The parameters for the Python script are passed along in Args.  Specifically, we supply four params: "myapp", "foo", "https://kinesis...", and "us-east-1".  These are the Kinesis app name, the stream name, the Kinesis endpoint, and the region, respectively.
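On the receiving end, the script sees those four params as positional arguments after its own name.  Here is a sketch of how a script might unpack them; I have not copied this from the Spark sample, and the names JobArgs and parse_job_args are hypothetical.  The script name and endpoint URL in the usage line are assumptions for illustration only.

```python
from collections import namedtuple

JobArgs = namedtuple("JobArgs", ["app_name", "stream_name", "endpoint_url", "region"])


def parse_job_args(argv):
    """Unpack the four positional parameters that follow the script name."""
    if len(argv) != 5:
        raise ValueError(
            "usage: <script> <app-name> <stream-name> <endpoint-url> <region>"
        )
    return JobArgs(*argv[1:])


# In a real script you would pass sys.argv; the values here are illustrative.
job = parse_job_args([
    "wordcount.py",                             # hypothetical script name
    "myapp",
    "foo",
    "https://kinesis.us-east-1.amazonaws.com",  # assumed endpoint for us-east-1
    "us-east-1",
])
```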

In a few minutes, you should now see a step running in the AWS console.  To examine the stdout and stderr for that task, ssh to the master machine again and look in /var/log/hadoop/steps.  You should see a directory for the new step, and within that directory you will find two files: stderr and stdout. Happily tail those logs with something like the following:

tail -f /var/log/hadoop/steps/{step-id}/std*

In the stdout stream, you should see something like this:

Time: 2016-03-14 20:49:12
Time: 2016-03-14 20:49:13

Spew events...

With our job running out on EMR, the only thing left is to spew events at the stream.   You can use our fancy little utility called Koupler to put data on your stream with the following command line:

~/git/koupler/build/distributions/koupler-0.2.5-SNAPSHOT> printf "hello\nworld\n" | ./ -pipe -streamName foo

Look back over to your tail and you should see:

Time: 2016-03-14 20:56:07
(u'world', 1)
(u'hello', 1)
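Judging by that output, the sample counts words within each batch.  As a plain-Python analogue (this is not Spark code, just a sketch of the same split-then-count idea, with a made-up function name):

```python
from collections import Counter


def count_words(lines):
    """Split each line on spaces and count occurrences of every word."""
    words = (word for line in lines for word in line.split(" "))
    # Sorted only to make the result deterministic; Spark makes no such promise.
    return sorted(Counter(words).items())


batch = count_words(["hello", "world"])
# batch == [("hello", 1), ("world", 1)]
```

In the real job, Spark does the same thing across the cluster for each batch interval, which is why every batch prints its own Time header.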


And there you have it!  There were a couple bumps in the road, but not too bad.  You are now able to run python, at scale, in the cloud, against real-time events.  Enjoy.
