I love open source. I love being able to take things apart to understand them. And for that reason, I'd like to thank Amazon for open sourcing the Kinesis Client Library (KCL) and the Kinesis Producer Library (KPL). It allowed us to get to the bottom of a couple of strange observations and reverse engineer the design for Kinesis record aggregation in the KPL (at least we think so).
When we started scale testing Kinesis, we noticed two strange things:
(1) The velocity of PUTs (records per second) seemed to scale/increase with the number of shards.
(2) When we attached to the stream using the AWS CLI, we saw records with partition key: "a"
First, let's discuss the PUT record velocity. One component of Amazon Kinesis's pricing includes the number of PUT records per month. Specifically:
PUT Payload Units, per 1,000,000 units: $0.014
So, mo' PUTs is mo' money. Fortunately, the Kinesis Producer Library (KPL) lets you aggregate multiple records into a single PUT. If you are optimizing for cost, you want to maximize that records-per-PUT ratio (and obviously, at a 1:1 ratio you are paying the most). But the amount of aggregation you can perform depends on record throughput and your tolerance for latency.
KPL ingests records, accumulating them in a buffer. The buffering is configured via the kpl.properties file, and the key parameter in the system is RecordMaxBufferedTime, which specifies the maximum amount of time that the KPL will wait before it sends a PUT. After this maximum amount of time passes, KPL PUTs the buffered records, regardless of how many it has accumulated/aggregated. If your system has low throughput, KPL sends partially filled PUTs, which is more expensive.
For example, if your throughput is 1 record / second and you have RecordMaxBufferedTime set to 1 second, you will have an aggregation ratio of 1:1 (not good). Now, if you increase that duration from 1 second to 4 seconds, you immediately quadruple your aggregation ratio (and cut PUT costs by 75%!).
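To make that arithmetic concrete, here is a minimal sketch. It assumes a steady arrival rate and a single aggregation buffer, and it ignores any size- or count-based flush limits. The function names records_per_put and put_savings are made up for this illustration and are not part of the KPL.

```cpp
#include <algorithm>
#include <cassert>

// Expected user records per PUT for one aggregation buffer.
// Assumption: records arrive at a steady rate and the buffer is flushed
// only when the time limit (RecordMaxBufferedTime) expires.
double records_per_put(double records_per_second, double max_buffered_seconds) {
    assert(records_per_second >= 0.0 && max_buffered_seconds > 0.0);
    // A PUT carries at least one record, so the ratio never drops below 1.
    return std::max(1.0, records_per_second * max_buffered_seconds);
}

// Fraction of PUTs saved compared with sending every record on its own.
double put_savings(double records_per_second, double max_buffered_seconds) {
    return 1.0 - 1.0 / records_per_put(records_per_second, max_buffered_seconds);
}
```

With 1 record / second, records_per_put(1.0, 1.0) gives the 1:1 ratio, and records_per_put(1.0, 4.0) gives 4 records per PUT, which is where the 75% saving comes from.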
When we started scale testing, we saw an order of magnitude more PUT records than we saw during our initial testing (on streams with few shards). We noticed that the velocity of PUTs increased linearly with the number of shards. For example, with 2X the number of shards, we saw 2X the number of PUTs. This led us to hypothesize that aggregation was somehow related to shards.
Our straw man hypothesis looked like this:
If our hypothesis was right, then locally the KPL was keeping an aggregate KinesisRecord (aka PUT) for each shard. Then, when KPL hit the RecordMaxBufferedTime limit, it would flush all of those records. As long as every shard received at least one record in each window, the velocity of PUTs would be equal to the number of flushes per second (1 / RecordMaxBufferedTime) multiplied by the number of shards.
e.g. 100ms RecordMaxBufferedTime = 10 PUTs / second / shard
which would be: 500 PUTs / second for 50 shards.
This is exactly what we saw. Even with a trickling of traffic, we saw hundreds of PUT requests.
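The same back-of-the-envelope calculation, as a small sketch. It assumes our hypothesis holds (one aggregate per shard, flushed every time the buffer deadline passes) and that each shard has at least one record buffered per window. The name puts_per_second is ours, not the KPL's.

```cpp
#include <cassert>

// PUTs per second if the KPL keeps one aggregate per shard and flushes
// every aggregate each time RecordMaxBufferedTime elapses.
// Assumption: every shard has at least one record buffered per window.
double puts_per_second(int shard_count, double max_buffered_ms) {
    assert(shard_count > 0 && max_buffered_ms > 0.0);
    double flushes_per_second = 1000.0 / max_buffered_ms;
    return flushes_per_second * shard_count;
}
```

For the numbers above, puts_per_second(50, 100.0) works out to 500.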
When you think about it, this makes sense. Since a partition key determines which shard receives the record, a single PUT can't contain records destined for multiple shards, unless Amazon somehow fanned out the records on the backend.
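Here is a rough sketch of what per-shard aggregation could look like. This is our guess at the shape of the idea, not the KPL's actual code. UserRecord, shard_for and group_by_shard are hypothetical names, and std::hash stands in for the real routing (Kinesis takes an MD5 hash of the partition key and matches it against each shard's hash key range) purely to keep the example dependency-free.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical user record: a partition key plus an opaque payload.
struct UserRecord {
    std::string partition_key;
    std::string data;
};

// Stand-in for Kinesis routing. Assumption: std::hash modulo the shard
// count replaces the real MD5-to-hash-key-range lookup.
std::size_t shard_for(const std::string& partition_key, std::size_t shard_count) {
    assert(shard_count > 0);
    return std::hash<std::string>{}(partition_key) % shard_count;
}

// Groups buffered records by destination shard. At flush time each
// non-empty group would become one aggregated PUT.
std::map<std::size_t, std::vector<UserRecord>> group_by_shard(
        const std::vector<UserRecord>& buffered, std::size_t shard_count) {
    std::map<std::size_t, std::vector<UserRecord>> puts;
    for (const auto& record : buffered) {
        puts[shard_for(record.partition_key, shard_count)].push_back(record);
    }
    return puts;
}
```

With a single shard, everything buffered lands in one group and therefore one PUT. With many shards, the same handful of records can spread across several groups, so the number of PUTs per flush grows with the shard count even though the traffic has not changed.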
...
Now, let's look at our second observation, the elusive "a" partition key. When checking the accuracy of our stream data, we happened to attach to the stream using the AWS CLI. Reading records directly using `aws kinesis get-records...`, we saw JSON data in the form:
{
  "PartitionKey": "a",
  "Data": "84mawgo...rnz1f7h",
  "SequenceNumber": "49555608841303555555555578136812826047777764134359384066"
}
Some of the JSON elements had the proper partitionKey in them, while others looked like the above. We had no idea where the mysterious "a" partitionKey was coming from. Immediately, we thought we had a bug somewhere. I proceeded to go into debug mode, and Base64 decoded the Data element of the JSON. What I saw was... (and paraphrasing a bit...)
pk1 pk2 pk3 pk4 ?? data1 ?? data2 ?? data3 ?? data4
If you recall, the Kinesis Producer Library is a native application, coated in a Java wrapper. If that "a" was coming from somewhere, it had to be deep in the native application. I put on my waders, strapped on my spelunking headlamp and dusted off my C++. After some digging, I eventually found the code(droid) I was looking for:
std::string KinesisRecord::partition_key() const {
  if (items_.empty()) {
    throw std::runtime_error(
        "Cannot compute partition_key for empty container");
  }

  if (items_.size() == 1) {
    return items_.front()->partition_key();
  }

  // We will always set an explicit hash key if we created an aggregated record.
  // We therefore have no need to set a partition key since the records within
  // the container have their own parition keys anyway. We will therefore use a
  // single byte to save space.
  return "a";
}
Aha Mr. Mustard, that explains everything! Kinesis typically hashes the partition key to determine the shard. But if you have a look at the Kinesis API, you'll notice that you can actually specify ExplicitHashKey. When an ExplicitHashKey is provided, the partition key is ignored when choosing the shard!
If we closely examine the code above, notice that whenever the aggregated KinesisRecord had a single record, it used that record's partition key. These were the "good" records we were seeing through AWS CLI. However, whenever KPL had more than one record, it set the partition key to the magic "a", and set the ExplicitHashKey for the Kinesis Record (which contained multiple partition keys!). It all became clear, and a rainbow unicorn galloped in from the street.
There you have it.
The case of the crazy "a", and the accelerated PUTs. Solved.
Until next time. Long live open source software.