Wednesday, March 21, 2012

Cassandra Indexing: The good, the bad and the ugly

Within NoSQL, the operations of indexing, fetching and searching for information are intimately tied to the physical storage mechanisms.  It is important to remember that rows are distributed across hosts, but a single row is stored on a single host (with replicas).  Within a row, columns are stored in sorted order, which makes querying a range of columns efficient (provided you are not spanning rows).

The Bad : Partitioning

One of the tough things to get used to at first is that, without any indexes, queries that span rows can be (very) bad.  Thinking back to our storage model, however, that isn't surprising.  The strategy that Cassandra uses to distribute the rows across hosts is called Partitioning.

Partitioning is the act of mapping rowkeys into the token range, or "token ring".  Each host is responsible for a segment (i.e. partition) of that ring.  You've probably seen this when you initialized your cluster with a "token".  The token gives the host a location along the token ring, which assigns it responsibility for a section of the token range.

There are two primary partitioners: Random and Order Preserving.  They are appropriately named.  With the RandomPartitioner, the token is a hash of the rowkey.  This does a good job of evenly distributing your data across a set of nodes, but makes querying a range of the rowkey space incredibly difficult.  Because hashing destroys the ordering of the rowkeys, Cassandra can't determine what range of the token space you need from only a "start rowkey" value and an "end rowkey" value.  It essentially needs to perform a "table scan" to answer the query, and a "table scan" in Cassandra is bad because it needs to go to each machine (most likely ALL machines if you have a good hash function) to answer the query.
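To make the point concrete, here is a minimal sketch of hash-based partitioning. It is a simplification, not Cassandra's actual code: the ring size, the four evenly spaced hosts, and the helper names (token_for, owner) are assumptions made up for illustration.

```python
import bisect
import hashlib

RING_SIZE = 2 ** 127  # assumed size of the token space for this sketch


def token_for(rowkey: str) -> int:
    """Hash a rowkey onto the ring (MD5 digest reduced to the ring size)."""
    digest = hashlib.md5(rowkey.encode("utf-8")).digest()
    return int.from_bytes(digest, "big") % RING_SIZE


def owner(ring, token):
    """Return the host owning `token`.

    `ring` is a sorted list of (token, host) pairs. A host owns everything
    up to and including its own token; tokens past the last host wrap
    around to the first.
    """
    tokens = [t for t, _ in ring]
    i = bisect.bisect_left(tokens, token) % len(ring)
    return ring[i][1]


# Four hypothetical hosts with evenly spaced tokens.
RING = [(i * RING_SIZE // 4, f"host{i}") for i in range(4)]

# A contiguous, sorted range of rowkeys...
keys = [f"user{n:04d}" for n in range(100)]
tokens = [token_for(k) for k in keys]

# ...ends up scattered over the ring, so a range query has to ask every host.
hosts_hit = {owner(RING, t) for t in tokens}
```

The keys are sorted, but their tokens are not, and hosts_hit ends up covering more than one host. That is the "table scan" problem in miniature.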

Now, at the great cost of even data distribution, you can employ the OrderPreservingPartitioner (OPP).  I am *not* down with OPP.  The OPP preserves order as it translates rowkeys into tokens.  Now, given a start rowkey value and an end rowkey value, Cassandra *can* determine exactly which hosts have the data you are looking for.  It converts the start value to a token and the end value to a token, and simply selects and returns everything in between.  BUT, by preserving order, unless your rowkeys are evenly distributed across the space, your tokens won't be either and you'll get a lopsided cluster, which greatly increases the cost of configuration and administration of the cluster (not worth it).
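Here is a small sketch of both sides of that trade-off. The ring layout, the host names and the helper functions are hypothetical, and it assumes rowkeys are plain ASCII strings that sort at or below "~".

```python
import bisect

# Hypothetical ring: each host owns the rowkeys up to and including its token.
RING = [("g", "host0"), ("n", "host1"), ("t", "host2"), ("~", "host3")]
TOKENS = [t for t, _ in RING]


def owner(rowkey):
    """Order preserving: the rowkey itself acts as the token."""
    return RING[bisect.bisect_left(TOKENS, rowkey)][1]


def hosts_for_range(start, end):
    """Hosts that can hold rowkeys in [start, end], assuming start <= end."""
    first = bisect.bisect_left(TOKENS, start)
    last = bisect.bisect_left(TOKENS, end)
    return [host for _, host in RING[first:last + 1]]


# The good: a range query only touches the hosts between the two tokens.
print(hosts_for_range("alice", "mallory"))  # ['host0', 'host1']

# The bad: rowkeys that share a prefix all pile onto one host.
keys = [f"user{n:04d}" for n in range(100)]
print({owner(k) for k in keys})  # {'host3'}
```

With keys like user0001 ... user0099, three of the four hosts sit idle, which is the lopsided cluster described above.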

The Good : Secondary Indexes

Cassandra does provide a native indexing mechanism in Secondary Indexes.  Secondary Indexes work off of the column values.  You declare a secondary index on a Column Family.  Datastax has good documentation on the usage.  Under the hood, Cassandra maintains a "hidden column family" as the index. (See Ed Anuff's presentation for specifics)  Since Cassandra doesn't maintain column value information in any one node, and secondary indexes are on column values (rather than rowkeys), a query still needs to be sent to all nodes.  Additionally, secondary indexes are not recommended for high-cardinality sets.  I haven't looked yet, but I'm assuming this is because of the data model used within the "hidden column family".  If the hidden column family stores a row per unique value (with rowkeys as columns), then it would mean scanning the rows to determine if they are within the range in the query.

From Ed's presentation:
  • Not recommended for high-cardinality values (e.g. timestamps, birthdates, keywords, etc.)
  • Requires at least one equality comparison in a query -- not great for less-than/greater-than/range queries
  • Unsorted - results are in token order, not query value order
  • Limited to search on datatypes Cassandra natively understands
With all that said, secondary indexes work out of the box and we've had good success using them on simple values.

The Ugly : Do-It-Yourself (DIY) / Wide-Rows

Now, beauty is in the eye of the beholder.  One of the beautiful things about NoSQL is the simplicity.  The constructs are simple: Keyspaces, Column Families, Rows and Columns.  Keeping it simple however means sometimes you need to take things into your own hands.

This is the case with wide-row indexes.  Utilizing Cassandra's storage model, it's easy to build your own indexes where each rowkey becomes a column in the index.  This is sometimes hard to get your head around, but let's imagine we have a case whereby we want to select all users in a zip code.  The main users column family is keyed on userid, and zip code is a column on each user row.  We could use secondary indexes, but there are quite a few zip codes.  Instead we could maintain a column family with a single row called "idx_zipcode".  We could then write columns into this row of the form "zipcode_userid".  Since the columns are stored in sorted order, it is fast to query for all columns that start with "18964" (e.g. we could use 18964_ and 18964_ZZZZZZ as start and end values).
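The idea can be sketched without Cassandra at all. The WideRow class below is a hypothetical in-memory stand-in for a single row with sorted column names, not a client API. One deliberate difference from the example values above: the end of the slice uses "\uffff" rather than "ZZZZZZ", because lowercase userids sort after uppercase "Z" and would otherwise be missed.

```python
import bisect


class WideRow:
    """One row whose column names are kept in sorted order."""

    def __init__(self):
        self._names = []

    def insert(self, name):
        # Keep the column names sorted and unique.
        i = bisect.bisect_left(self._names, name)
        if i == len(self._names) or self._names[i] != name:
            self._names.insert(i, name)

    def slice(self, start, end):
        """Return all column names in [start, end]."""
        lo = bisect.bisect_left(self._names, start)
        hi = bisect.bisect_right(self._names, end)
        return self._names[lo:hi]


def users_in_zip(index, zipcode):
    """Slice the index row and strip the zip code prefix from each column."""
    columns = index.slice(f"{zipcode}_", f"{zipcode}_\uffff")
    return [name.split("_", 1)[1] for name in columns]


# The single "idx_zipcode" row: one column per (zipcode, userid) pair.
idx_zipcode = WideRow()
for zipcode, userid in [("18964", "bob"), ("19104", "carol"),
                        ("18964", "alice"), ("18966", "dave")]:
    idx_zipcode.insert(f"{zipcode}_{userid}")

print(users_in_zip(idx_zipcode, "18964"))  # ['alice', 'bob']
```

Because the columns are already sorted, the lookup is two binary searches and a contiguous read, which is why this pattern is fast.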

One obvious downside of this approach is that rows are self-contained on a host. (again except for replicas)  This means that all queries are going to hit a single node.  I haven't yet found a good answer for this.

Additionally, and IMHO, the ugliest part of DIY wide-row indexing is from a client perspective.  In our implementation, we've done our best to be language agnostic on the client-side, allowing people to pick the best tool for the job to interact with the data in Cassandra.  With that mentality, the DIY indexes present some trouble.  Wide-rows often use composite keys (imagine if you had an idx_state_zip, which would allow you to query by state then zip).  Although there is "native" support for composite keys, all of the client libraries implement their own version of them (Hector, Astyanax, and Thrift).  This means that every client needing to query data needs the added logic to first query the index, and all clients need to construct the composite key in the same manner.
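To show why every client has to agree on the encoding, here is a rough sketch. The length-prefixed layout (2-byte length, value, one end-of-component byte) is modeled on my understanding of Cassandra's CompositeType; treat the exact bytes as an assumption and check them against your client library. The function names are made up for the example.

```python
import struct


def encode_composite(*components: str) -> bytes:
    """Encode components as <2-byte length><utf-8 value><0x00>, repeated."""
    out = bytearray()
    for component in components:
        raw = component.encode("utf-8")
        out += struct.pack(">H", len(raw))  # big-endian unsigned short
        out += raw
        out += b"\x00"  # end-of-component marker (assumed layout)
    return bytes(out)


def decode_composite(data: bytes):
    """Inverse of encode_composite."""
    parts, pos = [], 0
    while pos < len(data):
        (length,) = struct.unpack_from(">H", data, pos)
        pos += 2
        parts.append(data[pos:pos + length].decode("utf-8"))
        pos += length + 1  # skip the value and the end-of-component byte
    return parts


# A naive delimiter-joined key is ambiguous: two different keys collide.
assert "_".join(["new_york", "10001"]) == "_".join(["new", "york_10001"])

# The length-prefixed form keeps them distinct and round-trips cleanly.
key = encode_composite("PA", "18964")
assert decode_composite(key) == ["PA", "18964"]
assert encode_composite("new_york", "10001") != encode_composite("new", "york_10001")
```

If one client joins with underscores and another uses the length-prefixed form, they will never find each other's index entries, which is exactly the coordination burden described above.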

Making It Better...
For this very reason, we've decided to release two open source projects that help push this logic to the server-side.  The first project is Cassandra-Triggers.  This allows you to attach asynchronous activities to writes in Cassandra.  (one such activity could be indexing)  We've also released Cassandra-Indexing.  This is hot off the presses and is still in its infancy (e.g. it only supports UTF8Type in the index), but the intent is to provide a generic server-side mechanism that indexes data as it's written to Cassandra.  Employing the same server-side technique we used in Cassandra-Triggers, you simply configure the columns you want indexed, and the AOP code does the rest as you write to the target CF.  As always, questions, comments and thoughts are welcome. (especially if I'm off-base somewhere)

Friday, March 2, 2012

Cassandra Triggers for Indexing and RDBMS Synchronization

We love Cassandra as a data store, but unfortunately it doesn't support all of our use cases natively.  We need to support ad hoc structural queries, online analytics, and full-text searches of the data stored in Cassandra.    These use cases are best supported by other storage mechanisms: indexes for search and RDBMS + BI for reporting and analysis.

Initially we took a batch approach to the problem, relying on Hadoop and Map/Reduce jobs to keep the external systems up to date.  We would perform map/reduce jobs over column families to bulk update the external systems.  This had obvious drawbacks.  Until the batch process completes, the index and the RDBMS are out of synch with Cassandra.  Additionally, we would run over large portions of the column family even though only a small number of records had changed.

To keep the other systems synchronized, we could have complicated the Cassandra clients, embedding the logic to orchestrate updates to all of the relevant systems, but that seemed like a nightmare.  In the end, we decided to go for real-time trigger-like functionality.  This takes the burden off the client and allows us to keep other systems in synch in near real-time.

Maxim Grinev came to the same conclusion and submitted a patch to Cassandra, which triggered a lengthy discussion. (pun intended)

In the end, we decided to implement our own trigger mechanism using Aspect-Oriented Programming (AOP). Our mechanism is roughly based on Jonathan Ellis's Crack-Smoking Commit Log (CSCL). For each column family mutation, we write to a commit log. The log entries are then processed asynchronously by the triggers. Upon successful execution, the log entry is removed. We've released the project at github:

The design is certainly heavy and the documentation is still a bit rough around the edges, but it's a small amount of code and it is working like a champ.  We've set up installation and configuration instructions.  Let us know if you have any trouble getting started.
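For anyone who wants the gist of the commit-log flow described above before digging into the project, here is a minimal in-memory sketch. It is not the cassandra-triggers code: the TriggerLog class, its methods and the example trigger are all hypothetical, and process() is called explicitly here where the real mechanism runs asynchronously.

```python
import itertools


class TriggerLog:
    """A toy commit log: one entry per column family mutation."""

    def __init__(self):
        self._entries = {}
        self._ids = itertools.count(1)

    def append(self, column_family, rowkey, columns):
        """Record a mutation and return the id of its log entry."""
        entry_id = next(self._ids)
        self._entries[entry_id] = (column_family, rowkey, dict(columns))
        return entry_id

    def pending(self):
        """Ids of entries that have not been processed successfully yet."""
        return sorted(self._entries)

    def process(self, triggers):
        """Run the triggers for each pending entry.

        An entry is removed only when every trigger for its column family
        succeeds; otherwise it stays in the log so it can be retried.
        """
        for entry_id in self.pending():
            column_family, rowkey, columns = self._entries[entry_id]
            try:
                for trigger in triggers.get(column_family, []):
                    trigger(rowkey, columns)
            except Exception:
                continue  # leave the entry in the log for a later retry
            del self._entries[entry_id]


# Example trigger: maintain a zip code index outside of Cassandra.
index = {}


def index_zip(rowkey, columns):
    index.setdefault(columns["zip"], set()).add(rowkey)


log = TriggerLog()
log.append("users", "alice", {"zip": "18964"})
log.append("users", "bob", {"name": "Bob"})  # no zip: the trigger fails
log.process({"users": [index_zip]})

print(index)          # {'18964': {'alice'}}
print(log.pending())  # [2]
```

The failed entry stays in the log, so a later pass can retry it once the problem is fixed. That is the property that keeps the external systems from silently drifting out of synch.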

Thursday, March 1, 2012

Code Coverage: Cobertura, and Testing Load-time Weaving with Maven

Our cassandra-triggers implementation uses AOP. Virgil also uses it for connection pooling and retries. More specifically, each of these projects uses load-time weaving, where the byte code of the classes is changed at runtime. For testing purposes, we have the following added to our pom file:
This adds the aspectjweaver agent to the jvm runtime, which enables and performs the load-time weaving. When we went to gather code coverage information on the projects, we ended up with the following exception out of aspectj.
[IsolatedClassLoader@1342ba4] warning register definition failed -- (BCException) malformed class file
malformed class file
org.aspectj.weaver.BCException: malformed class file
 at org.aspectj.weaver.bcel.Utility.makeJavaClass(
 at org.aspectj.weaver.bcel.BcelWeaver.addLibraryAspect(
Evidently, aspectj doesn't like cobertura-instrumented classes.  After much googling, I couldn't find a good solution.  There appeared to be a lot of back and forth between cobertura and aspectj.  To avoid the situation entirely, I simply decided to maintain two pom files.  I swapped out the plugin above that adds the agent to the JVM for the snippet below that enables compile-time weaving:
This swaps from load-time weaving during testing to compile-time weaving, which allows cobertura and aspectj to play well together, which means you'll get accurate code coverage numbers in Sonar.