tag:blogger.com,1999:blog-68720233966476815972024-03-05T03:45:59.992-08:00Brian ONeill's Random ThoughtsBrian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.comBlogger185125tag:blogger.com,1999:blog-6872023396647681597.post-48598469574021230112018-09-19T16:41:00.002-07:002018-09-19T16:41:31.993-07:00CORS Headers, Preflight & Performance (ie. How to get rid of the OPTIONS calls)<br />
Monetate personalizes hundreds of millions of page views each day. To do that, we take hundreds of data points into account (e.g. weather, geolocation, inventory information, population density, past behavior, etc.), make a decision, and then personalize the page (e.g. content, product recommendations, etc.). <br />
<br />
But this means that our client's web sites don't render until <i><b>after </b></i>we've made our decisions, which means we need to be fast.<br />
<br />
How fast you ask? 12 milliseconds per decision fast.<br />
And in that kind of environment, every millisecond counts. <br />
<br />
To personalize each page, the browser reaches out to Monetate servers for decisions/actions prior to render. The web page contacts Monetate servers via a cross-origin request, which means <a href="https://developer.mozilla.org/en-US/docs/Web/HTTP/CORS">Cross-Origin Resource Sharing (CORS)</a> comes into play. The CORS interaction comes with a "preflight" request, which basically amounts to the client asking the server whether it can handle a cross-origin request. If the server replies in the affirmative, the client sends the actual request. Browsers issue preflight requests for potentially "dangerous" requests. <br />
<br />
For more information on the motivation behind CORS, see <a href="https://stackoverflow.com/questions/15381105/cors-what-is-the-motivation-behind-introducing-preflight-requests">this Stack Overflow answer</a>. <br />
<br />
Note specifically:<br />
"New servers that are written with an awareness of CORS. According to standard security practices, the server has to protect its resources in the face of any incoming request -- servers can't trust clients to not do malicious things. This scenario doesn't benefit from the preflight mechanism: the preflight mechanism brings no additional security to a server that has properly protected its resources."<br />
<br />
This is our scenario. Our servers are not only CORS-aware, they are purpose-built to handle cross-origin mutating requests. So in our case the preflight request is pure overhead with no benefit, costing tens if not hundreds of milliseconds!<br />
<br />
To eliminate that pesky preflight request, we need to convince the browser that this is not a "dangerous" request. There are a few ways to do that...<br />
<br />
Some people change their API, see: <a href="https://gooroo.io/GoorooTHINK/Article/16408/Two-Strategies-for-Crossing-Origins-with-Performance-in-Mind/19880#.W5qWJpNKifg" style="font-style: italic;">Two Strategies for Crossing Origins with Performance in Mind</a>.<br />
Some people use proxies, see: <a href="https://medium.com/@praveen.beatle/avoiding-pre-flight-options-calls-on-cors-requests-baba9692c21a"><i>Avoiding pre-flight OPTIONS calls on CORS requests</i></a><br />
Some people try lots of things, see: <a href="https://damon.ghost.io/killing-cors-preflight-requests-on-a-react-spa/"><i>Killing CORS Preflight Requests on a React SPA</i></a><br />
<br />
Of those options, I don't like changing the API, because our API is consumed from lots of different channels (mobile apps, etc.). Maintaining and developing two different APIs just to accommodate CORS seems like a really bad decision. Meanwhile, proxies would just introduce latency without any added value.<br />
<br />
Enter the content-type header...<br />
<br />
In our scenario, the browser flags our request as "dangerous" because it carries a JSON body, which the browser infers from the content-type header being set to application/json. Changing that header to text/plain (one of the content types allowed on a "simple" cross-origin request, along with application/x-www-form-urlencoded and multipart/form-data) lets the browser send the request with no preflight!<br />
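On the server side, the only real change is that you can no longer rely on content-type-driven body parsing; you parse the text yourself. A minimal Python sketch (the payload and field names here are made up for illustration):

```python
import json

# The body still arrives intact; only the content-type header changed.
# Since frameworks won't auto-parse text/plain, treat the raw text as JSON.
raw_body = '{"channel": "web", "events": ["pageview"]}'  # hypothetical payload
decision_input = json.loads(raw_body)
print(decision_input["channel"])
```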
<br />
So, we did that. <br />
Boom... instant performance improvement.<br />
We shaved almost a hundred milliseconds off of our request times!<br />
<br />
Now, I would have preferred a standard way to tell the browser that it can "trust this server". But if such a mechanism existed, a nefarious individual trying to issue an errant request against an unsuspecting server could use that same mechanism to bypass the safety check. Oh well.<br />
<br />
At this point, eliminating the preflight request seems to be best practice in performance-sensitive scenarios where the added check provides no benefit, and changing the content-type seems like the least intrusive way to do it. For you purists out there, you'll just need to squint a bit and keep telling yourself that JSON is a form of text/plain. ;)<br />
<br />
Happy hacking.<br />
<br />
<br />
<br />
<br />Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-29673564639331887872018-09-19T14:15:00.000-07:002018-09-19T14:15:13.558-07:00Charting PagerDuty Incidents over Time (using pandas)<br />
We churn out charts for board meetings to show the health of our system (uptime, etc.). Historically, we did that once per quarter, manually. Recently, I endeavored to create a live dashboard for the same information, starting with production incidents over time.<br />
<br />
We use PagerDuty to alert on-call staff. Each incident is stored in PagerDuty, which is queryable via the <a href="https://v2.developer.pagerduty.com/v2/page/api-reference#!/API_Reference/get_api_reference">PagerDuty API</a>. From there, it is easy enough to transform that JSON into a matplotlib chart using pandas:<br />
<br />
First, we grab the data:<br />
<br />
<pre style="background-color: #eeeeee;">from datetime import datetime, timedelta
import requests
%matplotlib inline

api_url = "https://api.pagerduty.com/incidents"
headers = {
    'Accept': 'application/vnd.pagerduty+json;version=2',
    'Authorization': 'Token token=YOUR_TOKEN'
}

today = datetime.today()
until = today.replace(day=1)

def get_month(since, until):
    # Walk the month one week at a time; assumes fewer than 100
    # incidents per week (the API's per-request limit).
    current_date = since
    total_incidents = 0
    while current_date < until:
        next_date = current_date + timedelta(days=7)
        if next_date > until:
            next_date = until
        url = api_url + "?since={}&until={}&time_zone=UTC&limit=100".format(
            current_date.date(), next_date.date())
        response = requests.get(url, headers=headers)
        incidents = response.json()['incidents']
        total_incidents += len(incidents)
        current_date = next_date
    return total_incidents

# Look back over twelve months
incidents_per_month = {}
for _ in range(12):
    since = (until - timedelta(days=1)).replace(day=1)
    num_incidents = get_month(since, until)
    incidents_per_month[str(since.date())] = num_incidents
    print("{} - {} [{}]".format(since.date(), until.date(), num_incidents))
    until = since
</pre>
<br />
At this point,<br />
<br />
<pre style="background-color: #eeeeee;">incidents_per_month = { "2018-07-01": 13, "2018-08-01": 5, ... }
</pre>
<br />
From there, it is just a matter of plotting the values using pandas:<br />
<br />
<pre style="background-color: #eeeeee;">import pandas as pd
import matplotlib
import seaborn as sns  # optional; used here only for its plot styling

data = pd.DataFrame(list(incidents_per_month.items()), columns=['Month', 'Incidents'])
data = data.sort_values(by=["Month"], ascending=True)
data.plot(kind='bar', x='Month', y='Incidents', color="lightblue")
</pre>
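If you want to convince yourself that sorting the month strings orders them correctly, the same transform works on a hardcoded sample (the dates and counts here are made up):

```python
import pandas as pd

# Made-up sample data, to sanity-check the string-based month sort.
sample = {"2018-07-01": 13, "2018-08-01": 5, "2018-06-01": 9}
df = pd.DataFrame(list(sample.items()), columns=["Month", "Incidents"])
df = df.sort_values(by=["Month"])  # ISO-8601 date strings sort chronologically
print(df["Month"].tolist())
```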
<br />
And voila, there you have it:<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgfsppRmJHkSdU0PUzNYV46yZeZeP5Sotn4o4e3hW2b0L54feIsRL_SNyPNknJgJZDDGGl3xwT-28_q1L33L0pHBFabK6yaRasIuCTfFGyY0UE0FGfZhm8JthWHQCfnZaynuc_2i5Xq_nI/s1600/Screen+Shot+2018-09-19+at+5.13.46+PM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" data-original-height="536" data-original-width="644" height="266" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEgfsppRmJHkSdU0PUzNYV46yZeZeP5Sotn4o4e3hW2b0L54feIsRL_SNyPNknJgJZDDGGl3xwT-28_q1L33L0pHBFabK6yaRasIuCTfFGyY0UE0FGfZhm8JthWHQCfnZaynuc_2i5Xq_nI/s320/Screen+Shot+2018-09-19+at+5.13.46+PM.png" width="320" /></a></div>
<br />Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-43176672676524386402017-12-10T12:34:00.001-08:002017-12-10T12:34:37.602-08:00A man hit my wife, and I'm offering $1,000 for info that leads to his arrest.<div>
A man hit my wife. He did it in front of my two boys.</div>
<div>
The stranger hit my wife hard enough to give her a concussion, and almost a week later she still can't watch television, tolerate noise, or look at her phone without getting a migraine. She can't work, and she hasn't been able to attend our boys' basketball games.</div>
<div>
<br /></div>
<div>
Please help us locate the guy that did this.</div>
<div>
<br /></div>
<div>
Here is the full story:</div>
<div>
<br /></div>
<div>
On Monday, December 4th, 2017, my wife Lisa was headed towards <a href="https://www.providencetowncenter.com/">Providence Town Center</a> around 4:30pm. With my sons in the back of the car, Lisa accidentally pulled in front of a man in a dark navy blue or black Chrysler sedan (maybe a 300 or 200?). The man proceeded to tailgate her into the shopping center. When she came to the stop sign in front of the Movie Tavern, the man pulled in front of her, pinning her in. The older (60's?), obese (300 lb?) man proceeded to get out of his car and came to my wife's window. He had grey/white hair, and my sons said his neck shook as he began to yell at my wife. My wife put her window down to apologize, and he took that opportunity to hit my wife in the face. He got back in his car and took off.</div>
<div>
<br /></div>
<div>
Here is the full <a href="http://philadelphia.cbslocal.com/2017/12/06/upper-providence-road-rage/">news article</a>.</div>
<div>
<br /></div>
<div>
The police are investigating, but we could use your help. Please keep an eye out for this type of vehicle, especially if the driver matches the description above. Get the license plate, and report it to the Upper Providence Police: Officer Shilling at 610-933-7899 or by email at 045shi@uprov-montco.org.</div>
<div>
<br /></div>
<div>
We're offering a $1,000 reward for information that leads to his arrest. Thanks in advance.</div>
Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-7303120880686337122017-09-04T17:02:00.000-07:002017-09-04T17:02:39.990-07:00CSS not loading with nginx?<br />
Don't forget to include your mime.types! Without it, nginx falls back to its default_type (text/plain out of the box), and browsers will refuse to apply stylesheets served with the wrong content type.<br />
<br />
<br />
<pre>
[ec2-user@ip-blah ~]$ grep -C 2 mime /etc/nginx/nginx.conf
http {
include /etc/nginx/mime.types;
upstream app_server {
server unix:/home/ec2-user/run/gunicorn.sock fail_timeout=10s;
</pre>
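For a quick sanity check of what content type a file should be served as, Python's standard mimetypes module works in a pinch; its table mirrors the kind of mappings nginx's mime.types provides (the path below is just an example):

```python
import mimetypes

# With mime.types missing, nginx serves everything as its default_type,
# and a stylesheet delivered as text/plain won't be applied by the browser.
# This shows what the type should be:
ctype, _ = mimetypes.guess_type("static/site.css")
print(ctype)
```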
Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-42828503949050953142016-11-12T04:55:00.002-08:002016-11-12T04:55:56.806-08:00Productivity: Audacity and calculated Hubris in EstimationI started work at Unisys as a summer intern when I was 18. By the time I was 19, I had moved over to the Natural Language Processing group and was helping to rewrite their core semantic interpretation engine. I loved it. We had a big client to whom we had committed a working prototype. Timelines were tight and even though no one asked me to, I came in one weekend to work to ensure we hit that deadline. <br />
<br />
The following week I was pulled aside by a Unisys "lifer", who told me to slow down. He was afraid that management would come to expect that pace from the rest of the team. It was the first time I realized that software estimation, commitment, and execution are a game of social dynamics. <br />
<br />
In the years since, I feel like I've seen every type of player: sand-baggers, trivializers, hot-shot hackers, nose-grinders, etc. And in the commitments/estimation game, I've come to value all of those perspectives. But probably most importantly, I've learned to estimate in context: both social and business.<br />
<br />
Estimating in context means assessing the importance of estimate confidence vs. productivity because they are in fact at odds with one another. Due to the nature of engineering, projects often expand to consume the time they are allotted. You can always do one more test, one more optimization/refactor, etc. Likewise, if you aim for a mark you are confident you can hit, you will likely not achieve as much as if you fell short of a bar that you set much higher.<br />
<br />
To a great extent, audacity drives productivity. But it is important to recognize that companies need varying levels of audacity at different stages of their evolution. Sometimes companies need more certainty around their estimates. Other times, a company may sacrifice certainty for productivity. <br />
<br />
My natural tendency is for the latter. At my first "real job" out of college, employees received the book <i>Built to Last</i>. The concept of a <a href="https://en.wikipedia.org/wiki/Big_Hairy_Audacious_Goal">Big Hairy Audacious Goal (BHAG)</a> resonated with me, and I believe companies can effectively employ BHAGs in both strategic and tactical contexts.<br />
<br />
<i>"A true BHAG is clear and compelling, serves as unifying focal point of effort, and acts as a clear catalyst for team spirit. It has a clear finish line, so the organization can know when it has achieved the goal; people like to shoot for finish lines."</i><br />
<i><br /></i>
— Collins and Porras, <i>Built to Last: Successful Habits of Visionary Companies</i><br />
<br />
I'm optimistic by nature. While I try to eliminate that optimism in estimates, I refuse to omit audacity. And while with that approach I cannot guarantee I'll hit every date, I can guarantee that the team will go further, faster than without such calculated hubris.<br />
<br />
fwiw.Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-53486382604972201342016-10-27T10:08:00.000-07:002016-10-27T10:11:12.014-07:00A Platform Approach to Personalization (exciting times at Monetate)<br />
Exciting times at Monetate:<br />
<br />
<a href="http://snip.ly/8mlva#http://www.monetate.com/blog/ecommerce-personalization-platform-pros-and-cons">If you aren’t taking a platform approach to personalization, you’re just testing segments.</a>
Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-81178246302601951332016-05-13T07:22:00.002-07:002016-05-13T07:22:31.726-07:00Monetate: Philly's Best Technology Work Culture<br />
<br />
Since I moved back to Philly from Silicon Valley, I've had the following on my resume:<br />
<div class="p2" style="background-color: #eeee; padding: 5pt;">
<span class="s1" style="font-family: Courier New, Courier, monospace;"><b>Goal:</b> To bring the creative, inventive spirit of Silicon Valley to the Philadelphia region.</span></div>
<div class="p2">
<span class="s1">Monetate is doing exactly this. It is the first company I've worked for where the values of the company are more than just wall art. Everyone here is genuinely "in it to win it" and "loves this stuff". It isn't the free lunch, nerf guns, and scooters that make it a Silicon Valley-esque company (although we have those too). It's the passion and innovation demonstrated on a daily basis.</span><br />
<span class="s1"><br /></span>
Monetate was recently named as a finalist for the Philly Tech In Motion (Timmy) Awards. <br />
You can have <a href="http://skookle.com/monetate_culture.pdf">a look at our submission</a>, but the bottom line:<br />
<br />
<i>Monetate embraces: diversity, community outreach, </i><i>open source, </i><i>and cutting edge technology.</i><br />
<br />
That makes for a fantastic work environment and a vibrant culture. And staying true to our culture and values, we want to win that Timmy Award!<br />
<br />
If you are so inclined, please lend us your vote:<br />
http://blogs.techinmotionevents.com/philly-vote/<br />
<br />
<br />
<br /></div>
Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-48541536167700071572016-03-14T14:11:00.003-07:002016-03-25T06:07:53.926-07:00PySpark on Amazon EMR w/ Kinesis<br />
Spark is fantastic. And its streaming framework has proven to be a perfect fit, functioning as the real-time leg of a lambda architecture. <br />
<br />
In our initial use of Spark, we decided to go with Java, since Spark runs natively on the JVM. (Sorry, Scala, still not a fan.) That worked well, but we knew we would have to support other languages. These days Python is the lingua franca of the data science community, and since we want to deploy ever more sophisticated analytics to our real-time pipe, we want to include support for Python. <br />
<br />
Enter PySpark. This post should get you up and running with PySpark on EMR, connected to Kinesis.<br />
<h3>
Get an EMR Cluster up and running!</h3>
First, you need an EMR cluster. At Monetate, we treat infrastructure as code and use CloudFormation extensively (via <a href="https://github.com/cloudtools/troposphere">troposphere</a>) to accomplish that. However, to my dismay I found that EMR support in troposphere is still <a href="https://github.com/cloudtools/troposphere/issues/412">WIP</a>.<br />
<br />
No big deal I thought. I can work from pure JSON against CloudFormation. I looked up the <a href="http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-emr-cluster.html">docs for EMR resources in CloudFormation</a> and was able to WIP up (see what I did there?) the following JSON:<br />
<br />
<pre name="code">{
  "Resources": {
    "MyEmrCluster": {
      "Type": "AWS::EMR::Cluster",
      "Properties": {
        "ReleaseLabel": "emr-4.3.0",
        "Instances": {
          "CoreInstanceGroup": {
            "InstanceType": "m4.large",
            "InstanceCount": "2"
          },
          "MasterInstanceGroup": {
            "InstanceType": "m4.large",
            "InstanceCount": "1"
          }
        },
        "JobFlowRole": "DevPolicy1-Role-1FOOFV2IV",
        "Name": "MyEmr1",
        "ServiceRole": "DevPolicy1-Role-1FOOFV2IV",
        "VisibleToAllUsers": true
      }
    }
  }
}
</pre>
<br />
<i>IMPORTANT:</i><br />
<i>Make sure you specify 'ReleaseLabel' even though the CloudFormation (CF) documentation says that it is optional. If you don't specify it, CF assumes an outdated amiVersion (1.0.0), and will fail to start.</i><br />
<br />
So, I slung the above JSON at CF and it sorta worked, but balked on the security groups. After much digging, I uncovered the following <a href="http://docs.aws.amazon.com/ElasticMapReduce/latest/ManagementGuide/emr-iam-roles-custom.html">*important* note in the EMR documentation</a> regarding custom IAM roles.<br />
<br />
<pre name="code">Important
The IAM role name and the instance profile name must match exactly when you use either the Amazon EMR console or CLI.
...
In some situations, you might need to work with an IAM role whose associated instance profile does not have the same name as the role. This can occur if you use AWS CloudFormation to manage IAM roles for you, because AWS CloudFormation adds a suffix to the role name to create the instance profile name. In this case, you can use the Amazon EMR API or CLI to specify the instance profile name. </pre>
<br />
Ugh. This was exactly our case. We use CF for our roles, and thus the instance profile name and the IAM role name do NOT match exactly, which would have been fine if we were using the CLI or the EMR API. But again, we wanted to use CF. <br />
<br />
So -- in the end -- PUNT on CF for EMR! Back to the good old CLI. The following AWS CLI voodoo did the trick:<br />
<br />
<pre name="code">aws emr create-cluster --release-label emr-4.3.0 --instance-type m3.xlarge --instance-count 3 --no-auto-terminate --service-role DevPolicy1-Role-1FOOFV2IV --applications Name=Spark --name MyEmr1 --ec2-attributes InstanceProfile=DevPolicy1-InstanceProfile-1FOOFV2IV,KeyName=dev-foo --log-uri s3://log-foo-dev/spark/ --enable-debugging</pre>
<br />
Notice that the CLI lets you specify ec2-attributes, including the InstanceProfile, which is different from the role name if you use CloudFormation to construct your roles.<br />
<br />
With that voodoo, you should see your EMR cluster in the AWS console. Take note of the cluster id; you will need it in later steps.<br />
<br />
<h3>
Sling some code...</h3>
First, you will want to be able to get onto your EMR master node. In the AWS CLI command above, we specified a KeyName. That is the ssh key you will use to connect to the box. The host name you can find in the AWS Console by clicking on your EMR cluster. Look under, "Master public DNS". If you click "SSH" next to that, it will give you specific instructions. <br />
<br />
<i>Hints: Be sure that the -i param points to your local private key, and make sure that you are connecting as the 'hadoop' user. </i><br />
<i><br /></i>
I was able to connect with the following:<br />
<br />
<pre name="code">ssh -i ~/.ssh/dev-foo.pem hadoop@ec2-54-91-171-171.compute-1.amazonaws.com</pre>
<br />
To test out the configuration of your machine, give the PySpark console a try by typing:<br />
<br />
<pre name="code">[hadoop@ip-10-171-73-185 ~]$ pyspark
Python 2.7.10 (default, Dec 8 2015, 18:25:23)
[GCC 4.8.3 20140911 (Red Hat 4.8.3-9)] on linux2</pre>
<br />
<i>IMPORTANT:</i><br />
<i>Note that EMR is running version 2.7.10 of python!!! The example code from Spark assumes version 3. We'll need to make a couple edits to get that sample code to work out on our EMR instance.</i><br />
<br />
Now that we are connected and have a working PySpark shell, we can use the same credentials to transfer code to our cluster. Specifically, let's transfer the <a href="https://raw.githubusercontent.com/apache/spark/branch-1.6/extras/kinesis-asl/src/main/python/examples/streaming/kinesis_wordcount_asl.py">Spark Kinesis example code</a> to our EMR cluster. <br />
<br />
First, download that sample code to your local machine. Next, let's edit the code to make it 2.7 friendly. Specifically, add the following line to the top of the file:<br />
<br />
<pre name="code">from __future__ import print_function</pre>
<br />
This will import the print function from python 3, and make it available in python 2. With that edit in place, transfer the file to the master node:<br />
<br />
<pre name="code">scp -i ~/.ssh/dev-foo.pem kinesis_wordcount_asl.py hadoop@ec2-54-91-171-171.compute-1.amazonaws.com:/tmp/</pre>
<br />
That will place the python code up on the master node in the temp directory.<br />
<h3>
Launch a job...</h3>
<div>
Finally, we need to launch the job. Since we are going to use Kinesis as our data source, let's first create a stream for our job:</div>
<div>
<br /></div>
<pre name="code">aws kinesis create-stream --stream-name foo --shard-count 1</pre>
<div>
<br /></div>
<div>
In a few minutes, you should see a stream provisioned in the AWS console, and you are ready to connect a job to it.<br />
<br />
To connect our job to that stream, we must launch the job. In EMR terminology, we launch a job by "adding a step". Let's stick with the CLI for that. Again, you need a special incantation:</div>
<div>
<br /></div>
<pre name="code">aws emr add-steps --cluster-id j-8S4BHCR3UV7G --steps Name=Spark,Jar=s3://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar,Args=[/usr/bin/spark-submit,--packages,org.apache.spark:spark-streaming-kinesis-asl-assembly_2.10:1.6.0,--deploy-mode,client,/tmp/kinesis_wordcount_asl.py,myapp,foo,https://kinesis.us-east-1.amazonaws.com,us-east-1],ActionOnFailure=CONTINUE</pre>
<div>
<br />
The cluster-id should match the id you captured when you created the cluster. You can also find it in the AWS console. The params to the Python script are passed along in the Args parameter. Specifically, we supply four: "myapp", "foo", "https://kinesis...", and "us-east-1". These are the Kinesis app name, the stream name, the Kinesis endpoint, and the region, respectively.<br />
<br />
In a few minutes, you should now see a step running in the AWS console. To examine the stdout and stderr for that task, ssh to the master machine again and look in /var/log/hadoop/steps. You should see a directory for the new step, and within that directory you will find two files: stderr and stdout. Happily tail those logs with something like the following:<br />
<br />
<pre name="code">tail -f /var/log/hadoop/steps/{step-id}/std*</pre>
<br /></div>
In the stdout stream, you should see something like this:<br />
<br />
<pre name="code">-------------------------------------------
Time: 2016-03-14 20:49:12
-------------------------------------------
...
-------------------------------------------
Time: 2016-03-14 20:49:13
-------------------------------------------
</pre>
<br />
<h3 style="-webkit-text-stroke-width: 0px; color: black; font-family: Times; font-style: normal; font-variant: normal; letter-spacing: normal; line-height: normal; orphans: auto; text-align: start; text-indent: 0px; text-transform: none; white-space: normal; widows: 1; word-spacing: 0px;">
Spew events...</h3>
<div>
With our job running out on EMR, the only thing left is to spew events at the stream. You can use our <a href="https://github.com/monetate/koupler">fancy little utility called Koupler</a> to put data on your stream with the following command line:</div>
<div>
<br /></div>
<pre name="code">~/git/koupler/build/distributions/koupler-0.2.5-SNAPSHOT> printf "hello\nworld\n" | ./koupler.sh -pipe -streamName foo</pre>
<div>
<br />
Look back over to your tail and you should see:<br />
<br /></div>
<div>
<pre name="code">-------------------------------------------
Time: 2016-03-14 20:56:07
-------------------------------------------
(u'world', 1)
(u'hello', 1)
</pre>
<br />
BOOM!<br />
<br />
And there you have it! There were a couple bumps in the road, but not too bad. You are now able to run python, at scale, in the cloud, against real-time events. Enjoy.</div>
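For reference, the per-batch computation that kinesis_wordcount_asl.py performs is just a word count; stripped of the Spark streaming plumbing, it amounts to something like this plain-Python sketch (the sample lines stand in for records placed on the stream):

```python
from collections import Counter

# What the streaming job computes per batch, minus the DStream machinery:
# split each record into words and count occurrences.
lines = ["hello", "world"]  # stand-ins for records put on the Kinesis stream
words = [w for line in lines for w in line.split(" ")]
counts = Counter(words)
print(sorted(counts.items()))
```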
Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-32195223614882723782015-11-24T05:40:00.003-08:002015-11-24T05:40:46.828-08:00Monetate open-sources Koupler: a versatile interface to Kinesis!<br />
I'm happy to announce that Monetate has open-sourced Koupler, a versatile interface for Kinesis. We took the <a href="https://blogs.aws.amazon.com/bigdata/post/Tx3ET30EGDKUUI2/Implementing-Efficient-and-Reliable-Producers-with-the-Amazon-Kinesis-Producer-L">best practices outlined by Amazon</a> and codified them.<br />
<br />
Hopefully, this will be the first of many contributions back to the community.<br />
<br />
For the full story, check out the <a href="http://engineering.monetate.com/2015/11/23/monetate-announces-koupler-the-versatile-interface-to-kinesis/">formal announcement</a>.<br />
<br />
<br />Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-13658536817552313912015-11-18T18:49:00.002-08:002015-11-18T18:49:20.021-08:00Using Squid as an HTTP Proxy via SSH (to fetch remotely from Amazon yum repos)<br />
We've been playing around with <a href="https://www.vagrantup.com/">vagrant</a> for local development. When combined with Ansible, the pair allows you to recreate complex systems locally with high fidelity to your deployment environment. Through magic voodoo (kudos to @jjpersch and @kmolendyke), we managed to get an Amazon AMI crammed into a virtual box. Unfortunately, Amazon's yum repos are only available from the EC2 network. Thus, when we fired up our vagrant machine locally, it couldn't update!<br />
<br />
No worries. We could use a single EC2 instance to proxy the http requests from yum. Here's how. <br />
<br />
First, fire up an ec2 instance and install squid on that instance.<br />
<br />
<pre><span style="color: blue;">yum install squid</span>
</pre>
<br />
Start squid with:<br />
<br />
<pre><span style="color: blue;">/etc/init.d/squid start</span>
</pre>
<br />
If it has trouble starting, have a look at:<br />
<br />
<pre><span style="color: blue;">cat /var/log/squid/squid.out</span>
</pre>
<br />
If you see something like this:<br />
<br />
<pre><span style="color: blue;">FATAL: Could not determine fully qualified hostname. Please set 'visible_hostname'
</span></pre>
<br />
You may need to set visible_name in the /etc/squid/squid.conf file. Add it to the end of that file:<br />
<br />
<pre><span style="color: blue;">visible_hostname myec2</span>
</pre>
<br />
Then, squid should fire up. By default squid runs on port 3128. <br />
(But you can check with netstat -plant | grep squid)<br />
<br />
Now, you are ready for HTTP proxying. In this scenario, let's assume your ec2 instance is myec2.foo.com, and you want to proxy all HTTP requests from your laptop through that machine. <br />
<br />
On your laptop, you would run:<br />
<br />
<span style="color: blue;">ssh -L 4444:localhost:3128 myec2.foo.com</span><br />
<br />
This will ssh into your ec2 instance, simultaneously setting up port forwarding from your laptop. Every packet sent to port 4444 on your laptop will be forwarded over the secure tunnel to port 3128 on your ec2 instance. And since squid is running on that port, squid will in turn forward any HTTP requests along, but they will now look like they are coming from your ec2 instance!<br />
<br />
Shazam. You can test out the setup using wget. For example, previously we couldn't fetch a package list from Amazon's yum repos. The following wget would fail:<br />
<br />
<pre><span style="color: blue;">wget http://packages.ap-southeast-2.amazonaws.com/2014.09/updates/d1c36cf420e2/x86_64/repodata/repomd.xml</span>
</pre>
<br />
wget pays attention to the http_proxy environment variable. When set, wget will forward all requests to that proxy. Thus, to test out our proxy, set http_proxy with the following:<br />
<br />
<pre><span style="color: blue;">export http_proxy=http://localhost:4444</span></pre>
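wget isn't the only client that honors http_proxy; Python's standard library reads it too, which makes for a quick scripted check that the variable is set the way you expect (a Python 3 sketch; port 4444 matches the tunnel above):

```python
import os
import urllib.request

# Python's urllib, like wget, picks up the http_proxy environment variable.
os.environ["http_proxy"] = "http://localhost:4444"
proxies = urllib.request.getproxies()
print(proxies.get("http"))
```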
<br />
Once set, you should be able to re-run the wget and it should succeed! Finally, if you are in a similar situation and want to proxy all yum requests, go into /etc/yum.conf and set the following:<br />
<br />
<pre><span style="color: blue;">proxy=http://localhost:4444</span>
</pre>
<br />
With that setting, you should be able to yum update all day long from your vagrant machine. =)<br />
<br />
<br />
<br />Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-63040624323388767112015-10-28T07:27:00.004-07:002015-10-28T07:27:29.619-07:00Diagnosing memory leaks in Java<br />
Every time I suspect a memory leak, I have to go dig up these commands. <br />
So, here they are for posterity's sake:<br />
<br />
First, I use the following command to monitor the process over time:<br />
<br />
<pre><span style="color: blue;">while ( sleep 1 ) ; do ps -p $PID -o %cpu,%mem,rss ; done</span>
</pre>
<br />
(and/or New Relic if you have it ;)
<br />
<br />
If you see memory creeping up, it could be your JVM settings. If you haven't explicitly specified memory settings, the JVM will use defaults. To get the defaults, use the following command:
<br />
<pre><span style="color: blue;">java -XX:+PrintFlagsFinal -version | grep -i HeapSize</span>
</pre>
<br />
If those are out of line with what you want, then you'll need to specify the memory settings for the JVM. You can set minimum and maximum heap sizes with the following:
<br />
<pre><span style="color: blue;">java -Xms128m -Xmx256m</span>
</pre>
<br />
Once you have sane memory settings, and you can monitor the process, you might legitimately still see memory increasing over time. To get insight into that, you can start by taking a look at the histogram of object instances using the following:
<br />
<pre><span style="color: blue;">jmap -histo $PID</span>
</pre>
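If you want to compare two histograms over time without taking full heap dumps, a few lines of Python can diff them. This is a hypothetical helper (the function names are mine), and it assumes the standard `jmap -histo` line format of `num: #instances #bytes class-name`:

```python
# Hypothetical helper: diff two `jmap -histo` snapshots to find the
# classes whose total bytes grew the most between samples.
# Assumes the standard histo line format: " num: #instances #bytes class".
def parse_histo(text):
    bytes_by_class = {}
    for line in text.splitlines():
        parts = line.split()
        # Data rows start with a numeric rank like "1:"; skip headers/totals.
        if len(parts) == 4 and parts[0].rstrip(":").isdigit():
            bytes_by_class[parts[3]] = int(parts[2])
    return bytes_by_class

def top_growers(before_text, after_text, n=3):
    before = parse_histo(before_text)
    after = parse_histo(after_text)
    deltas = {cls: total - before.get(cls, 0) for cls, total in after.items()}
    return sorted(deltas, key=deltas.get, reverse=True)[:n]
```

Classes at the top of that list are your prime leak suspects, and good targets for a closer look in the heap dumps below.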
<br />
If that isn't enough information then you can take heap dumps with the following command:
<br />
<pre><span style="color: blue;">jmap -dump:format=b,file=/tmp/dump1.hprof $PID</span>
</pre>
<br />
Typically, I'd take two heap dumps and then compare them with jhat using the following command:
<br />
<pre><span style="color: blue;">jhat -baseline /tmp/dump1.hprof /tmp/dump2.hprof</span>
</pre>
<br />
That fires up an HTTP server that you can use to explore the delta between those two heap dumps. By default, the HTTP server will start on port 7000, which you can just hit in a browser. <br />
<br />
If you are behind a firewall, but have ssh access, you can tunnel over to the port using:<br />
<br />
<pre><span style="color: blue;">ssh -L 7000:localhost:7000 $HOST</span>
</pre>
<br />
If you scroll down to the bottom of the first page, you will see two useful links:<br />
<ul>
<li><a href="http://localhost:7000/histo/">Show heap histogram</a></li>
<li><a href="http://localhost:7000/showInstanceCounts/">Show instance counts for all classes (excluding platform)</a></li>
</ul>
That will show you all "new" instances between the different heaps, which should give you some idea of where your leak is coming from. Screenshot below:<br />
<br />
<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhlhlt9nyXYNfrorNDwbno5BbEE3sQqvz0mPfm844pWj_Xb76_mtkWx82dLPFN0nlEOtbKiVA6gQcahLHbV2UbNUJZeggcC3_4LZbc_kL4zHi_CZzvBgVJ1zWjcquXbxbeAiXlf6PbDUq0/s1600/Screen+Shot+2015-10-28+at+10.21.51+AM.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" height="69" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhlhlt9nyXYNfrorNDwbno5BbEE3sQqvz0mPfm844pWj_Xb76_mtkWx82dLPFN0nlEOtbKiVA6gQcahLHbV2UbNUJZeggcC3_4LZbc_kL4zHi_CZzvBgVJ1zWjcquXbxbeAiXlf6PbDUq0/s320/Screen+Shot+2015-10-28+at+10.21.51+AM.png" width="320" /></a></div>
<br />
<br />
And there you have it, a quick synopsis of the magic command-lines you need when diagnosing memory leaks. (which I always forget)Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-60984422875149714042015-10-27T21:12:00.002-07:002015-10-27T21:12:30.599-07:00Using Gradle with AWS and S3 (w/ DefaultAWSCredentialsProviderChain and InstanceProfileCredentialsProvider (FTW!))<br />
We recently switched over to gradle as our build mechanism. As part of that switchover, we wanted to be able to build w/o external dependencies (e.g. Maven Central). We tossed around the idea of Artifactory, but in the end we decided to keep it simple and push our jar files into S3.<br />
<br />
This turned out to be remarkably easy. First, we added a task to copy down our dependencies:<br />
<br />
<pre><span style="color: blue;">task copyRuntimeLibs(type: Copy) {
    from configurations.runtime
    into "$buildDir/libs"
}</span>
</pre>
<br />
With that, we simply sync'd our dependencies up to S3 using aws cli:<br />
<br />
<pre><span style="color: blue;">aws s3 sync build/libs/ s3://yourbucket/gradle/</span>
</pre>
<br />
That deposits the jar files into your S3 bucket. For example:<br />
<br />
<pre><span style="color: blue;">amazon-kinesis-client-1.5.1.jar</span>
</pre>
<br />
For the next trick, you need to get gradle to pull those artifacts from S3. You can do this by declaring an <i>ivy</i> repository in your build.gradle, e.g.:<br />
<pre><span style="color: blue;">repositories {
    ivy {
        url "s3://yourbucket/gradle/"
        credentials(AwsCredentials) {
            accessKey "YOUR_AWSAccessKeyId"
            secretKey "YOUR_AWSSecretKey"
        }
        layout "pattern", {
            artifact "[artifact]-[revision].[ext]"
        }
    }
}</span>
</pre>
<br />
That's it. <span style="color: red;">HOWEVER</span>....<br />
<br />
If you are as paranoid about security as we are, you'll want to use EC2 instance credentials for system users like Jenkins, instead of having keys and secrets floating around that are known to many. Fortunately, the AWS Java SDK can do this out-of-the-box. It is part of the <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html">DefaultAWSCredentialsProviderChain</a>. With the instance profile credentials, you don't need to specify a key and secret; instead, the SDK retrieves the security credentials directly from the host using the <a href="http://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-instance-metadata.html">Amazon EC2 Instance Metadata Service</a>.<br />
<br />
<span style="color: red;">UNFORTUNATELY</span>, Gradle is hard-coded to use <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/BasicAWSCredentials.html">BasicAWSCredentials</a>! <br />
See this <a href="https://discuss.gradle.org/t/s3-repository-credentials-should-use-the-default-credential-provider-chain-if-no-credentials-are-provided/11901">discussion thread</a>.<br />
<br />
The gradle crew is working on a larger design to address security, and because of that they are not going to merge this patch. So, we took matters into our own hands and forked the 2.8 branch of gradle to make the change:<br />
<a href="https://github.com/monetate/gradle/tree/2.8.monetate">https://github.com/monetate/gradle/tree/2.8.monetate</a><br />
<br />
If you want to use instance profile credentials feel free to pull that code and follow the instructions in <a href="https://github.com/monetate/gradle/commit/d221c6e5078636fd56636858188e2d10ba49ed85">this commit</a>.<br />
<br />
With that patch, you can simply omit the credentials from your build.gradle. (woohoo!)<br />
<br />
<pre><span style="color: blue;">repositories {
    ivy {
        url "s3://yourbucket/gradle/"
        layout "pattern", {
            artifact "[artifact]-[revision].[ext]"
        }
    }
}</span></pre>
<br />
It will pick up the credentials from your instance metadata, and you'll be on your way to Cape May (S3).<br />
<br />
FWIW, hopefully this helps people.Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-50701402527424664792015-10-22T18:07:00.005-07:002015-10-22T18:07:57.437-07:00Reverse Engineering Kinesis Aggregation and Sharding (and the magic "a" partitionKey)<br />
I love open source. I love being able to take things apart to understand them. And for that reason, I'd like to thank Amazon for open sourcing the <a href="https://github.com/awslabs/amazon-kinesis-client">Kinesis Client Library</a> and <a href="https://github.com/awslabs/amazon-kinesis-producer">Kinesis Producer Library</a>. It allowed us to get to the bottom of a couple of strange observations and reverse engineer the design for Kinesis Record aggregation in the KPL. (at least we think so?)<br />
<br />
When we started scale testing Kinesis, we saw a couple strange observations:<br />
<br />
(1) The velocity of PUTs (records per second) seemed to scale/increase with the number of shards.<br />
(2) When we attached to the stream using the AWS CLI, we saw records with partition key: "a"<br />
<br />
First, let's discuss the PUT record velocity. One component of Amazon Kinesis's pricing is the number of PUT payload units per month. Specifically:<br />
<br />
<table style="border-collapse: collapse; border-spacing: 0px; box-sizing: border-box; color: #333333; font-family: HelveticaNeue, Helvetica, Helvetica, Arial, sans-serif; font-size: 14px; line-height: 22.4px; width: 535px;"><tbody class="body" style="border-bottom-color: rgb(221, 221, 221); border-bottom-style: solid; border-bottom-width: 1px; box-sizing: border-box;">
<tr style="box-sizing: border-box;"><td class="tier" style="background-color: #f7f7f7; box-sizing: border-box; padding: 8px;">PUT Payload Units, per 1,000,000 units</td><td class="price" style="background-color: #f7f7f7; box-sizing: border-box; padding: 8px; text-align: center;">$0.014</td></tr>
</tbody></table>
<br />
So, mo' PUTs is mo' money. Fortunately, the Kinesis Producer Library (KPL) lets you aggregate multiple records into a single PUT. If you are optimizing for cost, you want to maximize that records per PUT ratio. (and obviously at a 1:1 ratio you are paying the most) But, the amount of aggregation you can perform depends on record throughput and your tolerance for latency.<br />
<br />
KPL ingests records, accumulating them in a buffer. The buffering is configured via the <i>kpl.properties</i> file, and the key parameter in the system is the <i>RecordMaxBufferedTime, </i>which specifies the maximum amount of time that the KPL will wait before it sends a PUT. After this maximum amount of time passes, KPL PUTs the buffered records, regardless of how many it has accumulated/aggregated. If your system has low throughput, KPL sends partially filled PUTs. (which is more expensive)<br />
<br />
For example, if your throughput is 1 record / second and you have RecordMaxBufferedTime set to 1 second, you will have a compression ratio of 1:1 (not good). Now, if you increase that duration from 1 second to 4 seconds, you will immediately quadruple your compression ratio (and cut costs by 75%!).<br />
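As a back-of-the-envelope sketch of that tradeoff (the helper names are mine, and this deliberately ignores the KPL's per-PUT size limits):

```python
# Back-of-the-envelope math for KPL aggregation (hypothetical helpers;
# ignores the KPL's per-PUT size limits).
def aggregation_ratio(records_per_second, buffer_seconds):
    # Records accumulate for buffer_seconds before each flush, so each
    # PUT carries roughly this many records.
    return records_per_second * buffer_seconds

def put_cost_savings(records_per_second, buffer_seconds):
    # Fraction of PUT payload-unit cost saved versus an unaggregated
    # 1:1 ratio.
    return 1.0 - 1.0 / aggregation_ratio(records_per_second, buffer_seconds)

# At 1 record/sec with a 1-second buffer: ratio 1:1, no savings.
# Raising the buffer to 4 seconds quadruples the ratio and cuts cost 75%.
```

The flip side, of course, is that every extra second of buffering is an extra second of latency before the record lands in the stream.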
<br />
When we started scale testing, we saw an order of magnitude more PUT records than we saw during our initial testing (on streams with few shards). We noticed that the velocity of PUTs increased linearly with the number of shards. For example, with 2X the number of shards, we saw 2X the number of PUTs. This led us to hypothesize that aggregation was somehow related to shards. <br />
<br />
Our straw man hypothesis looked like this:<br />
<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjWZMlI0Nf-rQ-SuSiBFJrb_WtNLDgHJCIFAyY-xh4J-38CPqn2BnSOzKrdc7OU43G5oElMYiJriJ4Ap5SNoklJrGS8mS3pAP7th-uqIPkf4XPIEag899AgIvszU95whRhYTgJzjnNrnCA/s1600/kinesis_sharding.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" height="393" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjWZMlI0Nf-rQ-SuSiBFJrb_WtNLDgHJCIFAyY-xh4J-38CPqn2BnSOzKrdc7OU43G5oElMYiJriJ4Ap5SNoklJrGS8mS3pAP7th-uqIPkf4XPIEag899AgIvszU95whRhYTgJzjnNrnCA/s640/kinesis_sharding.png" width="640" /></a></div>
<br />
<br />
If our hypothesis was right, then locally the KPL was keeping an aggregate KinesisRecord (aka PUT) for each shard. Then, when KPL hit the <i>RecordMaxBufferedTime</i>, it would flush all of those records, meaning that the minimum velocity of PUTs would be equal to the number of shards divided by the RecordMaxBufferedTime. <br />
<br />
e.g. 100ms RecordMaxBufferedTime = 10 PUTs / second / shard<br />
which would be: 500 PUTs / second for 50 shards.<br />
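That lower bound on PUT velocity under the one-aggregate-per-shard hypothesis can be sketched in a couple of lines (hypothetical helper name):

```python
# Lower bound on PUT velocity under the one-aggregate-per-shard
# hypothesis: each shard flushes one PUT per RecordMaxBufferedTime,
# regardless of how little traffic is flowing.
def min_puts_per_second(num_shards, record_max_buffered_time_ms):
    flushes_per_second = 1000.0 / record_max_buffered_time_ms
    return num_shards * flushes_per_second
```

Note that this floor is independent of throughput, which is exactly why a trickle of traffic can still generate hundreds of PUTs per second on a wide stream.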
<br />
This is exactly what we saw. Even with a trickle of traffic, we saw hundreds of PUT requests.<br />
<br />
When you think about it, this makes sense. Since a partition key determines which shard receives the record, a single PUT can't contain records destined for multiple shards, unless Amazon somehow fanned out the records on the backend.<br />
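For reference, Kinesis routes a record by taking the MD5 hash of its partition key as a 128-bit integer and delivering the record to the shard whose hash-key range contains that value. A rough sketch (the evenly-split ranges are an assumption; real shards can own arbitrary ranges, e.g. after resharding):

```python
import hashlib

def shard_for_key(partition_key, num_shards):
    # Kinesis hashes the partition key with MD5 into a 128-bit integer.
    hash_val = int(hashlib.md5(partition_key.encode("utf-8")).hexdigest(), 16)
    # Assume the hash-key space is split evenly across shards (true for a
    # freshly created stream; resharding can produce uneven ranges).
    range_size = 2 ** 128 // num_shards
    return min(hash_val // range_size, num_shards - 1)
```

Since the routing is a pure function of the partition key, records with the same key always land on the same shard, and a single PUT addressed by one key can only ever reach one shard.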
<br />
...<br />
<br />
Now, let's look at our second observation, the elusive "a" partition key. When checking the accuracy of our stream data, we happened to attach using AWS CLI. When we attached to the stream and read records directly using `aws kinesis get-records...`, we saw JSON data in the form:<br />
<br />
<span style="font-family: monospace;">{</span><br />
<ul class="obj collapsible" style="font-family: monospace; list-style-type: none; margin: 0px 0px 0px 2em; padding: 0px;">
<li style="position: relative;"><div class="hoverable" style="border-radius: 2px; display: inline-block; padding: 1px 2px; transition: background-color 0.2s ease-out 0s;">
<span class="property" style="font-weight: bold;">PartitionKey</span>: <span class="type-string" style="color: green;">"a"</span>,</div>
</li>
<li style="position: relative;"><div class="hoverable" style="border-radius: 2px; display: inline-block; padding: 1px 2px; transition: background-color 0.2s ease-out 0s;">
<span class="property" style="font-weight: bold;">Data</span>:<span class="type-string" style="color: green;">"84mawgo...rnz1f7h"</span>,</div>
</li>
<li style="position: relative;"><div class="hoverable" style="border-radius: 2px; display: inline-block; padding: 1px 2px; transition: background-color 0.2s ease-out 0s;">
<span class="property" style="font-weight: bold;">SequenceNumber</span>: <span class="type-string" style="color: green;">"49555608841303555555555578136812826047777764134359384066"</span></div>
</li>
</ul>
<span style="font-family: monospace;">}</span><br />
<br />
Some of the JSON elements had the proper partitionKey in them, while others looked like the above. We had no idea where the mysterious "a" partitionKey was coming from. Immediately, we thought we had a bug somewhere. I proceeded to go into debug mode, and Base64 decoded the Data element of the JSON. What I saw was... (and paraphrasing a bit...)<br />
<br />
<span style="font-family: monospace;">pk1 pk2 pk3 pk4 ?? data1 ?? data2 ?? data3 ?? data4</span><br />
<div>
<br /></div>
Very interesting... the beginning of the Data element had all of the partitionKeys from my records. They were then followed by the data for each of those records. Okay, so the data looked good. And it looks like KPL just packs all of the partition keys up front, followed by the data. But that didn't explain the "a" partition key. And if the partition key was always "a", how was the data getting distributed across shards?!<br />
<br />
If you recall, the Kinesis Producer Library is a native application, coated in a Java wrapper. If that "a" was coming from somewhere, it had to be deep in the native application. I put on my waders, strapped on my spelunking headlamp and dusted off my C++. After some digging, I eventually found the code(droid) I was looking for:<br />
<br />
<pre>std::string KinesisRecord::partition_key() const {
  if (items_.empty()) {
    throw std::runtime_error(
        "Cannot compute partition_key for empty container");
  }

  if (items_.size() == 1) {
    return items_.front()->partition_key();
  }

  // We will always set an explicit hash key if we created an aggregated record.
  // We therefore have no need to set a partition key since the records within
  // the container have their own parition keys anyway. We will therefore use a
  // single byte to save space.
  return "a";
}
</pre>
<br />
Aha Mr. Mustard, that explains everything! Kinesis typically hashes the partition key to determine the shard. But if you have a look at the <a href="http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html">Kinesis API, you'll notice that you can actually specify ExplicitHashKey</a>. When an ExplicitHashKey is provided, the partition key is ignored!<br />
<br />
If we closely examine the code above, notice that whenever the aggregated KinesisRecord had a single record, it used that record's partition key. These were the "good" records we were seeing through AWS CLI. However, whenever KPL had more than one record, it set the partition key to the magic "a", and set the ExplicitHashKey for the Kinesis Record (which contained multiple partition keys!). It all became clear, and a rainbow unicorn galloped in from the street.<br />
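In Python terms, that branch boils down to the following (a sketch mirroring the C++ above, not the real library; `items` is a list of the buffered records' partition keys):

```python
def partition_key(items):
    # Mirror of KinesisRecord::partition_key() above (sketch only).
    if not items:
        raise ValueError("Cannot compute partition_key for empty container")
    if len(items) == 1:
        # A lone record keeps its real partition key.
        return items[0]
    # Aggregated records carry an ExplicitHashKey, so the partition key
    # is just a one-byte placeholder.
    return "a"
```

So any PUT you see with partition key "a" is simply an aggregate of two or more records, each still carrying its own partition key inside the Data blob.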
<br />
There you have it.<br />
<br />
The case of the crazy "a", and the accelerated PUTs. Solved.<br />
Until next time. Long live open source software.<br />
<br />Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-49057742855775109662015-10-20T05:59:00.000-07:002015-10-21T05:39:52.957-07:00BaDaS Arsenal : from grep to graph in under 5 minutes w/ Pyplot<h4 id="badas-big-and-data-and-science">
BaDaS = Big and Data and Science</h4>
These days, BaDaS is all the rage. And to be successful at it, you need an arsenal of weapons to gain insights into vast amounts of data quickly. And more and more, I believe python is the go-to swiss-army knife used in the field. And if you’ve read <a href="http://brianoneill.blogspot.com/2015/10/getting-started-w-python-kinesis.html">my previous blog</a>, I believe iPython Notebook (a.k.a. Jupyter) is a great way to collaborate with other people when doing data analysis.<br />
<br />
So…<br />
<br />
Recently, when I was analyzing the throughput/scalability of Kinesis and I wanted to graph the results, instead of turning to Excel to churn out the graph, I fired up the old iPython Notebook. (admittedly, I was also inspired by <a href="http://austinrochford.com/posts/2015-10-05-bayes-survival.html">@AustinRochford’s excellent blog on Bayesian inference</a>)<br />
<br />
I was blown away. I was able to go from a log file to a visible graph in just minutes. Here is how I did it…<br />
<br />
After a couple minutes of googling, I realized that the secret sauce was in <a href="http://matplotlib.org/index.html">matplotlib</a>. More specifically, it was <a href="http://matplotlib.org/users/pyplot_tutorial.html">Pyplot</a> FTW! <br />
<br />
To get to work, I first grep’d the log files for each of my runs, and created a log file for each. I happened to be playing around with RecordBufferSizes in the Kinesis Producer Library (KPL), and ran an experiment at 10ms, 25ms, 50ms and 100ms buffer times.<br />
<br />
I then fired up the trusty Notebook and started hacking away.<br />
<br />
First, I imported pyplot:<br />
<br />
<pre class="prettyprint"><code class="language-python hljs " style="background-color: #eeeeee;"><span class="hljs-keyword">from</span> matplotlib <span class="hljs-keyword">import</span> pyplot <span class="hljs-keyword">as</span> plt</code></pre>
<br />
Then, I imported csv and dateutil to munge the data…<br />
<pre class="prettyprint"><code class="language-python hljs " style="background-color: #eeeeee;"><span class="hljs-keyword">import</span> csv</code></pre>
<pre class="prettyprint"><code class="language-python hljs " style="background-color: #eeeeee;"><span class="hljs-keyword">import</span> dateutil</code></pre>
<br />
Finally, all I had to do was write some python to parse each data file and create a representative dataset. Pyplot took care of the rest.<br />
<br />
In the below code, I loop through each CSV file and gather up the dataset for that file. The dataset is a dictionary called <em>data</em>. The dictionary contains two lists, one for timestamps, which is what will become the x axis, and one for the throughput (a.k.a. recordsPerSecond), which is what will become the y axis.<br />
<br />
The first element of each row in the CSV contains the epoch time. But because I wanted a time series graph, I didn't want to use the absolute timestamp. Instead, inline I just calculate the delta from the first timestamp I see. (neato!) <br />
<br />
In the end, I stuff each of the data dictionaries, into an overarching dictionary that maps filename to the data dictionary representing the data for that file.<br />
<br />
The final few lines tell pyplot to graph each of the datasets. There is some voodoo in there to set the color (hint: ‘r’ = red, ‘y’ = yellow, etc.)<br />
<br />
In the end, you hit execute on the cell, and SHAZAM – it’s graphalicious.<br />
<pre class="prettyprint"><code class="language-python hljs " style="background-color: #eeeeee;">%matplotlib inline
files = [<span class="hljs-string">'10ms.csv'</span>, <span class="hljs-string">'25ms.csv'</span>, <span class="hljs-string">'50ms.csv'</span>, <span class="hljs-string">'100ms.csv'</span>]
datasets = {}
<span class="hljs-keyword">for</span> filename <span class="hljs-keyword">in</span> files:
data = {
<span class="hljs-string">"timestamp"</span>: [],
<span class="hljs-string">"recordsPerSecond"</span>: []
}
<span class="hljs-keyword">with</span> open(filename, <span class="hljs-string">'rb'</span>) <span class="hljs-keyword">as</span> csvfile:
reader = csv.reader(csvfile, delimiter=<span class="hljs-string">','</span>)
starttime = <span class="hljs-keyword">None</span>
<span class="hljs-keyword">for</span> row <span class="hljs-keyword">in</span> reader:
timestamp = dateutil.parser.parse(row[<span class="hljs-number">0</span>])
<span class="hljs-keyword">if</span> (starttime <span class="hljs-keyword">is</span> <span class="hljs-keyword">None</span>):
starttime = timestamp
delta = (timestamp - starttime).total_seconds()
data[<span class="hljs-string">"timestamp"</span>].append(delta)
data[<span class="hljs-string">"recordsPerSecond"</span>].append(float(row[<span class="hljs-number">1</span>]))
datasets[filename] = data
plt.plot(
datasets[<span class="hljs-string">"10ms.csv"</span>][<span class="hljs-string">"timestamp"</span>],
datasets[<span class="hljs-string">"10ms.csv"</span>][<span class="hljs-string">"recordsPerSecond"</span>],
<span class="hljs-string">'r'</span>, label=<span class="hljs-string">'10ms'</span>)
plt.plot(datasets[<span class="hljs-string">"25ms.csv"</span>][<span class="hljs-string">"timestamp"</span>],
datasets[<span class="hljs-string">"25ms.csv"</span>][<span class="hljs-string">"recordsPerSecond"</span>],
<span class="hljs-string">'b'</span>, label=<span class="hljs-string">"25ms"</span>)
plt.plot(datasets[<span class="hljs-string">"50ms.csv"</span>][<span class="hljs-string">"timestamp"</span>],
datasets[<span class="hljs-string">"50ms.csv"</span>][<span class="hljs-string">"recordsPerSecond"</span>],
<span class="hljs-string">'g'</span>, label=<span class="hljs-string">"50ms"</span>)
plt.plot(datasets[<span class="hljs-string">"100ms.csv"</span>][<span class="hljs-string">"timestamp"</span>],
datasets[<span class="hljs-string">"100ms.csv"</span>][<span class="hljs-string">"recordsPerSecond"</span>],
<span class="hljs-string">'y'</span>, label=<span class="hljs-string">"100ms"</span>)
plt.ylabel(<span class="hljs-string">'Records / Second'</span>)
plt.xlabel(<span class="hljs-string">'Seconds'</span>)
plt.legend()
plt.show()
</code></pre>
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhOyqaSagp1YvGs09YrVIwkf0Yp58r7MxkEkfe0cBM4B_RzT_o0kAyOExtu0e-PpnfFbQXPnqKhU4WAnII9FgdvSb6SXV2THoKoWVn8ywBhvYlgFpo701Am3dsxdzGFiI8vijW4tZA0kNA/s1600/graph.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" height="216" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEhOyqaSagp1YvGs09YrVIwkf0Yp58r7MxkEkfe0cBM4B_RzT_o0kAyOExtu0e-PpnfFbQXPnqKhU4WAnII9FgdvSb6SXV2THoKoWVn8ywBhvYlgFpo701Am3dsxdzGFiI8vijW4tZA0kNA/s320/graph.png" width="320" /></a></div>
<br />
And FWIW, this entire blog post was written from within Jupyter!<br />
<br />
If I get around to it, I’ll hopefully post what the actual results were when we played with buffer sizes with Kinesis. (HINT: Be careful – the buffer sizes control how much aggregation occurs, which not only affects throughput, but also affects cost!)Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-70703203501740176622015-10-11T03:49:00.001-07:002015-10-11T03:49:15.390-07:00A quick proof that Black is not White (to save all of mankind)<br />
In <i>The Hitchhiker's Guide to the Galaxy</i>, there is a proof about the non-existence of God. The reasoning roughly mimics that of the <a href="https://en.wikipedia.org/wiki/Ontological_argument">Ontological argument for God</a>. <br />
<br />
The passage has stuck with me:<br />
<i>"I refuse to prove that I exist," says God, "for proof denies faith, and without faith I am nothing."
"But," says Man, "the Babel fish is a dead giveaway, isn't it? It could not have evolved by chance. It proves that You exist, and so therefore, by Your own arguments, You don't. QED". "Oh dear," says God, "I hadn't thought of that," and promptly vanishes in a puff of logic.
"Oh, that was easy," says Man, and for an encore goes on to prove that black is white and gets himself killed on the next zebra crossing.
</i><br />
<br />
I always thought it weird that <i>Man</i> is capitalized in this passage. Does that in fact mean all of Man? If so, it would be tragic if we all died during a zebra crossing simply because we did not have proof that black was not white. Thus, I formally submit this proof for posterity, to avoid a Zebrapocalypse.<br />
<br />
<span style="color: #0b5394;"><i><u>Proof that Black is not White:</u></i></span><br />
<span style="color: #0b5394;"><i><br /></i></span>
<span style="color: #0b5394;"><i>Axiom (1): A chess board comprises 64 black and white squares.</i></span><br />
<span style="color: #0b5394;"><i><br /></i></span>
<span style="color: #0b5394;"><i>Now, let us assume for contradiction that black is white. </i></span><br />
<span style="color: #0b5394;"><i><br /></i></span>
<span style="color: #0b5394;"><i>With black and white indistinguishable from one another, squares on a chess board would be indistinguishable from one another. </i></span><br />
<span style="color: #0b5394;"><i><br /></i></span>
<span style="color: #0b5394;"><i>Thus, the chess board would have only one square.</i></span><br />
<span style="color: #0b5394;"><i><br /></i></span>
<span style="color: #0b5394;"><i>We know from (1) that a chess board has 64 squares, therefore our initial assumption must be incorrect. Black must not be white.</i></span><br />
<span style="color: #0b5394;"><i><br /></i></span>
<span style="color: #0b5394;"><i>Q.E.D.</i></span><br />
<span style="color: #0b5394;"><i><br /></i></span>
<a href="https://en.wikipedia.org/wiki/Proof_by_contradiction">Proof by contradiction</a>, never fails. (or does it always fail? ;)<br />
<br />
And there you have it. All of mankind saved from a zebra crossing.<br />
<br />
(oh, the things I think about while waiting in line to pay for pumpkins. =)<br />
<br />Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-77065338136818791642015-10-08T15:39:00.002-07:002015-10-08T15:39:22.648-07:00Integrating Syslog w/ Kinesis : Anticipating use of the FirehoseOn the heals of the <a href="https://aws.amazon.com/blogs/aws/amazon-kinesis-firehose-simple-highly-scalable-data-ingestion/">Kinesis Firehose announcement</a>, more people are going to be looking to integrate Kinesis with logging systems. (to expedite/simplify the ingestion of logs into S3 and Redshift) Here is one take on solving that problem that integrates syslog-ng with Kinesis.<br />
<br />
First, let’s have a look at the syslog-ng configuration. In the syslog-ng configuration, you wire sources to destinations:<br />
<pre><code><span style="color: blue;">source s_sys {
udp(ip(<span class="hljs-number">127.0</span><span class="hljs-number">.0</span><span class="hljs-number">.1</span>) port(<span class="hljs-number">514</span>));
};
destination d_test { file(<span class="hljs-string">"/var/log/monetate/test.log"</span>
perm(<span class="hljs-number">0644</span>)
<span class="hljs-keyword">template</span>(<span class="hljs-string">"$MSGONLY\n"</span>)
template_escape(no)
); };
destination d_fact_kinesis { tcp(<span class="hljs-string">"localhost"</span>
port(<span class="hljs-number">4242</span>)
<span class="hljs-keyword">template</span>(<span class="hljs-string">"$MSGONLY\n"</span>)
template_escape(no)
); };
<span class="hljs-built_in">log</span> { source(s_sys); destination(d_test); destination(d_fact_kinesis); };</span>
</code></pre>
<br />
In this example, we have one source of events: UDP, and we have two destinations: one file and one TCP. We wire them together in the last <code><span style="color: blue;">log</span></code> statement. With this configuration, any packets syslog-ng receives via UDP are written to the TCP port and to file. File is our backup strategy: just in case something happens to Kinesis, we will also persist the events locally. <i>TCP is our path to Kinesis</i>.<br />
<br />
On the TCP side of things, we can easily implement a simple TCP server in Java that receives the messages from syslog-ng and sends them out via KPL. At Monetate, we’ve done exactly that. (and I’ll look to see if we can open source the solution). <b><i>However</i></b>, it is really important to understand the <a href="https://www.balabit.com/sites/default/files/documents/syslog-ng-ose-latest-guides/en/syslog-ng-ose-guide-admin/html/concepts-flow-control.html">syslog-ng flow control</a>, and more specifically how we will handle the back pressure if/when Kinesis starts to back up.<br />
<br />
Let's consider how we back that thang’ up.<br />
<br />
Here is a visual that shows the whole flow:<br />
<br />
<div class="separator" style="clear: both; text-align: center;">
<a href="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjeYdQwPxM8VaAb6Z8G5KePTrTAcLJBsAH6kanrkzcgCsewbBiRigPZSRiDBfjFfeiojZR34COKiZfvEbka_znse0k7vmU3lfMD12qSkmzH2zhFLva19gsP3VaZJMIHN0JUDBZqWbDQCM0/s1600/syslog_kinesis_integration.png" imageanchor="1" style="margin-left: 1em; margin-right: 1em;"><img border="0" height="348" src="https://blogger.googleusercontent.com/img/b/R29vZ2xl/AVvXsEjeYdQwPxM8VaAb6Z8G5KePTrTAcLJBsAH6kanrkzcgCsewbBiRigPZSRiDBfjFfeiojZR34COKiZfvEbka_znse0k7vmU3lfMD12qSkmzH2zhFLva19gsP3VaZJMIHN0JUDBZqWbDQCM0/s640/syslog_kinesis_integration.png" width="640" /></a></div>
<br />
<br />
At each point in the sequence, we need to make sure that we are prepared to handle back-pressure. Specifically, we need to validate the behavior of each component when the next downstream component refuses to accept additional data.<br />
<br />
For example, if our little TCP server (“bridge”) blindly accepted packets from the TCP stream, we would quickly OOM the JVM if/when Kinesis slowed down. Thus, in the “bridge” we need to apply back-pressure upstream, to prevent that from happening. More specifically, we need to stop consuming off of the TCP stream, and hope that syslog-ng reacts appropriately.<br />
<br />
Fortunately, syslog-ng has accounted for this case with their “output buffer”, shown above. The output buffer is used to store messages for delivery to destinations. If the max-size is reached on that buffer, then the destination is “disconnected” so as not to crash syslog. You control the max-size of the buffer through the <code>log_fifo_size</code> setting in syslog-ng.<br />
<br />
Different destination types have different behaviors under this scenario, and we tested out a few of them. First we considered a straight-up pipe between syslog and our bridge process using the “program” destination within syslog-ng. That worked well, but in the event of a backed-up queue, syslog-ng kills the program and respawns a new process. This was less than ideal. We also considered using UDP. That worked as well, and in effect eliminated the back pressure scenario because UDP is “fire and forget”. Under heavy load however, we noticed packet drops, which meant we were losing events.<br />
<br />
In the end, we decided to go with TCP. With TCP we won’t silently lose messages, and syslog-ng won’t continually restart our process. With that decision, we needed to confirm the behavior of syslog-ng. And for that, we needed to monitor syslog-ng. This handy dandy command-line does exactly that:<br />
<pre><code><span style="color: blue;">syslog-ng -Fevd >& syslog.log</span>
</code></pre>
<br />
While monitoring syslog-ng, we tested two failure scenarios. The first tested a dead bridge (no TCP listener). In this situation, syslog-ng won’t be able to connect. We needed to make sure syslog-ng behaves well in that case. To test this, we ran 100K messages through the system over 40 seconds (>2K/sec). When monitoring syslog-ng, we saw it trying to reconnect:<br />
<br />
<pre><code><span style="color: blue;">$ grep 4242 syslog.log | grep failed
Connection failed; server='AF_INET(127.0.0.1:4242)', error='Connection refused (111)', time_reopen='10'
...
Connection failed; server='AF_INET(127.0.0.1:4242)', error='Connection refused (111)', time_reopen='10'</span>
</code></pre>
<br />
All the messages made it through to our log file (hallelujah!), which means the downed TCP connection had no effect on the log file destination. Furthermore, syslog-ng continually retried to establish the TCP connection, so when the process did come back up, it reconnected! Nice! One very important thing to note from the above output is the “time_reopen”. It turns out that this is a <a href="https://www.balabit.com/sites/default/files/documents/syslog-ng-ose-latest-guides/en/syslog-ng-ose-guide-admin/html/reference-options.html">global configuration option</a>, which tells syslog-ng how long to wait before re-attempting to establish the connection. So, the behavior was solid – and we can actually configure how noisy things get if/when we lose the bridge process.<br />
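For example (a sketch; the 10 matches the time_reopen='10' in the log output above):

```
options {
    # Seconds to wait before re-attempting a failed destination connection.
    time_reopen(10);
};
```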
<br />
For the final test, we needed to see what happens when we apply back-pressure from the Java process itself. In this case, syslog-ng can connect, but the Java process refuses to read off the stream/socket. For this, we ran the same 100K test, but paused consuming in the Java process to simulate a Kinesis slow-down. And again, we saw good things... All the messages made it through to the log file, and this time we saw messages from syslog-ng indicating that the output buffer (fifo queue) was full:<br />
<pre><code><span style="color: blue;">syslog.log:Destination queue full, dropping message; queue_len='1000', mem_fifo_size='1000'
...
syslog.log:Destination queue full, dropping message; queue_len='1000', mem_fifo_size='1000'</span>
</code></pre>
<br />
And the log message tells you everything: after the queue fills up, syslog-ng drops messages on the floor until it can re-establish the socket.<br />
<br />
So, there you have it… to connect syslog-ng to Kinesis, I’d recommend using a TCP output destination with some glue code between that and the Kinesis Producer Library (KPL). (and again, I’ll see if we can open source some of that) Just be careful to apply the back-pressure to syslog-ng. <br />
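A sketch of what that destination looks like (d_bridge is a hypothetical name; port 4242 matches the bridge port in the logs above, and s_sys is assumed to be your source name):

```
destination d_bridge {
    tcp("localhost" port(4242)
        template("$MSGONLY\n")
        template_escape(no)
    );
};
log { source(s_sys); destination(d_bridge); };
```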
<br />
Per our experiments, syslog-ng can <a href="https://en.wikipedia.org/wiki/Back_That_Thang_Up">back that thang up</a>!Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-91019895917355484502015-10-06T10:05:00.000-07:002015-10-06T10:05:11.500-07:00Destinations for syslog-ng : UDP + Program (Java)This is just a quick mental note because I keep losing them...<br />
<br />
Here are two quick gists that show how to configure destinations within syslog-ng.<br />
<br />
<h4>
UDP Destination</h4>
This destination will spew UDP datagram packets:
<br />
<pre><span style="color: #3d85c6;">destination udp_spew { udp("localhost"
port(8052)
template("$MSGONLY\n")
template_escape(no)
); };</span>
</pre>
<h4>
Program Destination</h4>
This destination will spew log events at a Java program:<br />
<pre><span style="color: #3d85c6;">destination program_spew { program("/opt/jdk1.7.0_79/bin/java -jar /mnt/foo.jar arg1 arg2"
template("$MSGONLY\n")
template_escape(no)
); };</span>
</pre>
<h4>
Connecting the Destination to the Source</h4>
For both of these, don't forget to connect it to the source!<br />
<pre><span style="color: #3d85c6;">log { source(s_sys); ...; destination(program_spew); };</span></pre>
<br />Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-67405933514229392572015-10-05T06:34:00.002-07:002015-10-05T06:34:27.145-07:00Getting started w/ Python Kinesis Consumer Library (KCL) (via IPython Notebook)<br />
We are racing ahead with Kinesis. With streaming events in hand, we need to get our data scientists tapped in. One problem: the KCL and KPL are heavily focused on Java, but our data scientists (and the rest of our organization) love Python. =)<br />
<br />
We use IPython notebook to share python code/samples. So, in this blog I'm going to get you up and running, consuming events from Kinesis, using Python and IPython notebook. If you are already familiar with IPython notebook, you can pull the notebook from here:<br />
<a href="https://github.com/boneill42/kcl-python-notebook/">https://github.com/boneill42/kcl-python-notebook/</a><br />
<br />
If you aren't familiar with IPython Notebook, first go <a href="http://jupyter.org/">here</a> to get a quick overview. In short, IPython Notebook is a great little webapp that lets you play with Python commands via the browser, creating a wiki/gist-like "recipe" that can be shared with others. Once familiar, read on.<br />
<br />
To get started with KCL-python, let's clone the amazon-kinesis-client-python github repo:<br />
<br />
<pre><span style="color: #3d85c6;"> git clone git@github.com:awslabs/amazon-kinesis-client-python.git</span>
</pre>
<br />
Then, create a new <a href="https://virtualenv.pypa.io/en/latest/">virtualenv</a>. (and if you don't use virtualenv, get with the program =)<br />
<pre>
<span style="color: #6fa8dc;"> virtualenv venv/kcl
source venv/kcl/bin/activate
</span></pre>
<br />
Next, get IPython Notebook setup. Install ipython notebook, by running the following commands from the directory containing the cloned repo:<br />
<br />
<pre><span style="color: #6fa8dc;"> pip install "ipython[notebook]"
ipython notebook</span>
</pre>
<br />
That should open a new tab in your browser. Start a new notebook by clicking "New->Python 2". A notebook comprises one or more cells. Each cell has a type. For this exercise, we'll use "Code" cells. We'll take advantage of the "magic" functions available in IPython Notebook. We'll use <i>run</i>, which will execute a command. Specifically, we need to execute the Python setup for KCL. <br />
<br />
The amazon-kinesis-client-python library actually rides on top of a Java process, and uses <a href="https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/multilang/MultiLangDaemon.java">MultiLangDaemon</a> for interprocess communication. Thus, to get KCL setup you need to download the jars. You can do that in the notebook by entering and executing the following in the first cell:<br />
<br />
<pre><span style="color: #3d85c6;"> run setup.py download_jars</span>
</pre>
<br />
Next, you need to install those jars with:<br />
<br />
<pre><span style="color: #3d85c6;"> run setup.py install</span>
</pre>
<br />
That command installed a few libraries for python (specifically -- it installed <a href="https://github.com/boto/boto">boto</a>, which is the aws library for python). Because the notebook doesn't dynamically reload, you'll need to restart IPython Notebook and re-open the notebook. (hmmm... Is there a way to reload the virtualenv used by the notebook w/o restarting?)<br />
<br />
Once reloaded, you are ready to start emitting events into a Kinesis stream. For now, we'll use the sample application in the KCL repo. Create a new cell and run:<br />
<br />
<pre><span style="color: #3d85c6;"> run samples/sample_kinesis_wordputter.py --stream words -w cat -w dog -w bird -w lobster</span>
</pre>
<br />
From this, you should see:<br />
<br />
<pre><span style="color: #93c47d;">Connecting to stream: words in us-east-1
Put word: cat into stream: words
Put word: dog into stream: words
Put word: bird into stream: words
Put word: lobster into stream: words</span>
</pre>
<br />
Woo hoo! We are emitting events to Kinesis! Now, we need to consume them. =)<br />
<br />
To get that going, you will want to edit <i>sample.properties</i>. This file is loaded by KCL to configure the consumer. Most importantly, have a look at the <i>executableName</i> property in that file. This sets the name of the Python code that the KCL will execute when it receives records. In the sample, this is <i>sample_kclpy_app.py</i>. <br />
<br />
Have a look at that sample Python code. You'll notice there are four important methods that you will need to implement: <i>init</i>, <i>process_records</i>, <i>checkpoint</i>, and <i>shutdown</i>. The purpose of those methods is almost self-evident, but the documentation in the sample is quite good. Have a read through there.<br />
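To make the shape concrete, here is a standalone sketch of that contract (plain Python, not the actual amazon_kclpy base class; the method names follow the sample, everything else here is illustrative):

```python
class WordCountProcessor(object):
    """Sketch of the contract that sample_kclpy_app.py implements.

    The real processor is wired up by the MultiLangDaemon; this one just
    illustrates the four methods and where checkpointing fits in."""

    def init(self, shard_id):
        # Called once, before any records, with the shard this worker owns.
        self.shard_id = shard_id
        self.counts = {}

    def process_records(self, records, checkpointer):
        # Called with each batch of records; do the work, then checkpoint.
        for record in records:
            word = record.get("data", "")
            self.counts[word] = self.counts.get(word, 0) + 1
        self.checkpoint(checkpointer)

    def checkpoint(self, checkpointer):
        # Persist progress so a restarted worker resumes instead of replaying.
        checkpointer.checkpoint()

    def shutdown(self, checkpointer, reason):
        # On a clean handoff ("TERMINATE"), checkpoint one last time.
        if reason == "TERMINATE":
            self.checkpoint(checkpointer)
```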
<br />
Also in the <i>sample.properties</i>, notice the <i>AWSCredentialsProvider</i>. Since it is using Java underneath the covers, you need to set the Java classname of the credentials provider. If left unchanged, it will use: <a href="http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html">DefaultAWSCredentialsProviderChain</a>.<br />
<br />
It is worth looking at that documentation, but probably the easiest route to get the authentication working is to create a <i>~/.aws/credentials</i> file that contains the following:<br />
<br />
<pre><span style="color: #f1c232;">[default]
aws_access_key_id=XXX
aws_secret_access_key=XXX</span>
</pre>
<br />
Once you have your credentials setup, you can crank up the consumer by executing the following command in another notebook cell:<br />
<br />
<pre> <span style="color: #3d85c6;">run samples/amazon_kclpy_helper.py --print_command --java /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/bin/java --properties samples/sample.properties</span>
</pre>
<br />
Note that the <i>--java</i> parameter needs the location of the java executable. Now, running that in the notebook will give you the command line that you can go ahead and execute to consume the stream. Go ahead and cut-and-paste that into a command-line and execute it. At this point you are up and running.<br />
<br />
To get started with your own application, simply replace the <i>sample_kclpy_app.py</i> code with your own, and update the properties file, and you should be off and running.<br />
<br />
Now, I find this bit hokey. There is no native Python consumer (yet!). In fact, you actually need to run *Java*, which will turn around and call Python. Honestly, with this approach, you aren't getting much from the Python library over what you would get from the Java library. (except for some pipe manipulation in MultiLangDaemon) Sure, it is a micro-services approach... but maybe it would be better to simply run a little Python web service (behind a load balancer?). Then, we'd be able to scale the consumers better without re-sharding the Kinesis stream. (One shard/KCL worker could fan out work to many Python consumers across many boxes) You would need to be careful about checkpointing, but it might be a bit more flexible. (and would completely decouple the "data science" components from Kinesis) Definitely food for thought. =)<br />
<br />
As always, let me know if you run into trouble.<br />
<br />
Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-49556491054180505742015-09-25T15:04:00.007-07:002015-09-25T15:04:58.517-07:00Druid : Vagrant Up (and Tranquility!)<div>
<br /></div>
<div>
We've been shown the light. </div>
<div>
<br /></div>
<div>
After a great meeting with <a href="https://twitter.com/fangjin">Fangjin</a> and <a href="https://twitter.com/gianmerlino">Gian</a>, we are headed down the Tranquility route for real-time ingestion from Kinesis to Druid. For me, that means getting comfortable with the expanded set of node types in the Druid cluster.</div>
<div>
<br /></div>
<div>
There is no better way to get comfortable with the cluster and the new node types, than to spin up a complete environment and kick the tires a bit. And there is no better way to do that than "<a href="https://www.vagrantup.com/">vagrant up</a>"!</div>
<div>
<br /></div>
<div>
So, this morning I set out to create a complete environment suitable for Tranquility, inclusive of middle managers (which are typically omitted from a standalone "simple" cluster). I started with <a href="https://github.com/Quantiply/druid-vagrant">Quantiply's vagrant config for Druid 0.6.160</a>, forked the repo, and went to work.</div>
<div>
<br /></div>
<div>
If you are impatient, you can find <a href="https://github.com/boneill42/druid-vagrant">my fork here</a> and get going. If you have the patience...</div>
<div>
<br /></div>
<div>
It's important to understand the anatomy of Druid cluster. First off, Druid relies on Zookeeper and MySQL. The <a href="https://github.com/boneill42/druid-vagrant/blob/master/install.sh">install.sh script</a> installs vanilla versions of these, and creates a druid database and user in MySQL. (Druid itself populates the schema at startup.)</div>
<div>
<br /></div>
<div>
The following is a list of the server types in a Druid cluster. On our vagrant server, each of the servers occupies a different port, and has its own configuration. (also detailed below)</div>
<div>
<br /></div>
<h4>
Overlord: (port 8080)</h4>
<div>
The overlord is responsible for task management on the Druid cluster. Since Tranquility is largely an orchestration engine, doling out tasks to create segments as they form in real-time, the overlord is the entry point into the Druid cluster for Tranquility. </div>
<div>
<a href="https://github.com/boneill42/druid-vagrant/blob/master/config/overlord/runtime.properties">https://github.com/boneill42/druid-vagrant/blob/master/config/overlord/runtime.properties</a></div>
<div>
<div>
<h4>
<br /></h4>
<h4>
Coordinator: (port 8081)</h4>
</div>
</div>
<div>
The coordinator is responsible for segment management, telling nodes to load/drop segments.</div>
<div>
<a href="https://github.com/boneill42/druid-vagrant/blob/master/config/coordinator/runtime.properties">https://github.com/boneill42/druid-vagrant/blob/master/config/coordinator/runtime.properties</a></div>
<div>
<h4>
<br /></h4>
<h4>
Broker: (port 8090)</h4>
</div>
<div>
The Broker is the query proxy in the system. It receives the query, knows which nodes have which segments, and proxies the query to the respective nodes.</div>
<div>
<a href="https://github.com/boneill42/druid-vagrant/blob/master/config/broker/runtime.properties">https://github.com/boneill42/druid-vagrant/blob/master/config/broker/runtime.properties</a></div>
<div>
<h4>
<br /></h4>
<h4>
Historical: (port 8082)</h4>
</div>
<div>
Historical nodes are the beasts that load the segments and respond to queries for "static" data. Once a segment has been committed to deep storage, it is loaded by a historical node, which responds to queries.</div>
<div>
<a href="https://github.com/boneill42/druid-vagrant/blob/master/config/historical/runtime.properties">https://github.com/boneill42/druid-vagrant/blob/master/config/historical/runtime.properties</a></div>
<div>
<div>
<h4>
<br /></h4>
<h4>
MiddleManager: (port 8100)</h4>
</div>
<div>
MiddleManagers are exactly that. =) They push tasks to Peons, which they spawn on the same node. Right now, one Peon works on one task, which produces one segment. (that may change) </div>
</div>
<div>
<a href="https://github.com/boneill42/druid-vagrant/blob/master/config/middleManager/runtime.properties">https://github.com/boneill42/druid-vagrant/blob/master/config/middleManager/runtime.properties</a></div>
<div>
<br /></div>
<div>
<i><br /></i></div>
<div>
<i>Special Notes about Real-Time nodes:</i></div>
<div>
<a href="http://brianoneill.blogspot.com/2015/09/kinesis-druid-options-analysis-to-push.html">Per my previous blog</a>, we were debating the use of a real-time (RT) node. But with Tranquility, you don't need RT nodes. They are replaced with transient peon instances that run a temporary firehose ingesting the events for that segment directly from the Tranquility client. </div>
<div>
<br /></div>
<div>
Druid is targeting highly-available, fully-replicated, transactional real-time ingestion. Since RT nodes share nothing, they are unable to coordinate and unable to replicate data, which means they don't fit well into the strategic vision for the project. Tranquility, and its coordinated ingestion model, may eventually obsolesce/replace RT nodes entirely. (See <a href="https://github.com/druid-io/druid/issues/1642">#1642</a> for more information)</div>
<div>
<br /></div>
<h3>
Getting Started</h3>
<div>
OK -- down to business.</div>
<div>
<br /></div>
<div>
To fire up your own cluster, simply clone <a href="https://github.com/boneill42/druid-vagrant/">the repository</a> and "vagrant up". Once things are up and running, you can:</div>
<div>
<br /></div>
<div>
Hit the Overlord at:</div>
<div>
<a href="http://192.168.50.4:8080/console.html">http://192.168.50.4:8080/console.html</a></div>
<div>
<br /></div>
<div>
Hit the Coordinator at:</div>
<div>
<a href="http://192.168.50.4:8081/">http://192.168.50.4:8081/</a></div>
<div>
<br /></div>
<div>
In my next blog, I'll detail the client-side of integrating Tranquility using the direct Finagle API.</div>
<div>
<br /></div>
<div>
<br /></div>
<div>
<br /></div>
<div>
<br /></div>
Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-72531696167324715342015-09-24T07:03:00.002-07:002015-09-24T08:12:46.179-07:00Kinesis -> Druid : Options Analysis (to Push? to Pull? to Firehose? to Nay Nay?)<br />
Druid is one of the best slicers and dicers on the planet (except maybe for the <a href="https://www.youtube.com/watch?v=1GniNeqSX5U">Vegematic-2</a> ;). And I know there are people out there that might argue that Elastic Search can do the job (<a href="https://twitter.com/kimchy/status/410336696660946944">Shay</a> & Isaac, I'm looking at you), but MetaMarkets and Yahoo have proved that Druid can scale to some pretty insane numbers, with <a href="https://www.linkedin.com/pulse/combining-druid-spark-interactive-flexible-analytics-scale-butani">query times an order of magnitude better than Spark</a>. And because of that, we've come to rely on Druid as our main analytics database.<br />
<br />
The challenge is figuring out how best to pipe events into Druid. As I pointed out in my previous post, <a href="http://brianoneill.blogspot.com/2015/09/kinesis-20x-cheaper-when-used-with.html">Kinesis is now a contender</a>. Alas, Druid has no first class support for Kinesis. Thus, we need to figure out how to plumb Kinesis to Druid. (druidish pun intended)<br />
First, let me send out some Kudos to Cheddar, Fangjin, Gian and the crew...<br />
They've made *tremendous* progress since I last looked at Druid. (0.5.X days)<br />
That progress has given us the additional options below.<br />
<br />
Here are the options as I see them:<br />
<br />
<br />
<h4>
Option 1: Traditional KinesisFirehose Approach</h4>
<div>
In the good ole' days of 0.5.X, real-time data flowed via Firehoses. To introduce a new ingest mechanism, one simply implemented the Firehose interface, fired up a real-time node, and voila: data flowed like spice. (this is what I detailed in <a href="http://www.amazon.com/Storm-Blueprints-Distributed-Real-time-Computation/dp/178216829X">the Storm book</a>)</div>
<div>
<br /></div>
<div>
And since 0.5, Druid has made some updates to the Firehose API, to specifically address the issues that we saw before (around check-pointing, and making sure events are processed only once, in a highly-available manner). For the update on the Firehose API, check out <a href="https://groups.google.com/forum/#!topic/druid-development/9HB9hCcqvuI">this thread</a>.</div>
<div>
<br /></div>
<div>
After reading that thread, I was excited to crank out a KinesisFirehose. And we *nearly* have a KinesisFirehose up and running. As with most Java projects these days, you spend 15 minutes on the code, and then two hours working out dependencies. Ironically, Amazon's Kinesis Producer Library (KPL) uses an *older* version of the aws-java-sdk then Druid, and because the firehose runs in the same JVM as the Druid code, you have to workout the classpath kinks. When I hit this wall, it got me thinking -- hmmm, maybe some separation might be good here. ;)</div>
<div>
<br /></div>
<div>
To summarize what I did: I took the KCL's push model (implementing IRecordProcessor), put a simple little BlockingQueue in place, and wired it to Druid's pull model (Firehose.nextRow). It worked like a charm in unit tests. ;)</div>
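The push-to-pull dance can be sketched like this (Python for brevity; our implementation was Java, and the class and method names here are illustrative, not the actual Druid or KCL interfaces):

```python
import queue

class KinesisFirehoseBridge:
    """Adapts a push-style record-processor callback to a pull-style
    firehose: the Kinesis consumer pushes into a bounded queue, and the
    Druid side pulls rows off of it."""

    def __init__(self, capacity=1000):
        self.buffer = queue.Queue(maxsize=capacity)

    # Push side: invoked by the Kinesis consumer with each batch of records.
    def process_records(self, records):
        for record in records:
            # Blocks when Druid is slow to pull: back-pressure, not OOM.
            self.buffer.put(record)

    # Pull side: roughly what Firehose.hasMore()/nextRow() boil down to.
    def has_more(self):
        return not self.buffer.empty()

    def next_row(self):
        return self.buffer.get()
```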
<div>
<br /></div>
<div>
(ping me, if you want to collaborate on the firehose implementation)</div>
<br />
<h4>
Option 2: The EventReceiver Firehose</h4>
<div>
Of course, the whole time I'm implementing the Firehose -- I'm on the consuming end of a push model from the KPL and on the producing end of a pull model from Druid, and forced to dance between those two. It made me wonder, "Isn't there a way to push data into Druid?". If there were such a thing, then I could just push data into Druid as I receive it from the KPL. (that sounds nice/simple doesn't it?)</div>
<div>
<br /></div>
<div>
This ends up being the crux of the issue, and something that <a href="https://groups.google.com/forum/#!searchin/druid-development/fangjin$20yang$20%22thoughts%22/druid-development/aRMmNHQGdhI/muBGl0Xi_wgJ">many people have been wrestling with</a>. It turns out that there is a firehose specifically for this! Boo yah! If you have a look at the <a href="http://druid.io/docs/latest/ingestion/firehose.html">available firehoses</a>, you'll see an Event Receiver firehose. The <a href="https://github.com/druid-io/druid/blob/master/server/src/main/java/io/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.java">Event Receiver firehose</a> creates a REST endpoint to which you can POST events. (have a look at the addAll() method's javax.ws.rs.POST annotation)</div>
<div>
<br /></div>
<div>
Beautiful. Now, I could receive messages from the KCL and POST them to Druid. Simple. I move the checkpoint in Kinesis after I safely receive 200's for all of my events. Unfortunately, what happens to those events on the Druid side if the node that receives them fails? Also, how do I scale that ingest mechanism? Are there multiple REST endpoints behind a load balancer? </div>
<div>
<br /></div>
<div>
I started up a real-time node with a spec file instructing Druid to fire up an EventReceiver. The node appeared to start, but when I hit the endpoint from the documentation, nothing came back but a 404. (another druidish pun) Grep'ing the code, I found that the endpoint is actually served out of the WorkerResource:</div>
<div>
<span style="background-color: #e0e0e0; color: #c7254e; font-family: Menlo, Monaco, Consolas, 'Courier New', monospace; font-size: 12.6px; line-height: 23.8px;"><br /></span></div>
<div>
<span style="background-color: #e0e0e0; color: #c7254e; font-family: Menlo, Monaco, Consolas, 'Courier New', monospace; font-size: 12.6px; line-height: 23.8px;">http://&lt;peonhost&gt;:&lt;port&gt;/druid/worker/v1/chat/&lt;eventReceiverServiceName&gt;/push-events/</span></div>
<div>
<br /></div>
<div>
Well, that <a href="https://github.com/druid-io/druid/blob/master/indexing-service/src/main/java/io/druid/indexing/worker/http/WorkerResource.java">WorkerResource</a> was over in the indexing-service... hmmm, what is an IndexingService?</div>
<div>
<br /></div>
<div>
We tried to answer all the above questions around availability and scalability, and did some further research on the IndexingService. That led us to <a href="https://github.com/druid-io/tranquility">Tranquility</a>. (hey, that sounds so nice and pleasant, maybe we want to use that!?)</div>
<div>
<br /></div>
<div>
<br />
<h4>
Option 3: Tranquility</h4>
<div>
In doing some due-diligence on Tranquility, I discovered that while I was away, Druid implemented their own Task management system!! (See the <a href="http://druid.io/docs/latest/design/indexing-service.html">Indexing-Service documentation</a> for more information) Honestly, my first reaction was to run for the hills. Why would Druid implement their own task management system? MiddleManagers and Peons... sounds an awful lot like YARN. I thought the rest of the industry was moving to YARN (spark, samza, storm-yarn, etc.) YARN was appropriately named, Yet Another Resource Negotiator, because everyone was building their own! What happened to the simplicity of Druid, and the real-time node?</div>
<div>
<br /></div>
<div>
Despite that visceral reaction, I decided to give Tranquility a whirl. I fired up the <a href="http://druid.io/docs/latest/configuration/simple-cluster.html">simple cluster from the documentation</a>, incorporated the Finagle-based API integration, and was able to get data flowing through the system. In the overlord's console, I watched the tasks move from pending to running to complete. It worked! I was happy... sort of. I was left with a "these are small Hadoop jobs" taste in my mouth. It didn't feel like events were "streaming". </div>
<div>
<br /></div>
<div>
Now, this might just be me. In reality, many/all of the streaming frameworks are actually just doing micro-batching. And with Druid's segment-based architecture, the system really needs to wait for a windowPeriod/segmentGranularity to pass before a segment can be published. I get it.</div>
<div>
<br /></div>
<div>
I'm just scared. For me, the more nouns involved, the more brittle the system: tasks, overlords, middle managers, peons, etc. Maybe I should just get myself some kahonas, and plow ahead.</div>
<h4>
<br /></h4>
<h4>
Next Step:</h4>
<div>
Regardless, we plan to meet up with Fangjin Yang and Gian Merlino, who are presently coming out of Stealth mode with a new company. I have a tremendous amount of faith in these two. I'm confident they will steer us in the right direction, and when they do -- I'll report back. =)</div>
<div>
<br /></div>
<div>
<br /></div>
<div>
<br /></div>
</div>
<div>
<br /></div>
<br />
<br />
<br />
<br /><!------>Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-11628099154572405602015-09-21T20:12:00.005-07:002015-09-21T20:16:19.855-07:00Amazon Kinesis : 20x cheaper when used with the Kinesis Producer Library (KPL)!We've been looking at the tradeoffs between Kafka and Kinesis, and just recently.... Kinesis won! <br />
<br />
Kinesis pricing is based on two dimensions: an hourly price for each shard (which dictates overall concurrency/throughput), and a unit price per 1M PUT units. As I alluded to in my previous post, we process a *ton* of events, and if we simply pushed those events onto Kinesis streams 1:1, we would quickly hand over all of our money to Amazon and go out of business.<br />
<br />
However, with the recent release of the <a href="https://github.com/awslabs/amazon-kinesis-producer">Kinesis Producer Library</a> (KPL), Amazon exposed a native capability to aggregate multiple messages/events into a single PUT unit. The maximum size of a PUT unit is 25Kb. If you have a message size of 150 bytes, you can cram about 150 messages into a single PUT unit! (and save some serious dough!)<br />
<br />
Here is the math, straight from the <a href="http://calculator.s3.amazonaws.com/index.html#s=KINESIS">Kinesis Pricing Calculator</a>:<br />
<br />
Without the KPL,<br />
100K m / s * (150 bytes / m) = 100 shards and 263,520M PUT units = <b>$4,787.28 / month</b><br />
<br />
* Note: each shard can only ingest 1K records/s, which is why we end up with 100 shards.<br />
<div>
<br /></div>
Now with KPL, let's assume we can only fit 100 messages in each PUT unit.<br />
We would reduce our required throughput (messages / second) down to 1K / s.<br />
With 100 messages in each PUT unit, each unit would be 15Kb in size. (100 * 150 bytes)<br />
<br />
This gives us:<br />
1K m / s * (15 Kb / m) = 15 shards and 2,635.2M PUT units = <b>$201.60 / month</b><br />
<br />
That is a savings of ~20x!!<br />
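A quick sanity check of the arithmetic (rates from the pricing calculator at the time of writing: $0.015 per shard-hour, $0.014 per million PUT units, a 30.5-day month; the function name is just for illustration):

```python
SHARD_HOUR = 0.015        # $/shard-hour (pricing at the time of writing)
PER_MILLION_PUTS = 0.014  # $ per 1M PUT units (pricing at the time of writing)
HOURS_PER_MONTH = 30.5 * 24
SECONDS_PER_MONTH = HOURS_PER_MONTH * 3600

def monthly_cost(puts_per_sec, put_size_bytes):
    # A shard ingests at most 1K records/s or 1MB/s, whichever binds first.
    shards = max(puts_per_sec / 1000.0, puts_per_sec * put_size_bytes / 1e6)
    put_units_per_month = puts_per_sec * SECONDS_PER_MONTH
    return (shards * HOURS_PER_MONTH * SHARD_HOUR
            + put_units_per_month / 1e6 * PER_MILLION_PUTS)

naive = monthly_cost(100000, 150)    # every 150-byte message is its own PUT
batched = monthly_cost(1000, 15000)  # 100 messages aggregated per 15KB PUT unit
```

Running this reproduces the $4,787.28 and $201.60 figures above, and the ratio comes out north of 20x.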
<br />
So, let's look at what that means architecturally...<br />
<br />
First, you are going to want to look at the KPL recommended usage matrix:<br />
<a href="http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-kpl-integration.html">http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-kpl-integration.html</a><br />
<br />
My naive interpretation of that chart is: Use Java!<br />
<br />
Ironically, the producer is actually a natively built micro-service/binary. And the KPL is a Java wrapper around that native binary that uses interprocess communication to delegate work to the micro-service.<br />
<br />
In Java, it is straightforward to integrate the KPL. Here are the key components.<br />
<h4>
First, you need to configure the producer:</h4>
<div style="background: #ffffff; border-width: .1em .1em .1em .8em; border: solid gray; overflow: auto; padding: .2em .6em; width: auto;">
<table><tbody>
<tr><td><pre style="line-height: 125%; margin: 0;">1
2
3
4
5
6
7
8
9</pre>
</td><td><pre style="line-height: 125%; margin: 0;">KinesisProducerConfiguration config <span style="color: #333333;">=</span> <span style="color: #008800; font-weight: bold;">new</span> KinesisProducerConfiguration<span style="color: #333333;">();</span>
config<span style="color: #333333;">.</span><span style="color: #0000cc;">setRegion</span><span style="color: #333333;">(</span>region<span style="color: #333333;">);</span>
config<span style="color: #333333;">.</span><span style="color: #0000cc;">setCredentialsProvider</span><span style="color: #333333;">(</span><span style="color: #008800; font-weight: bold;">new</span> ProfileCredentialsProvider<span style="color: #333333;">());</span>
config<span style="color: #333333;">.</span><span style="color: #0000cc;">setMaxConnections</span><span style="color: #333333;">(</span><span style="color: #0000dd; font-weight: bold;">1</span><span style="color: #333333;">);</span>
config<span style="color: #333333;">.</span><span style="color: #0000cc;">setRequestTimeout</span><span style="color: #333333;">(</span><span style="color: #0000dd; font-weight: bold;">60000</span><span style="color: #333333;">);</span>
config<span style="color: #333333;">.</span><span style="color: #0000cc;">setRecordMaxBufferedTime</span><span style="color: #333333;">(</span><span style="color: #0000dd; font-weight: bold;">15000</span><span style="color: #333333;">);</span>
config<span style="color: #333333;">.</span><span style="color: #0000cc;">setNativeExecutable</span><span style="color: #333333;">(</span>
<span style="background-color: #fff0f0;">"/Users/brianoneill/git/amazon-kinesis-producer/bin/darwin-4.2.1/debug/kinesis_producer"</span><span style="color: #333333;">);</span>
producer <span style="color: #333333;">=</span> <span style="color: #008800; font-weight: bold;">new</span> KinesisProducer<span style="color: #333333;">(</span>config<span style="color: #333333;">);</span>
</pre>
</td></tr>
</tbody></table>
</div>
<br />
<br />
Notice that the KPL actually sends the messages asynchronously, buffering/aggregating those messages internally. (That makes for challenging guarantees around message delivery -- but we'll defer that problem for another day.) <br />
<br />
Also notice that I built my own native binary and specified its location. You don't necessarily need to do that; the binary is actually bundled into the JAR file and extracted to a temporary location at runtime. (yeah, crazy eh?)<br />
<br />
<b>Next, you need to send some data:</b><br />
<br />
This is done with a single line of code:<br />
<br />
<div style="background: #ffffff; border-width: .1em .1em .1em .8em; border: solid gray; overflow: auto; padding: .2em .6em; width: auto;">
<table><tbody>
<tr><td><pre style="line-height: 125%; margin: 0;">1
2</pre>
</td><td><pre style="line-height: 125%; margin: 0;">ListenableFuture<span style="color: #333333;"><</span>UserRecordResult<span style="color: #333333;">></span> f <span style="color: #333333;">=</span> producer<span style="color: #333333;">.</span><span style="color: #0000cc;">addUserRecord</span><span style="color: #333333;">(</span>streamName<span style="color: #333333;">,</span>
Long<span style="color: #333333;">.</span><span style="color: #0000cc;">toString</span><span style="color: #333333;">(</span>System<span style="color: #333333;">.</span><span style="color: #0000cc;">currentTimeMillis</span><span style="color: #333333;">()),</span> data<span style="color: #333333;">);</span>
</pre>
</td></tr>
</tbody></table>
</div>
<br />
<br />
<b>Finally, you need to find out what happened to your message:</b><br />
<br />
For this one, we'll add a callback to the future:<br />
<br />
<div style="background: #ffffff; border-width: .1em .1em .1em .8em; border: solid gray; overflow: auto; padding: .2em .6em; width: auto;">
<table><tbody>
<tr><td><pre style="line-height: 125%; margin: 0;">1</pre>
</td><td><pre style="line-height: 125%; margin: 0;">Futures<span style="color: #333333;">.</span><span style="color: #0000cc;">addCallback</span><span style="color: #333333;">(</span>f<span style="color: #333333;">,</span> callback<span style="color: #333333;">);</span>
</pre>
</td></tr>
</tbody></table>
</div>
<br />
Where callback is a FutureCallback that will be invoked when the record is processed. Here is an example:<br />
<br />
<!-- HTML generated using hilite.me --><br />
<div style="background: #ffffff; border-width: .1em .1em .1em .8em; border: solid gray; overflow: auto; padding: .2em .6em; width: auto;">
<table><tbody>
<tr><td><pre style="line-height: 125%; margin: 0;"> 1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18</pre>
</td><td><pre style="line-height: 125%; margin: 0;"><span style="color: #008800; font-weight: bold;">final</span> FutureCallback<span style="color: #333333;"><</span>UserRecordResult<span style="color: #333333;">></span> callback <span style="color: #333333;">=</span> <span style="color: #008800; font-weight: bold;">new</span> FutureCallback<span style="color: #333333;"><</span>UserRecordResult<span style="color: #333333;">>()</span> <span style="color: #333333;">{</span>
<span style="color: #555555; font-weight: bold;">@Override</span>
<span style="color: #008800; font-weight: bold;">public</span> <span style="color: #333399; font-weight: bold;">void</span> <span style="color: #0066bb; font-weight: bold;">onFailure</span><span style="color: #333333;">(</span>Throwable t<span style="color: #333333;">)</span> <span style="color: #333333;">{</span>
<span style="color: #008800; font-weight: bold;">if</span> <span style="color: #333333;">(</span>t <span style="color: #008800; font-weight: bold;">instanceof</span> UserRecordFailedException<span style="color: #333333;">)</span> <span style="color: #333333;">{</span>
Attempt last <span style="color: #333333;">=</span> Iterables<span style="color: #333333;">.</span><span style="color: #0000cc;">getLast</span><span style="color: #333333;">(((</span>UserRecordFailedException<span style="color: #333333;">)</span> t<span style="color: #333333;">).</span><span style="color: #0000cc;">getResult</span><span style="color: #333333;">().</span><span style="color: #0000cc;">getAttempts</span><span style="color: #333333;">());</span>
LOGGER<span style="color: #333333;">.</span><span style="color: #0000cc;">error</span><span style="color: #333333;">(</span>String<span style="color: #333333;">.</span><span style="color: #0000cc;">format</span><span style="color: #333333;">(</span><span style="background-color: #fff0f0;">"Record failed to put - %s : %s"</span><span style="color: #333333;">,</span> last<span style="color: #333333;">.</span><span style="color: #0000cc;">getErrorCode</span><span style="color: #333333;">(),</span>
last<span style="color: #333333;">.</span><span style="color: #0000cc;">getErrorMessage</span><span style="color: #333333;">()));</span>
<span style="color: #333333;">}</span>
LOGGER<span style="color: #333333;">.</span><span style="color: #0000cc;">error</span><span style="color: #333333;">(</span><span style="background-color: #fff0f0;">"Exception during put"</span><span style="color: #333333;">,</span> t<span style="color: #333333;">);</span>
System<span style="color: #333333;">.</span><span style="color: #0000cc;">exit</span><span style="color: #333333;">(</span><span style="color: #0000dd; font-weight: bold;">1</span><span style="color: #333333;">);</span>
<span style="color: #333333;">}</span>
<span style="color: #555555; font-weight: bold;">@Override</span>
<span style="color: #008800; font-weight: bold;">public</span> <span style="color: #333399; font-weight: bold;">void</span> <span style="color: #0066bb; font-weight: bold;">onSuccess</span><span style="color: #333333;">(</span>UserRecordResult result<span style="color: #333333;">)</span> <span style="color: #333333;">{</span>
<span style="color: #888888;">// LOGGER.info("Successfully wrote [" + result.toString() + "]");</span>
completed<span style="color: #333333;">.</span><span style="color: #0000cc;">getAndIncrement</span><span style="color: #333333;">();</span>
<span style="color: #333333;">}</span>
<span style="color: #333333;">};</span>
</pre>
</td></tr>
</tbody></table>
</div>
<br />
<br />
And there you have it, KPL in a nutshell.<br />
<br />
For us, it was a huge win, and actually made Kinesis a viable alternative.<br />
Now, we just need to figure out how to plumb it into everything else. =)<br />
<br />
Kinesis Firehose to the rescue! (stay tuned)<br />
<br />
<br />
<br />
<br />Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-40149895044713484702015-09-18T09:03:00.000-07:002015-09-18T09:10:54.643-07:00Gone Monetate : Personalizing Marketing at 100K events/second<br />
Internet advertising has gone the way of junk mail. There are tons of impersonal advertisements sucking up screen real-estate and mailbox space. The majority of the advertising industry leverages only inane intelligence that assumes, "because I clicked on a rubber ducky once, I must want to buy a rubber ducky". In fact, I probably want to see rubber duckies on every site I visit, which is the internet equivalent of a mailbox full of refinance offers the second I have my credit checked. I'd much rather be shown things I care about. Even better, show me stuff I didn't know I cared about!<br />
<br />
That is the reason I joined Monetate. Well, it was that combined with the opportunity to join a rock-star team of passionate geniuses, working on an incredibly challenging real-time data and analytics problem, hell-bent on changing the world.<br />
<br />
I've only been on the team for a week, and I've been blown away. Directed by Jeff Persch, the development team has architected a phenomenal platform: scalable and resilient, with simplicity as a core tenet. With Monetate at the center of many of the e-commerce interactions,<a href="http://www.monetate.com/news/monetate-influences-one-in-every-three-dollars-spent-online-during-black-friday-2013-in-u-s/"> influencing almost 1/3 of the dollars spent on Black Friday</a>, that is no small feat. We are gearing up for another Black Friday, for which peak traffic may hit 50-100K events / second!<br />
<br />
Doing some math on that, the numbers get big really fast:<br />
<br />
100K / s = 6M / minute = 360M / hour = ~8.6B events / day<br />
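That arithmetic is easy to sanity-check (just unit conversions; the 100K/s peak rate is the figure above):<br />
<br />

```python
# Back-of-the-envelope check of the event-rate math above.
per_second = 100_000          # peak events per second
per_minute = per_second * 60  # 6,000,000
per_hour = per_minute * 60    # 360,000,000
per_day = per_hour * 24       # 8,640,000,000 -- i.e. roughly 8.6 billion/day

print(per_minute, per_hour, per_day)
```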
<br />
Yep, that ended in the B word... per day. And like everyone else, we are pushing to deliver better, faster, and more insightful analytics on those user activity streams... in near real-time.<br />
<br />
And because of that, I hope to have many useful/fun blog posts coming.<br />
<br />
Right now, we're in the process of rolling out Druid, a favorite technology of mine (and something to which we dedicated a whole chapter in<a href="http://www.amazon.com/Storm-Blueprints-Distributed-Real-time-Computation/dp/178216829X"> our Storm book</a> -- shameless plug ;). With Druid in place, we'll be able to slice and dice the real-time events. <br />
<br />
And we already have the ability to ingest user context in real-time. When you combine that user context with stream processing that allows us to identify behavioral trends, our decision engine can make contextual decisions, adapt on the fly, and deliver relevant/interesting content to the end-user!<br />
<br />
Hooray!<br />
<br />
Imagine if we could only do that for the real world too. Instead of coming home to a mailbox filled with junk mail that I immediately throw away, I'd have updates on the latest gadgetry and fishing paraphernalia. Hmm, maybe I can get that on the roadmap. ;)<br />
<br />
Anyway, I couldn't be happier with my choice to join Monetate. The people, culture and technology are astounding.<br />
<br />
Stay tuned -- my next blog post will likely show how to leverage Amazon's Kinesis for 1/100th the cost. (By aggregating events into a smaller number of Kinesis records, decreasing the overall message rate, you can save mondo dough. =)<br />
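To preview why aggregation saves so much: Kinesis bills PUTs in 25KB payload units and caps each shard's record rate, so packing many small events into one record slashes both. Here is a rough sketch of the arithmetic (the event size, rate, and per-shard limit below are illustrative assumptions, not Monetate's actual numbers):<br />
<br />

```python
# Rough sketch: why aggregating events into fewer Kinesis records saves money.
# Assumes Kinesis bills PUTs in 25KB payload units and a shard accepts at most
# 1,000 records/sec; the event size and rate are made-up illustrative values.
EVENT_BYTES = 200
EVENTS_PER_SEC = 100_000
PAYLOAD_UNIT = 25 * 1024        # bytes per billed PUT payload unit
RECORDS_PER_SHARD_SEC = 1_000   # per-shard record-rate limit

def cost_profile(events_per_record):
    """Return (records/sec, payload units/sec, shards needed) for a batch size."""
    records_per_sec = EVENTS_PER_SEC // events_per_record
    batch_bytes = EVENT_BYTES * events_per_record
    units_per_record = -(-batch_bytes // PAYLOAD_UNIT)       # ceiling division
    shards = -(-records_per_sec // RECORDS_PER_SHARD_SEC)    # ceiling division
    return records_per_sec, records_per_sec * units_per_record, shards

print(cost_profile(1))    # one event per record: 100K records/sec, 100 shards
print(cost_profile(100))  # 100 events per record: 1K records/sec, 1 shard
```

With 100 events per record, both the billed payload units and the shard count drop by two orders of magnitude -- hence "1/100th the cost."<br />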
<br />
<br />
<br />
<br />
<br />Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-54835722723694155782015-07-16T11:24:00.001-07:002015-07-20T06:15:41.387-07:00Cloud Formation on AWS for Cassandra + HPCC<br />
If your primary objective is to setup a simple Cassandra cluster, then you probably want to start here:<br />
http://docs.datastax.com/en/cassandra/2.1/cassandra/install/installAMI.html<br />
<br />
However, if you have an existing AWS cluster to which you want to add Cassandra, then read on.<br />
<br />
In my case, I wanted to add Cassandra to an existing HPCC cluster. More specifically, I wanted to be able to spin-up an HPCC + Cassandra cluster with a single command. To accomplish this, I decided to add a bit of python scripting on top of Cloud Formation.<br />
<br />
Amazon has a facility called Cloud Formation. Cloud Formation reads a JSON template file, and creates instances as described in that file. (pretty slick) Within that JSON, you can execute shell commands that do the heavy lifting. The JSON file can define parameters that the administrator can then provide via the management console, or via AWS CLI. <br />
(IMHO, I suggest <a href="http://docs.aws.amazon.com/cli/latest/userguide/installing.html">installing AWS CLI</a>)<br />
<br />
<h3>
Running a Cloud Formation</h3>
First, I started with Tim Humphrey's <a href="https://github.com/tlhumphrey2/EasyFastHPCCoAWS">EasyFastHPCCoAWS</a>. That cloud formation template is a great basis. It installs AWS CLI, and copies the contents of an S3 bucket down into /home/ec2-users. Have a look at the <a href="https://github.com/tlhumphrey2/EasyFastHPCCoAWS/blob/master/MyHPCCCloudFormationTemplate.json">template file</a>. To get that up and running, it is a simple matter of creating a PlacementGroup, a KeyPair, and an S3 bucket, into which you copy the contents of the github repo. For simplicity, I named all of those the same thing: "realtime-hpcc".<br />
<br />
Now, with a single command, I can fire up a low-cost HPCC cluster with the following:<br />
<br />
<div style="background: #ffffff; border-width: .1em .1em .1em .8em; border: solid gray; overflow: auto; padding: .2em .6em; width: auto;">
<pre style="line-height: 125%; margin: 0;">aws cloudformation create-stack --capabilities CAPABILITY_IAM --stack-name realtime-hpcc --template-body https://s3.amazonaws.com/realtime-hpcc/MyHPCCCloudFormationTemplate.json --parameters <span style="background-color: #fff0f0; color: #666666; font-weight: bold;">\</span>
<span style="color: #996633;">ParameterKey</span><span style="color: #333333;">=</span>HPCCPlacementGroup,ParameterValue<span style="color: #333333;">=</span>realtime-hpcc <span style="background-color: #fff0f0; color: #666666; font-weight: bold;">\</span>
<span style="color: #996633;">ParameterKey</span><span style="color: #333333;">=</span>HPCCPlatform,ParameterValue<span style="color: #333333;">=</span>HPCC-Platform-5.2.2-1 <span style="background-color: #fff0f0; color: #666666; font-weight: bold;">\</span>
<span style="color: #996633;">ParameterKey</span><span style="color: #333333;">=</span>KeyPair,ParameterValue<span style="color: #333333;">=</span>realtime-hpcc <span style="background-color: #fff0f0; color: #666666; font-weight: bold;">\</span>
<span style="color: #996633;">ParameterKey</span><span style="color: #333333;">=</span>MasterInstanceType,ParameterValue<span style="color: #333333;">=</span>c3.2xlarge <span style="background-color: #fff0f0; color: #666666; font-weight: bold;">\</span>
<span style="color: #996633;">ParameterKey</span><span style="color: #333333;">=</span>NumberOfRoxieNodes,ParameterValue<span style="color: #333333;">=</span>1 <span style="background-color: #fff0f0; color: #666666; font-weight: bold;">\</span>
<span style="color: #996633;">ParameterKey</span><span style="color: #333333;">=</span>NumberOfSlaveInstances,ParameterValue<span style="color: #333333;">=</span>1 <span style="background-color: #fff0f0; color: #666666; font-weight: bold;">\</span>
<span style="color: #996633;">ParameterKey</span><span style="color: #333333;">=</span>NumberOfSlavesPerNode,ParameterValue<span style="color: #333333;">=</span>2 <span style="background-color: #fff0f0; color: #666666; font-weight: bold;">\</span>
<span style="color: #996633;">ParameterKey</span><span style="color: #333333;">=</span>RoxieInstanceType,ParameterValue<span style="color: #333333;">=</span>c3.2xlarge <span style="background-color: #fff0f0; color: #666666; font-weight: bold;">\</span>
<span style="color: #996633;">ParameterKey</span><span style="color: #333333;">=</span>ScriptsS3BucketFolder,ParameterValue<span style="color: #333333;">=</span>s3://riptide-hpcc/ <span style="background-color: #fff0f0; color: #666666; font-weight: bold;">\</span>
<span style="color: #996633;">ParameterKey</span><span style="color: #333333;">=</span>SlaveInstanceType,ParameterValue<span style="color: #333333;">=</span>c3.2xlarge <span style="background-color: #fff0f0; color: #666666; font-weight: bold;">\</span>
<span style="color: #996633;">ParameterKey</span><span style="color: #333333;">=</span>UserNameAndPassword,ParameterValue<span style="color: #333333;">=</span>riptide/HIDDEN
</pre>
</div>
<br />
Note that I specified the template via an HTTPS URL. I also specified a stack-name, which is what you'll use when querying AWS for status with the following command:<br />
<!-- HTML generated using hilite.me --><br />
<div style="background: #ffffff; border-width: .1em .1em .1em .8em; border: solid gray; overflow: auto; padding: .2em .6em; width: auto;">
<pre style="line-height: 125%; margin: 0;">aws cloudformation describe-stacks --stack-name realtime-hpcc
</pre>
</div>
<br />
With that you get a nice, clean JSON back that looks something like this:<br />
<br />
<div style="background: #ffffff; border-width: .1em .1em .1em .8em; border: solid gray; overflow: auto; padding: .2em .6em; width: auto;">
<pre style="line-height: 125%; margin: 0;">{
<span style="color: #007700;">"Stacks"</span>: [
{
<span style="color: #007700;">"StackId"</span>: <span style="background-color: #fff0f0;">"arn:aws:cloudformation:us-east-1:633162230041:stack/realtime-hpcc/e609e0b0-2595-11e5-97b7-5001b34a4a0a"</span>,
<span style="color: #007700;">"Description"</span>: <span style="background-color: #fff0f0;">"Launches instances for fast executing HPCC on AWS. Plus, it sets up and starts HPCC System."</span>,
<span style="color: #007700;">"Parameters"</span>: [
{
<span style="color: #007700;">"ParameterValue"</span>: <span style="background-color: #fff0f0;">"realtime-hpcc"</span>,
<span style="color: #007700;">"ParameterKey"</span>: <span style="background-color: #fff0f0;">"KeyPair"</span>
}<span style="background-color: #ffaaaa; color: red;">...</span>
],
<span style="color: #007700;">"Tags"</span>: [],
<span style="color: #007700;">"CreationTime"</span>: <span style="background-color: #fff0f0;">"2015-07-08T17:22:24.461Z"</span>,
<span style="color: #007700;">"Capabilities"</span>: [
<span style="background-color: #fff0f0;">"CAPABILITY_IAM"</span>
],
<span style="color: #007700;">"StackName"</span>: <span style="background-color: #fff0f0;">"realtime-hpcc"</span>,
<span style="color: #007700;">"NotificationARNs"</span>: [],
<span style="color: #007700;">"StackStatus"</span>: <span style="background-color: #fff0f0;">"CREATE_IN_PROGRESS"</span>,
<span style="color: #007700;">"DisableRollback"</span>: <span style="color: #008800; font-weight: bold;">false</span>
}
]
}
</pre>
</div>
<br />
The "StackStatus" is the key property. You'll want to wait until that says, "CREATE_COMPLETE".<br />
Once it completes, you can go into the management console and see your EC2 instances.<br />
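That wait is easy to script. Here is a minimal sketch (my own illustration, not part of the repo) that pulls StackStatus out of the describe-stacks JSON; a real polling loop would shell out to the aws CLI and sleep between checks:<br />
<br />

```python
import json

def stack_status(describe_stacks_json, stack_name="realtime-hpcc"):
    """Extract StackStatus for a named stack from the output of
    `aws cloudformation describe-stacks` (the JSON shown above)."""
    payload = json.loads(describe_stacks_json)
    for stack in payload["Stacks"]:
        if stack["StackName"] == stack_name:
            return stack["StackStatus"]
    raise KeyError(stack_name)

# A polling script would wrap this, e.g.:
#   out = subprocess.check_output(["aws", "cloudformation", "describe-stacks",
#                                  "--stack-name", "realtime-hpcc"])
#   while stack_status(out) != "CREATE_COMPLETE":
#       time.sleep(30)

sample = '{"Stacks": [{"StackName": "realtime-hpcc", "StackStatus": "CREATE_IN_PROGRESS"}]}'
print(stack_status(sample))  # CREATE_IN_PROGRESS
```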
<br />
If something went wrong, you can go have a look in /var/log/user-data.log. Tim's template conveniently redirects the output of the shell commands to that log file.<br />
<h3>
Installing Cassandra </h3>
NOW -- to actually get Cassandra installed on the machines, I simply forked Tim's work and altered the Cloud Formation template to include <a href="https://github.com/boneill42/hpcc-cassandra-cluster-on-aws/commit/783458ccb774507a8e1252319a30a5ee9b899271">the datastax repo and a yum install of Cassandra</a>. And the next time I created my cluster: poof magic voodoo, Cassandra was installed!<br />
<br />
Next I needed to configure the Cassandra instances into a cluster. At first, I tried to do this using a shell script executed as part of the cloud formation, but that proved difficult because I wanted the IP addresses for all the nodes, not just the one on which the script was running. I shifted gears and decided to orchestrate the configuration from python after the cloud had already formed.<br />
<br />
I wrote a <a href="https://github.com/boneill42/hpcc-cassandra-cluster-on-aws/blob/master/configure_local_cassandra.py">quick little python script</a> (configure_local_cassandra.py) that takes four parameters: the location of the cassandra.yaml file, the cluster name, the private IPs of the Cassandra nodes, and the IP of the node itself. The python script updates the cassandra config, substituting those values into <a href="https://github.com/boneill42/hpcc-cassandra-cluster-on-aws/blob/master/cassandra.yaml.template">the template file</a>. I added this to the S3 bucket, and Cloud Formation took care of deploying the template and the python script to the machines. (thanks to Tim's template)<br />
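The core of that script is plain template substitution. A minimal sketch of the idea (the placeholder names and yaml fragment below are illustrative; see the actual template file in the repo for the real ones):<br />
<br />

```python
from string import Template

# Hypothetical sketch of what configure_local_cassandra.py does: substitute the
# cluster name, seed list, and node IP into a cassandra.yaml template. The
# $-placeholders here are made up for illustration.
YAML_TEMPLATE = Template("""\
cluster_name: '$cluster_name'
seed_provider:
    - class_name: org.apache.cassandra.locator.SimpleSeedProvider
      parameters:
          - seeds: "$seeds"
listen_address: $listen_address
rpc_address: $listen_address
""")

def render_cassandra_yaml(cluster_name, seed_ips, node_ip):
    """Fill the template in with this node's view of the cluster."""
    return YAML_TEMPLATE.substitute(
        cluster_name=cluster_name,
        seeds=",".join(seed_ips),
        listen_address=node_ip)

print(render_cassandra_yaml("realtime-hpcc", ["10.0.0.1", "10.0.0.2"], "10.0.0.2"))
```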
<br />
<h3>
Configuring Cassandra </h3>
With that script and the template in place on each machine, the final piece is the script that gathers the IP addresses for the nodes and calls the python script via ssh. For this, we use the aws ec2 cli, and fetch the JSON for all of our instances. The aws ec2 command looks like this:<br />
<!-- HTML generated using hilite.me --><br />
<div style="background: #ffffff; border-width: .1em .1em .1em .8em; border: solid gray; overflow: auto; padding: .2em .6em; width: auto;">
<pre style="line-height: 125%; margin: 0;">aws ec2 describe-instances
</pre>
</div>
<br />
I wrote a python script (<a href="https://github.com/boneill42/cassandra-cloudformation-for-aws/blob/master/configure_cassandra_cluster.py">configure_cassandra_cluster.py</a>) that parses that JSON and runs commands on each of the nodes via ssh. <br />
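The first step of that script is just JSON plumbing: collecting the private IPs. A minimal sketch (the Reservations/Instances nesting and field name match the EC2 API; the sample payload is fabricated):<br />
<br />

```python
import json

def private_ips(describe_instances_json):
    """Pull every instance's private IP out of `aws ec2 describe-instances`
    output, walking the Reservations -> Instances nesting."""
    payload = json.loads(describe_instances_json)
    return [instance["PrivateIpAddress"]
            for reservation in payload["Reservations"]
            for instance in reservation["Instances"]
            if "PrivateIpAddress" in instance]

sample = json.dumps({"Reservations": [
    {"Instances": [{"PrivateIpAddress": "10.0.0.1"},
                   {"PrivateIpAddress": "10.0.0.2"}]}]})
print(private_ips(sample))  # ['10.0.0.1', '10.0.0.2']
```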
<br />
<h3>
Convenience Scripts</h3>
<div>
To keep things simple, I also added <a href="https://github.com/boneill42/cassandra-cloudformation-for-aws">a bunch of shell scripts</a> that wrap all the command lines (so I don't need to remember all the parameters). The shell scripts allow you to create a cluster, get the status of a cluster, and delete a cluster using a single command line:<br />
<br />
<pre style="line-height: 16.25px;">create_stack.sh, get_status.sh, delete_stack.sh</pre>
</div>
<div>
<br /></div>
<div>
(respectively)</div>
<div>
<br /></div>
<div>
<h3>
Putting it all together...</h3>
</div>
<div>
To summarize, the create_stack.sh script uses aws cloudformation to create the cluster.</div>
<div>
Then, you can watch the status of the cluster with, get_status.sh.</div>
<div>
Once formed, the configure_cassandra_cluster.py script installs, configures and starts Cassandra.</div>
<div>
<br /></div>
<div>
After that, you should be able to run ECL using Cassandra!</div>
<div>
<br /></div>
<div>
Feel free to take these scripts and apply them to other things. And kudos to Tim Humphrey for the Cloud Formation template.</div>
<div>
<br /></div>
<div>
<br /></div>
Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-88871239549166313112015-06-10T13:35:00.002-07:002015-06-10T13:44:51.588-07:00Amazon Echo : Syntax, Semantics, Intents and Goals: NLP over time. So I caved. Even with all my Apple paraphernalia, I bought an <a href="http://www.amazon.com/oc/echo/">Amazon Echo</a>. I've had it for a little over a week, and I'm hooked. We use it to play music, check the weather, and set timers -- all of the out of the box functionality. You may think, "It's just Siri in a room". And that is one perspective. From a different perspective, Alexa (the name you use to interact with Echo) will change everything, and bring the Internet of Things (IoT) to the masses. Regardless, I'm just amazed at how far we've come with NLP.<br />
<br />
Along those lines, I had an academic discussion this week around syntax and semantics in machine-to-machine interfaces (APIs), and how that correlates to user/consumer intent -- specifically, whether or not you can infer user intent from captured REST calls. (FWIW -- I believe you can infer intent (at least some notion of it), although it might be only a portion of the user's goal, and may be unintentionally misaligned with the semantics of the call.)<br />
<br />
With that conversation fresh in my mind, I found it amusing that the developer API for Echo specifically calls out "intent" and defines it as:<br />
<br />
"In the context of Alexa apps, an intent represents a high-level action that fulfills a user’s spoken request. Intents can optionally have arguments called slots. Note that intents for Alexa apps are not related in any way to Android intents." - Echo Developer : Getting Started Guide<br />
<br />
That made me nostalgic. I loved my days in NLP, and it's absolutely phenomenal to see how things have played out over the years... <br />
<br />
(NOTE: what follows is almost entirely self-serving, and you may not get anything out of it. I am not responsible for any of the time you lose in reading it ;)<br />
<br />
I think I was eighteen when I started working at the Natural Language Processing (NLP) group within Unisys. I was one of the many developers building those terrible voice recognition systems on the other end of the phone when you dialed in for customer service and received anything but that. We frustrated our end-users, but -- I got to work with some amazing people: <a href="http://www.conversational-technologies.com/aboutus.html">Debbie Dahl</a>, <a href="https://www.linkedin.com/pub/bill-scholz/0/34/929">Bill Scholz</a>, and <a href="https://www.linkedin.com/in/jamessirwin">Jim Irwin</a>. <br />
<br />
And we thought we were smart. We built all sorts of <a href="https://www.google.com.tr/patents/US20060206299">tools that helped map text into actions</a> ("intents"), and <a href="http://www.google.com/patents/US20050028085">new fangled web servers for voice recognition</a>. They were good times, but frankly we were only inflicting pain on people. Voice recognition wasn't there yet. We spent our time trying to engineer the questions properly, guiding users to answer with certain terms to make it easy on the voice recognition system. (This was 1995-1999ish.)<br />
<br />
For a time after that, I wanted nothing to do with voice recognition. I still loved NLP though, and went on to build a system to automate email routing and responses for customer service. That worked because it was a numbers game. If the system was confident enough to answer 50% of the customer inquiries, it meant they didn't need humans to respond to that subset, which saved moolah. We went on to extend that into real-time instant messaging over the web (<a href="http://www.kana.com/customer-service/datasheets/kana-iq-for-customers.pdf">Kana IQ</a>). Again, good times and lots of <a href="http://www.google.com/patents/US6560590">brain work/patents in the process</a>, and we congratulated ourselves for figuring out how to map a Bayesian inference engine onto a grammar. (1999-2001ish) But there is no way we could have put that system directly in the hands of consumers with no human support.<br />
<br />
However -- it was during this time that I started appreciating the difference between: Syntax, Semantics, Intents and Goals. Here are the straight-up definitions:<br />
<br />
<b>Syntax </b>: the arrangement of words and phrases to create well-formed sentences in a language.<br />
<b>Semantics </b>: the branch of linguistics and logic concerned with meaning.<br />
<b>Intent </b>: the reason for which something is done or created or for which something exists.<br />
<b>Goal </b>: the object of a person's ambition or effort; an aim or desired result.<br />
<br />
Consider the process of taking characters in a string as input and converting it into actions that help the user achieve their goal. First, you need to consider <i>syntax</i>. In this part of the process, you are converting a useless string of characters into related tokens. There are lots of ways to do this, with multiple pieces to the puzzle (e.g. part-of-speech tagging). Back at Brown, I had the pleasure of studying under Eugene Charniak, and his book, Statistical Language Learning, became a favorite of mine (after initially hating it for a semester ;). Ever since that course, I've fallen back on Context-Free-Grammars (CFGs) and chart-parsing to attempt to relate word tokens to each other in a sentence.<br />
<br />
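The chart-parsing idea is easy to sketch. Here is a toy CYK recognizer over a tiny hand-rolled grammar in Chomsky normal form -- an illustration of the general technique, not the statistical parsers discussed above:<br />
<br />

```python
# Toy CYK chart parser: a minimal recognizer for a CFG in Chomsky normal form.
GRAMMAR = {                    # rhs pair -> set of lhs symbols
    ("DET", "NOUN"): {"NP"},
    ("VERB", "NP"): {"VP"},
    ("NP", "VP"): {"S"},
}
LEXICON = {
    "the": {"DET"}, "a": {"DET"},
    "dog": {"NOUN"}, "man": {"NOUN"},
    "sees": {"VERB"},
}

def recognizes(tokens, start="S"):
    """Return True if the grammar derives the token sequence."""
    n = len(tokens)
    # chart[i][j] = set of symbols that span tokens[i:j+1]
    chart = [[set() for _ in range(n)] for _ in range(n)]
    for i, word in enumerate(tokens):
        chart[i][i] = set(LEXICON.get(word, ()))
    for span in range(2, n + 1):
        for i in range(n - span + 1):
            j = i + span - 1
            for k in range(i, j):  # every split point of the span
                for left in chart[i][k]:
                    for right in chart[k + 1][j]:
                        chart[i][j] |= GRAMMAR.get((left, right), set())
    return start in chart[0][n - 1]

print(recognizes("the dog sees a man".split()))  # True
print(recognizes("dog the sees".split()))        # False
```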
Once you have related tokens, and you know the parts of speech (ADJ, NOUN, etc.) and how they relate (ADJ _modifies_ NOUN) via chart-parsing, you can attempt to assign semantics. IMHO -- this is the hard part. <br />
<br />
After Kana, I took a job with a company that recorded patient/doctor conversations, transcribed them, and then attempted to perform computational linguistics on them. If you can imagine, distinguishing between a mention of a *symptom* and a *side-effect* is incredibly difficult. You not only need solid parsing to associate the terms in the sentence, but you need a knowledge base to know what those terms mean, and the context in which they are being used. In this phase, we are assigning meaning to the terms. (i.e. semantics) Sure, we used Hadoop for the NLP processing, but more horsepower didn't translate into better results... (now 2008-2010ish) <br />
<br />
Even assuming you can properly assign semantics, you may misinterpret the intent of a communique. Even with human-to-human communication, this happens all the time. English is a horrible language. And the same thing can happen with machines. However, more often than not, if you can parse the sentence (i.e. assign syntax) and interpret its meaning (i.e. semantics), you can infer some notion of intent from the textual gibberish. Otherwise, communication is just fundamentally broken.<br />
<br />
With all this in mind -- back to Echo. <br />
<br />
Amazon nailed it. I see 20 years of my NLP frustration solved in a 12" cylinder. It rarely misses on the voice recognition. And they have a fantastic engine taking phonemes to "intent", allowing developers to plug in at that highest layer. Boo yah. That's empowering. I might just build an app for Echo. Maybe an AWS integration -- "Alexa, please expand my AWS cluster by 10 nodes". It'd be therapeutic. ;)<br />
<br />
<br />
<br />
<br />
<br />
<br />Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0tag:blogger.com,1999:blog-6872023396647681597.post-59989451188802787422015-05-14T06:46:00.002-07:002015-05-14T06:46:49.620-07:00Spark SQL against Cassandra Example<br />
Spark SQL is awesome. It allows you to query any Resilient Distributed Dataset (RDD) using SQL. (including data stored in Cassandra!)<br />
<br />
The first thing to do is create a SQLContext from your SparkContext. I'm using Java so... <br />
(sorry -- I'm still not hip enough for Scala) <br />
<br />
<div style="background: #ffffff; border-width: .1em .1em .1em .8em; border: solid gray; overflow: auto; padding: .2em .6em; width: auto;">
<pre style="line-height: 125%; margin: 0;"> JavaSparkContext <span style="line-height: 125%;">context </span><span style="color: #333333; line-height: 125%;">=</span><span style="line-height: 125%;"> </span><span style="color: #008800; font-weight: bold; line-height: 125%;">new</span><span style="line-height: 125%;"> JavaSparkContext</span><span style="color: #333333; line-height: 125%;">(</span><span style="line-height: 125%;">conf</span><span style="color: #333333; line-height: 125%;">);</span></pre>
<pre style="line-height: 125%; margin: 0;"> JavaSQLContext <span style="line-height: 125%;">sqlContext </span><span style="color: #333333; line-height: 125%;">=</span><span style="line-height: 125%;"> </span><span style="color: #008800; font-weight: bold; line-height: 125%;">new</span><span style="line-height: 125%;"> JavaSQLContext</span><span style="color: #333333; line-height: 125%;">(</span><span style="line-height: 125%;">context</span><span style="color: #333333; line-height: 125%;">);</span></pre>
<pre style="line-height: 125%; margin: 0;"></pre>
</div>
<br />
Now you have a SQLContext, but you have no data. Go ahead and create an RDD, just like you would in regular Spark:<br />
<br />
<!-- HTML generated using hilite.me --><br />
<div style="background: #ffffff; border-width: .1em .1em .1em .8em; border: solid gray; overflow: auto; padding: .2em .6em; width: auto;">
<pre style="line-height: 125%; margin: 0;"> JavaPairRDD<span style="color: #333333;"><</span>Integer<span style="color: #333333;">,</span> Product<span style="color: #333333;">></span> productsRDD <span style="color: #333333;">=</span> </pre>
<pre style="line-height: 125%; margin: 0;"> javaFunctions<span style="color: #333333;">(</span>context<span style="color: #333333;">).</span><span style="color: #0000cc;">cassandraTable</span><span style="color: #333333;">(</span><span style="background-color: #fff0f0;">"test_keyspace"</span><span style="color: #333333;">,</span> <span style="background-color: #fff0f0;">"products"</span><span style="color: #333333;">,</span>
productReader<span style="color: #333333;">).</span><span style="color: #0000cc;">keyBy</span><span style="color: #333333;">(</span><span style="color: #008800; font-weight: bold;">new</span> Function<span style="color: #333333;"><</span>Product<span style="color: #333333;">,</span> Integer<span style="color: #333333;">>()</span> <span style="color: #333333;">{</span>
<span style="color: #555555; font-weight: bold;">@Override</span>
<span style="color: #008800; font-weight: bold;">public</span> Integer <span style="color: #0066bb; font-weight: bold;">call</span><span style="color: #333333;">(</span>Product product<span style="color: #333333;">)</span> <span style="color: #008800; font-weight: bold;">throws</span> Exception <span style="color: #333333;">{</span>
<span style="color: #008800; font-weight: bold;">return</span> product<span style="color: #333333;">.</span><span style="color: #0000cc;">getId</span><span style="color: #333333;">();</span>
<span style="color: #333333;">}</span>
<span style="color: #333333;">});</span>
</pre>
</div>
<br />
(The example above comes from the <a href="https://github.com/boneill42/spark-on-cassandra-quickstart">spark-on-cassandra-quickstart project</a>, as described in <a href="http://brianoneill.blogspot.com/2015/04/holy-momentum-batman-spark-and.html">my previous post</a>.)<br />
<br />
Now that we have a plain vanilla RDD, we need to spice it up with a schema, and let the sqlContext know about it. We can do that with the following lines:<br />
<div style="background: #ffffff; border-width: .1em .1em .1em .8em; border: solid gray; overflow: auto; padding: .2em .6em; width: auto;">
<pre style="line-height: 125%; margin: 0;"> JavaSchemaRDD schemaRDD <span style="color: #333333;">=</span> sqlContext<span style="color: #333333;">.</span><span style="color: #0000cc;">applySchema</span><span style="color: #333333;">(</span>productsRDD<span style="color: #333333;">.</span><span style="color: #0000cc;">values</span><span style="color: #333333;">(),</span> Product<span style="color: #333333;">.</span><span style="color: #0000cc;">class</span><span style="color: #333333;">);</span>
sqlContext<span style="color: #333333;">.</span><span style="color: #0000cc;">registerRDDAsTable</span><span style="color: #333333;">(</span>schemaRDD<span style="color: #333333;">,</span> <span style="background-color: #fff0f0;">"products"</span><span style="color: #333333;">);</span>
</pre>
</div>
<br />
Shazam. Now your sqlContext is ready for querying. Notice that it inferred the schema from the Java bean (Product.class). (In my next blog post, I'll show how to do this dynamically.)<br />
<br />
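Since the schema is inferred from the bean's getters, it helps to see what such a bean looks like. Here's a hypothetical Product bean (the field names are my assumption, not necessarily what's in the quickstart project) -- Spark SQL creates one column per getter/setter pair:

```java
// Hypothetical Product bean -- Spark SQL infers one column per getter/setter pair.
// Serializable so Spark can ship instances between workers.
public class Product implements java.io.Serializable {
    private Integer id;
    private String name;
    private Double price;

    public Product() {}  // no-arg constructor required for bean-style inference

    public Integer getId() { return id; }
    public void setId(Integer id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public Double getPrice() { return price; }
    public void setPrice(Double price) { this.price = price; }
}
```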
You can prime the pump with a count:<br />
<br />
<div style="background: #ffffff; border-width: .1em .1em .1em .8em; border: solid gray; overflow: auto; padding: .2em .6em; width: auto;">
<pre style="line-height: 125%; margin: 0;"> System<span style="color: #333333;">.</span><span style="color: #0000cc;">out</span><span style="color: #333333;">.</span><span style="color: #0000cc;">println</span><span style="color: #333333;">(</span><span style="background-color: #fff0f0;">"Total Records = ["</span> <span style="color: #333333;">+</span> productsRDD<span style="color: #333333;">.</span><span style="color: #0000cc;">count</span><span style="color: #333333;">()</span> <span style="color: #333333;">+</span> <span style="background-color: #fff0f0;">"]"</span><span style="color: #333333;">);</span>
</pre>
</div>
<br />
The count operation forces Spark to load the data into memory (pair it with cache() on the RDD if you want the data to stay there between queries), which makes queries like the following lightning fast:<br />
<div style="background: #ffffff; border-width: .1em .1em .1em .8em; border: solid gray; overflow: auto; padding: .2em .6em; width: auto;">
<pre style="line-height: 125%; margin: 0;"> JavaSchemaRDD result <span style="color: #333333;">=</span> sqlContext<span style="color: #333333;">.</span><span style="color: #0000cc;">sql</span><span style="color: #333333;">(</span><span style="background-color: #fff0f0;">"SELECT id from products WHERE price < 0.50"</span><span style="color: #333333;">);</span>
<span style="color: #008800; font-weight: bold;">for</span> <span style="color: #333333;">(</span>Row row <span style="color: #333333;">:</span> result<span style="color: #333333;">.</span><span style="color: #0000cc;">collect</span><span style="color: #333333;">()){</span>
System<span style="color: #333333;">.</span><span style="color: #0000cc;">out</span><span style="color: #333333;">.</span><span style="color: #0000cc;">println</span><span style="color: #333333;">(</span>row<span style="color: #333333;">);</span>
<span style="color: #333333;">}</span></pre>
</div>
<br />
That's it. You're off to the SQL races.<br />
<div>
<br /></div>
<div>
<br /></div>
<div>
P.S. If you try querying the sqlContext without applying a schema and/or without registering the RDD as a table, you may see something similar to this:</div>
<br />
<pre><span style="color: red;">Exception in thread "main" org.apache.spark.sql.catalyst.errors.package$TreeNodeException: Unresolved attributes: 'id, tree:</span>
'Project ['id]
'Filter ('price < 0.5)
NoRelation$
</pre>
<div>
<br /></div>
Brian O'Neillhttp://www.blogger.com/profile/04198208062865899011noreply@blogger.com0