Wednesday, October 28, 2015

Diagnosing memory leaks in Java

Every time I suspect a memory leak, I have to go dig up these commands.
So, here they are for posterity's sake:

First, I use the following command to monitor the process over time:

while ( sleep 1 ) ; do ps -p $PID -o %cpu,%mem,rss  ; done

(and/or New Relic if you have it ;)
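
For longer runs, the same sampling can be scripted. Here is a rough Python sketch (stdlib only; `looks_like_a_leak` is my own crude heuristic, not a standard tool):

```python
import subprocess

def sample_rss_kb(pid):
    # same data as the ps loop above: resident set size, in KB
    out = subprocess.check_output(["ps", "-p", str(pid), "-o", "rss="])
    return int(out.strip())

def looks_like_a_leak(samples):
    # crude heuristic: RSS grew across every consecutive sample
    return all(b > a for a, b in zip(samples, samples[1:]))

print(looks_like_a_leak([100, 120, 150]))  # steadily growing
print(looks_like_a_leak([100, 150, 120]))  # sawtooth: probably just GC
```

Collect a sample every few seconds and feed the list to `looks_like_a_leak`; a sawtooth is usually just the garbage collector doing its job.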

If you see memory creeping up, it could be your VM settings. If you haven't explicitly specified memory settings to the JVM, it will default them. To get the defaults, use the following command:

java -XX:+PrintFlagsFinal -version | grep -i HeapSize

If those are out of line with what you want, then you'll need to specify the memory settings for the
JVM. You can set minimum and maximum heap sizes with the following:

java -Xms128m -Xmx256m

Once you have sane memory settings, and you can monitor the process, you might legitimately still see memory increasing over time. To get insight into that, you can start by taking a look at the histogram of object instances using the following:

jmap -histo $PID
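
If the histogram is huge, a few lines of Python will sort it for you. This assumes the usual four-column `jmap -histo` layout (rank, instances, bytes, class name); the sample lines below are made up:

```python
# Hypothetical `jmap -histo` output; these sample lines are made up.
SAMPLE = """\
   1:        102309        8184720  java.lang.String
   2:         51200        4915200  [B
   3:          1024         131072  com.example.LeakyCache$Entry
"""

def top_by_bytes(histo_text, n=2):
    rows = []
    for line in histo_text.splitlines():
        parts = line.split()
        # keep only data rows of the form "<rank>: <instances> <bytes> <class>"
        if len(parts) == 4 and parts[0].rstrip(':').isdigit():
            rows.append((parts[3], int(parts[1]), int(parts[2])))
    # biggest byte-hogs first
    return sorted(rows, key=lambda r: -r[2])[:n]

print(top_by_bytes(SAMPLE))
```

Run it over histograms taken a few minutes apart and diff the output; the class whose byte count only ever goes up is your prime suspect.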

If that isn't enough information then you can take heap dumps with the following command:

jmap -dump:format=b,file=/tmp/dump1.hprof $PID

Typically, I'd take two heap dumps and then compare them with jhat using the following command:

jhat -baseline /tmp/dump1.hprof /tmp/dump2.hprof

That fires up an HTTP server that you can use to explore the delta between those two heap dumps. By default, the HTTP server will start on port 7000, which you can just hit in a browser.

If you are behind a firewall, but have ssh access, you can tunnel over to the port using:

ssh -L 7000:localhost:7000 $HOST

If you scroll down to the bottom of the first page, you will see some useful links. Following them will show you all "new" instances between the two heap dumps, which should give you some idea of where your leak is coming from.

And there you have it, a quick synopsis of the magic command-lines you need when diagnosing memory leaks. (which I always forget)

Tuesday, October 27, 2015

Using Gradle with AWS and S3 (w/ DefaultAWSCredentialsProviderChain and InstanceProfileCredentialsProvider (FTW!))

We recently switched over to gradle as our build mechanism.  As part of that switchover, we wanted to be able to build w/o external dependencies (e.g. Maven Central).  We tossed around the idea of Artifactory, but in the end we decided to keep it simple and push our jar files into S3.

This turned out to be remarkably easy.   First, we added a task to copy down our dependencies:

task copyRuntimeLibs(type: Copy) {
    from configurations.runtime
    into "$buildDir/libs"
}
With that, we simply sync'd our dependencies up to S3 using aws cli:

aws s3 sync build/libs/ s3://yourbucket/gradle/

That deposits the jar files into your S3 bucket.


For the next trick, you need to get gradle to pull those artifacts from S3.  You can do this by declaring an *ivy* repository in your build.gradle. e.g.

repositories {
    ivy {
        url "s3://yourbucket/gradle/"
        credentials(AwsCredentials) {
            accessKey "YOUR_AWSAccessKeyId"
            secretKey "YOUR_AWSSecretKey"
        }
        layout "pattern", {
            artifact "[artifact]-[revision].[ext]"
        }
    }
}
That's it. HOWEVER....

If you are as paranoid about security as we are, you'll want to use EC2 instance credentials for system users like Jenkins, instead of having keys and secrets floating around that are known to many. Fortunately, the AWS Java SDK can do this out-of-the-box.   It is part of the DefaultAWSCredentialsProviderChain.  With the instance profile credentials, you don't need to specify a key and secret, instead it retrieves the security credentials directly from the host using the Amazon EC2 Instance Metadata Service.

UNFORTUNATELY, Gradle is hard-coded to use BasicAWSCredentials!
See this discussion thread.

The gradle crew is working on a larger design to address security, and because of that they are not going to merge this patch.   So, we took matters into our own hands and forked the 2.8 branch of gradle to make the change:

If you want to use instance profile credentials feel free to pull that code and follow the instructions in this commit.

With that patch, you can simply omit the credentials from your build.gradle. (woohoo!)

repositories {
    ivy {
        url "s3://yourbucket/gradle/"
        layout "pattern", {
            artifact "[artifact]-[revision].[ext]"
        }
    }
}
It will pick up the credentials from your instance metadata, and you'll be on your way to Cape May (S3).

FWIW, hopefully this helps people.

Thursday, October 22, 2015

Reverse Engineering Kinesis Aggregation and Sharding (and the magic "a" partitionKey)

I love open source.  I love being able to take things apart to understand them.  And for that reason, I'd like to thank Amazon for open sourcing Kinesis Consumer Library and Kinesis Producer Library.  It allowed us to get to the bottom of a couple of strange observations and reverse engineer the design for Kinesis Record aggregation in the KPL. (at least we think so?)

When we started scale testing Kinesis, we saw a couple strange observations:

(1) The velocity of PUTs (records per second) seemed to scale/increase with the number of shards.
(2) When we attached to the stream using the AWS CLI, we saw records with partition key: "a"

First, let's discuss the PUT record velocity.  One component of Amazon Kinesis's pricing includes the number of PUT records per month.  Specifically:

PUT Payload Units, per 1,000,000 units: $0.014

So, mo' PUTs is mo' money.  Fortunately, the Kinesis Producer Library (KPL) lets you aggregate multiple records into a single PUT.    If you are optimizing for cost, you want to maximize that records per PUT ratio. (and obviously at a 1:1 ratio you are paying the most)  But, the amount of aggregation you can perform depends on record throughput and your tolerance for latency.

KPL ingests records, accumulating them in a buffer.  The buffering is configured via the KPL's configuration, and the key parameter in the system is RecordMaxBufferedTime, which specifies the maximum amount of time that the KPL will wait before it sends a PUT.   After this maximum amount of time passes, KPL PUTs the buffered records, regardless of how many it has accumulated/aggregated.  If your system has low throughput, KPL sends partially filled PUTs. (which is more expensive)

For example,
if your throughput is 1 record / second, and you have RecordMaxBufferedTime set to 1 second, you will have an aggregation ratio of 1:1 (not good).  Now, if you increase that duration from 1 second to 4 seconds, you will immediately quadruple your aggregation ratio (and cut costs by 75%!).
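
The math is easy to sanity-check. Here is a tiny Python model of the aggregation ratio (my own simplification; the real KPL also caps PUT payload sizes):

```python
def records_per_put(throughput_rps, max_buffered_time_s):
    # each flush bundles everything that arrived during the buffer window,
    # with a floor of one record per PUT
    return max(1.0, throughput_rps * max_buffered_time_s)

def relative_cost(throughput_rps, max_buffered_time_s):
    # PUT payload units consumed per record = 1 / aggregation ratio
    return 1.0 / records_per_put(throughput_rps, max_buffered_time_s)

assert records_per_put(1, 1) == 1.0   # 1:1, paying full price
assert records_per_put(1, 4) == 4.0   # quadruple the ratio...
assert relative_cost(1, 4) == 0.25    # ...and cut the PUT cost by 75%
```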

When we started scale testing, we saw an order of magnitude more PUT records than we saw during our initial testing (on streams with few shards).  We noticed that the velocity of PUTs increased linearly with the number of shards.  For example, with 2X the number of shards, we saw 2X the number of PUTs.  This led us to hypothesize that aggregation was somehow related to shards.

Our straw man hypothesis looked like this:

If our hypothesis was right, then locally the KPL was keeping an aggregate KinesisRecord (aka PUT) for each shard.  Then, when KPL hit the RecordMaxBufferedTime it would flush all of those records, meaning that the minimum velocity of PUTs would be equal to the flush rate (one PUT per RecordMaxBufferedTime) multiplied by the number of shards.

e.g. 100ms RecordMaxBufferedTime = 10 PUTs / second / shard
which would be: 500 PUTs / second for 50 shards.
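
That floor on PUT velocity is easy to model in a couple lines of Python (again, this models our hypothesis, not KPL internals):

```python
def min_puts_per_second(num_shards, record_max_buffered_time_ms):
    # one aggregated PUT per shard every time the buffer flushes
    flushes_per_second = 1000.0 / record_max_buffered_time_ms
    return num_shards * flushes_per_second

assert min_puts_per_second(1, 100) == 10.0    # 10 PUTs/sec/shard
assert min_puts_per_second(50, 100) == 500.0  # 500 PUTs/sec for 50 shards
```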

This is exactly what we saw.  Even with a trickling of traffic, we saw hundreds of PUT requests.

When you think about it, this makes sense.  Since a partition key determines which shard receives the record, a single PUT can't contain records destined for multiple shards, unless Amazon somehow fanned out the records on the backend.


Now, let's look at our second observation, the elusive "a" partition key.  When checking the accuracy of our stream data, we happened to attach using the AWS CLI.  When we attached to the stream and read records directly using `aws kinesis get-records...`, we saw JSON data in the form:

{
    "PartitionKey": "a",
    "Data": "84mawgo...rnz1f7h",
    "SequenceNumber": "49555608841303555555555578136812826047777764134359384066"
}

Some of the JSON elements had the proper partitionKey in them, while others looked like the above. We had no idea where the mysterious "a" partitionKey was coming from.  Immediately, we thought we had a bug somewhere.  I proceeded to go into debug mode, and Base64 decoded the Data element of the JSON.  What I saw was... (and paraphrasing a bit...)

pk1 pk2 pk3 pk4 ?? data1 ?? data2 ?? data3 ?? data4

Very interesting... the beginning of the Data element had all of the partitionKeys from my records.  They were then followed by the data for each of those records.  Okay, so the data looked good.  And it looks like KPL just packs all of the partition keys up front, followed by the data.  But that didn't explain the "a" partition key.  And if the partition key was always "a", how was the data getting distributed across shards?!
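
That layout can be illustrated with a toy sketch in Python. To be clear, this is NOT the real KPL wire format (which is protobuf-based); it just mimics the "keys up front, data after" shape I saw in the decoded payload, using made-up length-prefixed fields:

```python
import struct

def pack(records):
    # record count, then all partition keys, then all data blobs,
    # each length-prefixed with a big-endian uint32
    keys = b"".join(struct.pack(">I", len(k)) + k for k, _ in records)
    datas = b"".join(struct.pack(">I", len(d)) + d for _, d in records)
    return struct.pack(">I", len(records)) + keys + datas

def unpack(blob):
    (n,) = struct.unpack_from(">I", blob)
    off = 4
    def read_chunks(count, off):
        out = []
        for _ in range(count):
            (ln,) = struct.unpack_from(">I", blob, off)
            off += 4
            out.append(blob[off:off + ln])
            off += ln
        return out, off
    keys, off = read_chunks(n, off)
    datas, _ = read_chunks(n, off)
    return list(zip(keys, datas))

recs = [(b"pk1", b"data1"), (b"pk2", b"data2")]
blob = pack(recs)
assert unpack(blob) == recs
# the partition keys all come before any of the data, like what I decoded
assert blob.find(b"pk2") < blob.find(b"data1")
```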

If you recall, the Kinesis Producer Library is a native application, coated in a Java wrapper.   If that "a" was coming from somewhere, it had to be deep in the native application.  I put on my waders, strapped on my spelunking headlamp and dusted off my C++.    After some digging, I eventually found the code(droid) I was looking for:

std::string KinesisRecord::partition_key() const {
  if (items_.empty()) {
    throw std::runtime_error(
        "Cannot compute partition_key for empty container");
  }

  if (items_.size() == 1) {
    return items_.front()->partition_key();
  }

  // We will always set an explicit hash key if we created an aggregated record.
  // We therefore have no need to set a partition key since the records within
  // the container have their own partition keys anyway. We will therefore use a
  // single byte to save space.
  return "a";
}

Aha, Mr. Mustard, that explains everything!   Kinesis typically hashes the partition key to determine the shard.  But if you have a look at the Kinesis API, you'll notice that you can actually specify an ExplicitHashKey.  When an ExplicitHashKey is provided, the partition key is ignored!

If we closely examine the code above, notice that whenever the aggregated KinesisRecord had a single record, it used that record's partition key.  These were the "good" records we were seeing through AWS CLI. However, whenever KPL had more than one record, it set the partition key to the magic "a", and set the ExplicitHashKey for the Kinesis Record (which contained multiple partition keys!).    It all became clear, and a rainbow unicorn galloped in from the street.
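
The routing logic can be sketched in a few lines of Python. The MD5-of-partition-key behavior is documented Kinesis behavior; the evenly split shard ranges here are an assumption for illustration:

```python
import hashlib

def shard_for(partition_key, num_shards, explicit_hash_key=None):
    # Kinesis MD5-hashes the partition key into a 128-bit space to pick a
    # shard -- unless an ExplicitHashKey is supplied, in which case the
    # partition key is ignored entirely. Evenly split shard ranges are
    # assumed here for illustration.
    if explicit_hash_key is not None:
        h = int(explicit_hash_key)
    else:
        h = int(hashlib.md5(partition_key.encode()).hexdigest(), 16)
    shard_width = 2 ** 128 // num_shards
    return min(h // shard_width, num_shards - 1)

# with an explicit hash key, the magic "a" partition key is irrelevant
assert shard_for("a", 4, explicit_hash_key=str(2 ** 127)) == 2
assert shard_for("a", 4, explicit_hash_key="0") == 0
```

So the aggregated records still land on the right shards, even though every one of them carries the same one-byte partition key.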

There you have it.

The case of the crazy "a", and the accelerated PUTs. Solved.
Until next time.  Long live open source software.

Tuesday, October 20, 2015

BaDaS Arsenal : from grep to graph in under 5 minutes w/ Pyplot

BaDaS = Big and Data and Science

These days, BaDaS is all the rage. And to be successful at it, you need an arsenal of weapons to gain insights into vast amounts of data quickly. And more and more, I believe python is the go-to swiss-army knife used in the field. And if you’ve read my previous blog, I believe iPython Notebook (a.k.a. Jupyter) is a great way to collaborate with other people when doing data analysis.


Recently, when I was analyzing the throughput/scalability of Kinesis and I wanted to graph the results, instead of turning to Excel to churn out the graph, I fired up the old iPython Notebook. (admittedly, I was also inspired by @AustinRochford’s excellent blog on Bayesian inference)

I was blown away. I was able to go from a log file to a visible graph in just minutes. Here is how I did it…

After a couple minutes of googling, I realized that the secret sauce was in matplotlib. More specifically, it was Pyplot FTW!

To get to work, I first grep’d the log files for each of my runs, and created a log file for each. I happened to be playing around with RecordBufferSizes in the Kinesis Producer Library (KPL), and ran an experiment at 10ms, 25ms, 50ms and 100ms buffer times.

I then fired up the trusty Notebook and started hacking away.

First, I imported pyplot:

from matplotlib import pyplot as plt

Then, I imported csv and dateutil to munge the data…

import csv
import dateutil.parser

Finally, all I had to do was write some python to parse each data file and create a representative dataset. Pyplot took care of the rest.

In the below code, I loop through each CSV file and gather up the dataset for that file. The dataset is a dictionary called data. The dictionary contains two lists: one for timestamps, which will become the x axis, and one for the throughput (a.k.a. recordsPerSecond), which will become the y axis.

The first element of each row in the CSV contains the epoch time. But because I wanted a time series graph, I didn’t want to use the absolute timestamp. Instead, I just calculate the delta from the first timestamp I see, inline. (neato!)
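
In isolation, that timestamp-delta trick looks like this (using datetime.strptime here rather than dateutil, just to keep the snippet self-contained; the timestamps are made up):

```python
from datetime import datetime

# made-up timestamps standing in for the first CSV column
rows = ["2015-10-20 12:00:00", "2015-10-20 12:00:05", "2015-10-20 12:00:12"]

def parse(s):
    return datetime.strptime(s, "%Y-%m-%d %H:%M:%S")

starttime = parse(rows[0])
deltas = [(parse(r) - starttime).total_seconds() for r in rows]
print(deltas)  # seconds elapsed since the first sample
```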

In the end, I stuff each of the data dictionaries, into an overarching dictionary that maps filename to the data dictionary representing the data for that file.

The final few lines tell pyplot to graph each of the datasets. There is some voodoo in there to set the color (hint: ‘r’ = red, ‘y’ = yellow, etc.)

In the end, you hit execute on the cell, and SHAZAM – it’s graphalicious.

%matplotlib inline
files = ['10ms.csv', '25ms.csv', '50ms.csv', '100ms.csv']

datasets = {}

for filename in files:
    data = {
        "timestamp": [],
        "recordsPerSecond": []
    }
    with open(filename, 'rb') as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        starttime = None
        for row in reader:
            timestamp = dateutil.parser.parse(row[0])
            if starttime is None:
                starttime = timestamp
            delta = (timestamp - starttime).total_seconds()
            data["timestamp"].append(delta)
            # second column holds the records/second value
            data["recordsPerSecond"].append(float(row[1]))
    datasets[filename] = data

plt.plot(datasets['10ms.csv']['timestamp'], datasets['10ms.csv']['recordsPerSecond'],
    'r', label='10ms')
plt.plot(datasets['25ms.csv']['timestamp'], datasets['25ms.csv']['recordsPerSecond'],
    'b', label="25ms")
plt.plot(datasets['50ms.csv']['timestamp'], datasets['50ms.csv']['recordsPerSecond'],
    'g', label="50ms")
plt.plot(datasets['100ms.csv']['timestamp'], datasets['100ms.csv']['recordsPerSecond'],
    'y', label="100ms")
plt.ylabel('Records / Second')

And FWIW, this entire blog post was written from within Jupyter!

If I get around to it, I’ll hopefully post what the actual results were when we played with buffer sizes with Kinesis. (HINT: Be careful – the buffer sizes control how much aggregation occurs, which not only affects throughput, but also affects cost!)

Sunday, October 11, 2015

A quick proof that Black is not White (to save all of mankind)

In the Hitchhiker's guide to the galaxy, there is a proof about the non-existence of God.  The reasoning roughly mimics that of the Ontological argument for God.

The passage has stuck with me:
"I refuse to prove that I exist," says God, "for proof denies faith, and without faith I am nothing." "But," says Man, "the Babel fish is a dead giveaway, isn't it? It could not have evolved by chance. It proves that You exist, and so therefore, by Your own arguments, You don't. QED".  "Oh dear," says God, "I hadn't thought of that," and promptly vanishes in a puff of logic. "Oh, that was easy," says Man, and for an encore goes on to prove that black is white and gets himself killed on the next zebra crossing.

I always thought it weird that Man is capitalized in this passage.  Does that in fact mean all of Man?  If so, it would be tragic if we all died during a zebra crossing simply because we did not have proof that black was not white.   Thus, I formally submit this proof for posterity, to avoid a Zebrapoclypse.

Proof that Black is not White:

Axiom (1): A chess board comprises 64 black and white squares.

Now, let us assume for contradiction that black is white.

With black and white indistinguishable from one another, squares on a chess board would be indistinguishable from one another.  

Thus, the chess board would have only one square.

We know from (1) that a chess board has 64 squares, therefore our initial assumption must be incorrect.  Black must not be white.


Proof by contradiction, never fails. (or does it always fail? ;)

And there you have it.  All of mankind saved from a zebra crossing.

(oh, the things I think about while waiting in line to pay for pumpkins. =)

Thursday, October 8, 2015

Integrating Syslog w/ Kinesis : Anticipating use of the Firehose

On the heels of the Kinesis Firehose announcement, more people are going to be looking to integrate Kinesis with logging systems. (to expedite/simplify the ingestion of logs into S3 and Redshift)  Here is one take on solving that problem that integrates syslog-ng with Kinesis.

First, let’s have a look at the syslog-ng configuration. In the syslog-ng configuration, you wire sources to destinations:

source s_sys {
    udp(ip( port(514));
};

destination d_test { file("/var/log/monetate/test.log"); };

destination d_fact_kinesis { tcp("localhost"); };

log { source(s_sys); destination(d_test); destination(d_fact_kinesis); };

In this example, we have one source of events (UDP) and two destinations: one file and one TCP. We wire them together in the last log statement. With this configuration, any packets syslog-ng receives via UDP are written to the TCP port and to the file. The file is our backup strategy: if something happens to Kinesis, we still persist the events locally.  TCP is our path to Kinesis.

On the TCP side of things, we can easily implement a simple TCP server in Java that receives the messages from syslog-ng and sends them out via KPL. At Monetate, we’ve done exactly that. (and I’ll look to see if we can open source the solution). However,  it is really important to understand the syslog-ng flow control, and more specifically how we will handle the back pressure if/when Kinesis starts to back up.

Let's consider how we back that thang’ up.

Here is a visual that shows the whole flow:

At each point in the sequence, we need to make sure that we are prepared to handle back-pressure. Specifically, we need to validate the behavior of each component when the next downstream component refuses to accept additional data.

For example, if our little TCP server (“bridge”) blindly accepted packets from the TCP stream, we would quickly OoM the JVM if/when Kinesis slowed down. Thus, in the “bridge” we need to apply back-pressure upstream, to prevent that from happening. More specifically, we need to stop consuming off of the TCP stream, and hope that syslog-ng reacts appropriately.

Fortunately, syslog-ng has accounted for this case with its “output buffer”, shown above.  The output buffer is used to store messages for delivery to destinations. If the max-size is reached on that buffer, then the destination is “disconnected” so as not to crash syslog. You control the max-size of the buffer through the log_fifo_size setting in syslog-ng.
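
The drop-when-full behavior can be modeled in a few lines of Python (a toy model of the fifo, obviously, not syslog-ng internals):

```python
from collections import deque

class OutputBuffer:
    """Toy model of syslog-ng's output buffer (log_fifo_size)."""
    def __init__(self, max_size):
        self.q = deque()
        self.max_size = max_size
        self.dropped = 0

    def push(self, msg):
        if len(self.q) >= self.max_size:
            # "Destination queue full, dropping message"
            self.dropped += 1
            return False
        self.q.append(msg)
        return True

buf = OutputBuffer(max_size=1000)
for i in range(1500):
    buf.push(i)
assert len(buf.q) == 1000
assert buf.dropped == 500
```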

Different destination types have different behaviors under this scenario, and we tested out a few of them. First we considered a straight-up pipe between syslog and our bridge process using the “program” destination within syslog-ng. That worked well, but in the event of a backed-up queue, syslog-ng kills the program and respawns a new process. This was less than ideal. We also considered using UDP. That worked as well, and in effect eliminated the back-pressure scenario because UDP is “fire and forget”.  Under heavy load, however, we noticed packet drops, which meant we were losing events.

In the end, we decided to go with TCP.  With TCP we won’t silently lose messages, and syslog-ng won’t continually restart our process.  With that decision, we needed to confirm the behavior of syslog-ng.  And for that, we needed to monitor syslog-ng. This handy dandy command-line does exactly that:

syslog-ng -Fevd >& syslog.log

While monitoring syslog, we tested two failure scenarios.  The first tested a dead bridge (no TCP listener).  In this situation, syslog-ng won’t be able to connect.  We needed to make sure syslog-ng behaves well under that case. To test this, we ran 100K messages through the system over 40 seconds (>2K/sec). When monitoring syslog-ng, we saw it trying to reconnect:

$grep 4242 syslog.log | grep failed
Connection failed; server='AF_INET(', error='Connection refused (111)', time_reopen='10'
Connection failed; server='AF_INET(', error='Connection refused (111)', time_reopen='10'

All the messages made it through to our log file (hallelujah!), which means the downed TCP connection had no effect on the log file destination. Furthermore, syslog-ng continually retried to establish the TCP connection, so when the process did come back up, it reconnected! Nice! One very important thing to note from the above output is the “time_reopen”. It turns out that this is a global configuration option, which tells syslog-ng how long to wait before re-attempting to establish the connection.  So, the behavior was solid, and we can actually configure how noisy things get if/when we lose the bridge process.

For the final test, we needed to see what happens when we apply back-pressure from the java process itself.  In this case, syslog-ng can connect, but the java process refuses to read off the stream/socket.  For this, we ran the same 100K test, but paused consuming in the java process to simulate a Kinesis slow-down. And again, we saw good things...  All the messages made it through to the log file, and this time we saw messages from syslog-ng indicating that the output buffer (fifo queue) was full:

syslog.log:Destination queue full, dropping message; queue_len='1000', mem_fifo_size='1000'
syslog.log:Destination queue full, dropping message; queue_len='1000', mem_fifo_size='1000'

And the log message tells you everything: after the queue fills up, syslog-ng drops messages on the floor until it can re-establish the socket.

So, there you have it… to connect syslog to Kinesis, I’d recommend using a TCP output destination with some glue code between that and the Kinesis Producer Library (KPL). (and again, I’ll see if we can open source some of that) Just be careful, and apply the back pressure to syslog-ng.

Per our experiments, syslog-ng can back that thang up!

Tuesday, October 6, 2015

Destinations for syslog-ng : UDP + Program (Java)

This is just a quick mental note because I keep losing them...

Here are two quick gists that show how to configure destinations within syslog-ng.

UDP Destination

This destination will spew UDP datagram packets:
destination udp_spew { udp("localhost"); };

Process Destination

This destination will spew log events at a java program:
destination program_spew { program("/opt/jdk1.7.0_79/bin/java -jar /mnt/foo.jar arg1 arg2"); };

Connecting the Destination to the Source

For both of these, don't forget to connect it to the source!
log { source(s_sys); ...; destination(program_spew); };

Monday, October 5, 2015

Getting started w/ Python Kinesis Consumer Library (KCL) (via IPython Notebook)

We are racing ahead with Kinesis.  With streaming events in hand, we need to get our data scientists tapped in. One problem, the KCL and KPL are heavily focused on Java, but our data scientists (and the rest of our organization) love Python. =)

We use IPython notebook to share python code/samples.  So, in this blog I'm going to get you up and running, consuming events from Kinesis, using Python and IPython notebook.  If you are already familiar with IPython notebook, you can pull the notebook from here:

If you aren't familiar with iPython Notebook, first go here to get a quick overview.  In short, IPython Notebook is a great little webapp that lets you play with python commands via the browser, creating a wiki/gist-like "recipe" that can be shared with others.  Once familiar, read on.

To get started with KCL-python, let's clone the amazon-kinesis-client-python github repo:

   git clone

Then, create a new virtualenv.  (and if you don't use virtualenv, get with the program =)
   virtualenv venv/kcl 
   source venv/kcl/bin/activate

Next, get IPython Notebook set up.  Install IPython Notebook by running the following commands from the directory containing the cloned repo:

   pip install "ipython[notebook]"
   ipython notebook

That should open a new tab in your browser.  Start a new notebook by clicking "New->Python 2".   A notebook comprises one or more cells.  Each cell has a type.   For this exercise, we'll use "Code" cells.   We'll take advantage of the "magic" functions available in IPython Notebook.  We'll use run, which will execute a command.  Specifically, we need to execute the python setup for KCL.

The amazon-kinesis-client-python library actually rides on top of a Java process, and uses MultiLangDaemon for interprocess communication.  Thus, to get KCL setup you need to download the jars.  You can do that in the notebook by entering and executing the following in the first cell:

   run download_jars

Next, you need to install those jars with:

   run install

That command installed a few libraries for python (specifically -- it installed boto, which is the aws library for python).  Because the notebook doesn't dynamically reload, you'll need to restart IPython Notebook and re-open the notebook. (hmmm... Is there a way to reload the virtualenv used by the notebook w/o restarting?)

Once reloaded, you are ready to start emitting events into a Kinesis stream.  For now, we'll use the sample application in the KCL repo.  Create a new cell and run:

   run samples/ --stream words -w cat -w dog -w bird -w lobster

From this, you should see:

Connecting to stream: words in us-east-1
Put word: cat into stream: words
Put word: dog into stream: words
Put word: bird into stream: words
Put word: lobster into stream: words

Woo hoo! We are emitting events to Kinesis!  Now, we need to consume them. =)

To get that going, you will want to edit the sample properties file.  This file is loaded by KCL to configure the consumer.   Most importantly, have a look at the executableName property in that file.  This sets the name of the python code that the KCL will execute when it receives records.  In the sample, this points at the sample consumer script.

Have a look at that sample Python code.  You'll notice there are four important methods that you will need to implement:  init, process_records, checkpoint, and shutdown.  The purpose of those methods is almost self-evident, but the documentation in the sample is quite good.  Have a read through there.
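
As a sketch of that shape, here is a stripped-down word-counting processor in plain Python. The four method names follow the sample, but the amazon_kclpy plumbing (and real checkpointing) is omitted, so treat this as an outline of the structure, not a drop-in consumer:

```python
class WordCountProcessor(object):
    """Outline of the four KCL record-processor methods (plumbing omitted)."""

    def init(self, shard_id):
        # called once, when the worker is assigned a shard
        self.shard_id = shard_id
        self.counts = {}

    def process_records(self, records, checkpointer):
        # called with each batch of records pulled from the shard
        for record in records:
            word = record["data"].decode("utf-8")
            self.counts[word] = self.counts.get(word, 0) + 1

    def checkpoint(self, checkpointer, sequence_number=None):
        # a real consumer delegates to the KCL checkpointer here
        pass

    def shutdown(self, checkpointer, reason):
        # checkpoint on TERMINATE so the lease can move cleanly
        if reason == "TERMINATE":
            self.checkpoint(checkpointer)

p = WordCountProcessor()
p.init("shardId-000000000000")
p.process_records([{"data": b"cat"}, {"data": b"cat"}, {"data": b"dog"}], None)
print(p.counts)
```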

Also in the properties file, notice the AWSCredentialsProvider.  Since it is using Java underneath the covers, you need to set the Java classname of the credentials provider.  If left unchanged, it will use the DefaultAWSCredentialsProviderChain.

It is worth looking at that documentation, but probably the easiest route to get authentication working is to create a ~/.aws/credentials file that contains the following:

   [default]
   aws_access_key_id = YOUR_ACCESS_KEY
   aws_secret_access_key = YOUR_SECRET_KEY

Once you have your credentials setup, you can crank up the consumer by executing the following command in another notebook cell:

   run samples/ --print_command --java /Library/Java/JavaVirtualMachines/jdk1.8.0_60.jdk/Contents/Home/bin/java --properties samples/

Note that the --java parameter needs the location of the java executable.  Now, running that in the notebook will give you the command line that you can go ahead and execute to consume the stream.   Go ahead and cut-and-paste that into a command-line and execute it.  At this point you are up and running.

To get started with your own application, simply replace the code with your own, and update the properties file, and you should be off and running.

Now, I find this bit hokey.  There is no native python consumer (yet!).  And in fact you actually need to run *java*, which will turn around and call python.  Honestly, with this approach, you aren't getting much from the python library over what you would get from the Java library.  (except for some pipe manipulation in MultiLangDaemon)  Sure, it is a micro-services approach... but maybe it would be better to simply run a little python web service (behind a load balancer?).   Then, we'd be able to scale the consumers better without re-sharding the Kinesis stream.  (One shard/KCL worker could fan out work to many python consumers across many boxes.)  You would need to be careful about checkpointing, but it might be a bit more flexible.  (and would completely decouple the "data science" components from Kinesis)  Definitely food for thought. =)

As always, let me know if you run into trouble.