In this article by Nishant Neeraj, the author of the book Mastering Apache Cassandra – Second Edition, aims to set you into a perspective where you will be able to see the evolution of the NoSQL paradigm. It will start with a discussion of common problems that an average developer faces when the application starts to scale up and software components cannot keep up with it. Then, we’ll see what can be assumed as a thumb rule in the NoSQL world: the CAP theorem that says to choose any two out of consistency, availability, and partition-tolerance. As we discuss this further, we will realize how much more important it is to serve the customers (availability), than to be correct (consistency) all the time. However, we cannot afford to be wrong (inconsistent) for a long time. The customers wouldn’t like to see that the items are in stock, but that the checkout is failing. Cassandra comes into picture with its tunable consistency.
(For more resources related to this topic, see here.)
Problems in the RDBMS world
RDBMS is a great approach. It keeps data consistent, it’s good for OLTP (http://en.wikipedia.org/wiki/Online_transaction_processing), it provides access to good grammar, and manipulates data supported by all the popular programming languages. It has been tremendously successful in the last 40 years (the relational data model was proposed in its first incarnation by Codd, E.F. (1970) in his research paper A Relational Model of Data for Large Shared Data Banks). However, in early 2000s, big companies such as Google and Amazon, which have a gigantic load on their databases to serve, started to feel bottlenecked with RDBMS, even with helper services such as Memcache on top of them. As a response to this, Google came up with BigTable (http://research.google.com/archive/bigtable.html), and Amazon with Dynamo (http://www.cs.ucsb.edu/~agrawal/fall2009/dynamo.pdf).
If you have ever used RDBMS for a complicated web application, you must have faced problems such as slow queries due to complex joins, expensive vertical scaling, and problems in horizontal scaling. Due to these problems, indexing takes a long time. At some point, you may have chosen to replicate the database, but there was still some locking, and this hurts the availability of the system. This means that under a heavy load, locking will cause the user’s experience to deteriorate.
Although replication gives some relief, a busy slave may not catch up with the master (or there may be a connectivity glitch between the master and the slave). Consistency of such systems cannot be guaranteed. Consistency, the property of a database to remain in a consistent state before and after a transaction, is one of the promises made by relational databases. It seems that one may need to make compromises on consistency in a relational database for the sake of scalability. With the growth of the application, the demand to scale the backend becomes more pressing, and the developer teams may decide to add a caching layer (such as Memcached) at the top of the database. This will alleviate some load off the database, but now the developers will need to maintain the object states in two places: the database, and the caching layer. Although some Object Relational Mappers (ORMs) provide a built-in caching mechanism, they have their own issues, such as larger memory requirement, and often mapping code pollutes application code. In order to achieve more from RDBMS, we will need to start to denormalize the database to avoid joins, and keep the aggregates in the columns to avoid statistical queries.
Sharding or horizontal scaling is another way to distribute the load. Sharding in itself is a good idea, but it adds too much manual work, plus the knowledge of sharding creeps into the application code. Sharded databases make the operational tasks (backup, schema alteration, and adding index) difficult. To find out more about the hardships of sharding, visit http://www.mysqlperformanceblog.com/2009/08/06/why-you-dont-want-to-shard/.
There are ways to loosen up consistency by providing various isolation levels, but concurrency is just one part of the problem. Maintaining relational integrity, difficulties in managing data that cannot be accommodated on one machine, and difficult recovery, were all making the traditional database systems hard to be accepted in the rapidly growing big data world. Companies needed a tool that could support hundreds of terabytes of data on the ever-failing commodity hardware reliably. This led to the advent of modern databases like Cassandra, Redis, MongoDB, Riak, HBase, and many more. These modern databases promised to support very large datasets that were hard to maintain in SQL databases, with relaxed constrains on consistency and relation integrity.
NoSQL is a blanket term for the databases that solve the scalability issues which are common among relational databases. This term, in its modern meaning, was first coined by Eric Evans. It should not be confused with the database named NoSQL (http://www.strozzi.it/cgi-bin/CSA/tw7/I/en_US/nosql/Home%20Page). NoSQL solutions provide scalability and high availability, but may not guarantee ACID: atomicity, consistency, isolation, and durability in transactions. Many of the NoSQL solutions, including Cassandra, sit on the other extreme of ACID, named BASE, which stands for basically available, soft-state, eventual consistency.
Wondering about where the name, NoSQL, came from? Read Eric Evans’ blog at http://blog.sym-link.com/2009/10/30/nosql_whats_in_a_name.html.
The CAP theorem
In 2000, Eric Brewer (http://en.wikipedia.org/wiki/Eric_Brewer_%28scientist%29), in his keynote speech at the ACM Symposium, said, “A distributed system requiring always-on, highly-available operations cannot guarantee the illusion of coherent, consistent single-system operation in the presence of network partitions, which cut communication between active servers”. This was his conjecture based on his experience with distributed systems. This conjecture was later formally proved by Nancy Lynch and Seth Gilbert in 2002 (Brewer’s Conjecture and the Feasibility of Consistent, Available, Partition-Tolerant Web Services, published in ACMSIGACT News, Volume 33 Issue 2 (2002), page 51 to 59 available at http://webpages.cs.luc.edu/~pld/353/gilbert_lynch_brewer_proof.pdf).
Let’s try to understand this. Let’s say we have a distributed system where data is replicated at two distinct locations and two conflicting requests arrive, one at each location, at the time of communication link failure between the two servers. If the system (the cluster) has obligations to be highly available (a mandatory response, even when some components of the system are failing), one of the two responses will be inconsistent with what a system with no replication (no partitioning, single copy) would have returned. To understand it better, let’s take an example to learn the terminologies.
Let’s say you are planning to read George Orwell’s book Nineteen Eighty-Four over the Christmas vacation. A day before the holidays start, you logged into your favorite online bookstore to find out there is only one copy left. You add it to your cart, but then you realize that you need to buy something else to be eligible for free shipping. You start to browse the website for any other item that you might buy. To make the situation interesting, let’s say there is another customer who is trying to buy Nineteen Eighty-Four at the same time.
A consistent system is defined as one that responds with the same output for the same request at the same time, across all the replicas. Loosely, one can say a consistent system is one where each server returns the right response to each request.
In our example, we have only one copy of Nineteen Eighty-Four. So only one of the two customers is going to get the book delivered from this store. In a consistent system, only one can check out the book from the payment page. As soon as one customer makes the payment, the number of Nineteen Eighty-Four books in stock will get decremented by one, and one quantity of Nineteen Eighty-Four will be added to the order of that customer. When the second customer tries to check out, the system says that the book is not available any more.
Relational databases are good for this task because they comply with the ACID properties. If both the customers make requests at the same time, one customer will have to wait till the other customer is done with the processing, and the database is made consistent. This may add a few milliseconds of wait to the customer who came later.
An eventual consistent database system (where consistency of data across the distributed servers may not be guaranteed immediately) may have shown availability of the book at the time of check out to both the customers. This will lead to a back order, and one of the customers will be paid back. This may or may not be a good policy. A large number of back orders may affect the shop’s reputation and there may also be financial repercussions.
Availability, in simple terms, is responsiveness; a system that’s always available to serve. The funny thing about availability is that sometimes a system becomes unavailable exactly when you need it the most.
In our example, one day before Christmas, everyone is buying gifts. Millions of people are searching, adding items to their carts, buying, and applying for discount coupons. If one server goes down due to overload, the rest of the servers will get even more loaded now, because the request from the dead server will be redirected to the rest of the machines, possibly killing the service due to overload. As the dominoes start to fall, eventually the site will go down. The peril does not end here. When the website comes online again, it will face a storm of requests from all the people who are worried that the offer end time is even closer, or those who will act quickly before the site goes down again.
Availability is the key component for extremely loaded services. Bad availability leads to bad user experience, dissatisfied customers, and financial losses.
Network partitioning is defined as the inability to communicate between two or more subsystems in a distributed system. This can be due to someone walking carelessly in a data center and snapping the cable that connects the machine to the cluster, or may be network outage between two data centers, dropped packages, or wrong configuration. Partition-tolerance is a system that can operate during the network partition. In a distributed system, a network partition is a phenomenon where, due to network failure or any other reason, one part of the system cannot communicate with the other part(s) of the system. An example of network partition is a system that has some nodes in a subnet A and some in subnet B, and due to a faulty switch between these two subnets, the machines in subnet A will not be able to send and receive messages from the machines in subnet B. The network will be allowed to lose many messages arbitrarily sent from one node to another. This means that even if the cable between the two nodes is chopped, the system will still respond to the requests.
The following figure shows the database classification based on the CAP theorem:
An example of a partition-tolerant system is a system with real-time data replication with no centralized master(s). So, for example, in a system where data is replicated across two data centers, the availability will not be affected, even if a data center goes down.
The significance of the CAP theorem
Once you decide to scale up, the first thing that comes to mind is vertical scaling, which means using beefier servers with a bigger RAM, more powerful processor(s), and bigger disks. For further scaling, you need to go horizontal. This means adding more servers. Once your system becomes distributed, the CAP theorem starts to play, which means, in a distributed system, you can choose only two out of consistency, availability, and partition-tolerance. So, let’s see how choosing two out of the three options affects the system behavior as follows:
- CA system: In this system, you drop partition-tolerance for consistency and availability. This happens when you put everything related to a transaction on one machine or a system that fails like an atomic unit, like a rack. This system will have serious problems in scaling.
- CP system: The opposite of a CA system is a CP system. In a CP system, availability is sacrificed for consistency and partition-tolerance. What does this mean? If the system is available to serve the requests, data will be consistent. In an event of a node failure, some data will not be available. A sharded database is an example of such a system.
- AP system: An available and partition-tolerant system is like an always-on system that is at risk of producing conflicting results in an event of network partition. This is good for user experience, your application stays available, and inconsistency in rare events may be alright for some use cases. In our example, it may not be such a bad idea to back order a few unfortunate customers due to inconsistency of the system than having a lot of users return without making any purchases because of the system’s poor availability.
- Eventual consistent (also known as BASE system): The AP system makes more sense when viewed from an uptime perspective—it’s simple and provides a good user experience. But, an inconsistent system is not good for anything, certainly not good for business. It may be acceptable that one customer for the book Nineteen Eighty-Four gets a back order. But if it happens more often, the users would be reluctant to use the service. It will be great if the system could fix itself (read: repair) as soon as the first inconsistency is observed; or, maybe there are processes dedicated to fixing the inconsistency of a system when a partition failure is fixed or a dead node comes back to life. Such systems are called eventual consistent systems.
The following figure shows the life of an eventual consistent system:
Quoting Wikipedia, “[In a distributed system] given a sufficiently long time over which no changes [in system state] are sent, all updates can be expected to propagate eventually through the system and the replicas will be consistent”. (The page on eventual consistency is available at http://en.wikipedia.org/wiki/Eventual_consistency.)
Eventual consistent systems are also called BASE, a made-up term to represent that these systems are on one end of the spectrum, which has traditional databases with ACID properties on the opposite end.
Cassandra is one such system that provides high availability and partition-tolerance at the cost of consistency, which is tunable. The preceding figure shows a partition-tolerant eventual consistent system.
Cassandra is a distributed, decentralized, fault tolerant, eventually consistent, linearly scalable, and column-oriented data store. This means that Cassandra is made to easily deploy over a cluster of machines located at geographically different places. There is no central master server, so no single point of failure, no bottleneck, data is replicated, and a faulty node can be replaced without any downtime. It’s eventually consistent. It is linearly scalable, which means that with more nodes, the requests served per second per node will not go down. Also, the total throughput of the system will increase with each node being added. And finally, it’s column oriented, much like a map (or better, a map of sorted maps) or a table with flexible columns where each column is essentially a key-value pair. So, you can add columns as you go, and each row can have a different set of columns (key-value pairs). It does not provide any relational integrity. It is up to the application developer to perform relation management.
So, if Cassandra is so good at everything, why doesn’t everyone drop whatever database they are using and jump start with Cassandra? This is a natural question. Some applications require strong ACID compliance, such as a booking system. If you are a person who goes by statistics, you’d ask how Cassandra fares with other existing data stores. TilmannRabl and others in their paper, Solving Big Data Challenges for Enterprise Application Performance Management (http://vldb.org/pvldb/vol5/p1724_tilmannrabl_vldb2012.pdf), said that, “In terms of scalability, there is a clear winner throughout our experiments. Cassandra achieves the highest throughput for the maximum number of nodes in all experiments with a linear increasing throughput from one to 12 nodes. This comes at the price of a high write and read latency. Cassandra’s performance is best for high insertion rates”.
If you go through the paper, Cassandra wins in almost all the criteria. Equipped with proven concepts of distributed computing, made to reliably serve from commodity servers, and simple and easy maintenance, Cassandra is one of the most scalable, fastest, and very robust NoSQL database. So, the next natural question is, “What makes Cassandra so blazing fast?”. Let’s dive deeper into the Cassandra architecture.
Understanding the architecture of Cassandra
Cassandra is a relative latecomer in the distributed data-store war. It takes advantage of two proven and closely similar data-store mechanisms, namely Bigtable: A Distributed Storage System for Structured Data, 2006 (http://static.googleusercontent.com/external_content/untrusted_dlcp/research.google.com/en//archive/bigtable-osdi06.pdf) and Amazon Dynamo: Amazon’s Highly Available Key-value Store, 2007 (http://www.read.seas.harvard.edu/~kohler/class/cs239-w08/decandia07dynamo.pdf). The following diagram displays the read throughputs that show linear scaling of Cassandra:
Like BigTable, it has a tabular data presentation. It is not tabular in the strictest sense. It is rather a dictionary-like structure where each entry holds another sorted dictionary/map. This model is more powerful than the usual key-value store and it is named a table, formerly known as a column family. The properties such as eventual consistency and decentralization are taken from Dynamo.
Now, assume a column family is a giant spreadsheet, such as MS Excel. But unlike spreadsheets, each row is identified by a row key with a number (token), and unlike spreadsheets, each cell may have its own unique name within the row. In Cassandra, the columns in the rows are sorted by this unique column name. Also, since the number of partitions is allowed to be very large (1.7*1038), it distributes the rows almost uniformly across all the available machines by dividing the rows in equal token groups. Tables or column families are contained within a logical container or name space called keyspace. A keyspace can be assumed to be more or less similar to database in RDBMS.
A word on max number of cells, rows, and partitions
A cell in a partition can be assumed as a key-value pair. The maximum number of cells per partition is limited by the Java integer’s max value, which is about 2 billion. So, one partition can hold a maximum of 2 billion cells.
A row, in CQL terms, is a bunch of cells with predefined names. When you define a table with a primary key that has just one column, the primary key also serves as the partition key. But when you define a composite primary key, the first column in the definition of the primary key works as the partition key. So, all the rows (bunch of cells) that belong to one partition key go into one partition. This means that every partition can have a maximum of X rows, where X = (2*109/number_of_columns_in_a_row). Essentially, rows * columns cannot exceed 2 billion per partition.
Finally, how many partitions can Cassandra hold for each table or column family? As we know, column families are essentially distributed hashmaps. The keys or row keys or partition keys are generated by taking a consistent hash of the string that you pass. So, the number of partitioned keys is bounded by the number of hashes these functions generate. This means that if you are using the default Murmur3 partitioner (range -263 to +263), the maximum number of partitions that you can have is 1.85*1019. If you use the Random partitioner, the number of partitions that you can have is 1.7*1038.
A Cassandra cluster is called a ring. The terminology is taken from Amazon Dynamo. Cassandra 1.1 and earlier versions used to have a token assigned to each node. Let’s call this value the initial token. Each node is responsible for storing all the rows with token values (a token is basically a hash value of a row key) ranging from the previous node’s initial token (exclusive) to the node’s initial token (inclusive).
This way, the first node, the one with the smallest initial token, will have a range from the token value of the last node (the node with the largest initial token) to the first token value. So, if you jump from node to node, you will make a circle, and this is why a Cassandra cluster is called a ring.
Let’s take an example. Assume that there is a hashing algorithm (partitioner) that generates tokens from 0 to 127 and you have four Cassandra machines to create a cluster. To allocate equal load, we need to assign each of the four nodes to bear an equal number of tokens. So, the first machine will be responsible for tokens one to 32, the second will hold 33 to 64, third 65 to 96, and fourth 97 to 127 and 0. If you mark each node with the maximum token number that it can hold, the cluster looks like a ring, as shown in the following figure:
Token ownership and distribution in a balanced Cassandra ring
In Cassandra 1.1 and previous versions, when you create a cluster or add a node, you manually assign its initial token. This is extra work that the database should handle internally. Apart from this, adding and removing nodes requires manual resetting token ownership for some or all nodes. This is called rebalancing. Yet another problem was replacing a node. In the event of replacing a node with a new one, the data (rows that the to-be-replaced node owns) is required to be copied to the new machine from a replica of the old machine. For a large database, this could take a while because we are streaming from one machine. To solve all these problems, Cassandra 1.2 introduced virtual nodes (vnodes).
The following figure shows 16 vnodes distributed over four servers:
In the preceding figure, each node is responsible for a single continuous range. In the case of a replication factor of 2 or more, the data is also stored on other machines than the one responsible for the range. (Replication factor (RF) represents the number of copies of a table that exist in the system. So, RF=2, means there are two copies of each record for the table.) In this case, one can say one machine, one range. With vnodes, each machine can have multiple smaller ranges and these ranges are automatically assigned by Cassandra. How does this solve those issues? Let’s see. If you have a 30 ring cluster and a node with 256 vnodes had to be replaced. If nodes are well-distributed randomly across the cluster, each physical node in remaining 29 nodes will have 8 or 9 vnodes (256/29) that are replicas of vnodes on the dead node. In older versions, with a replication factor of 3, the data had to be streamed from three replicas (10 percent utilization). In the case of vnodes, all the nodes can participate in helping the new node get up.
The other benefit of using vnodes is that you can have a heterogeneous ring where some machines are more powerful than others, and change the vnodes ‘ settings such that the stronger machines will take proportionally more data than others. This was still possible without vnodes but it needed some tricky calculation and rebalancing. So, let’s say you have a cluster of machines with similar hardware specifications and you have decided to add a new server that is twice as powerful as any machine in the cluster. Ideally, you would want it to work twice as harder as any of the old machines. With vnodes, you can achieve this by setting twice as many num_tokens as on the old machine in the new machine’s cassandra.yaml file. Now, it will be allotted double the load when compared to the old machines.
Yet another benefit of vnodes is faster repair. Node repair requires the creation of a Merkle tree for each range of data that a node holds. The data gets compared with the data on the replica nodes, and if needed, data re-sync is done. Creation of a Merkle tree involves iterating through all the data in the range followed by streaming it. For a large range, the creation of a Merkle tree can be very time consuming while the data transfer might be much faster. With vnodes, the ranges are smaller, which means faster data validation (by comparing with other nodes). Since the Merkle tree creation process is broken into many smaller steps (as there are many small nodes that exist in a physical node), the data transmission does not have to wait till the whole big range finishes. Also, the validation uses all other machines instead of just a couple of replica nodes.
As of Cassandra 2.0.9, the default setting for vnodes is “on” with default vnodes per machine as 256. If for some reason you do not want to use vnodes and want to disable this feature, comment out the num_tokens variable and uncomment and set the initial_token variable in cassandra.yaml. If you are starting with a new cluster or migrating an old cluster to the latest version of Cassandra, vnodes are highly recommended.
The number of vnodes that you specify on a Cassandra node represents the number of vnodes on that machine. So, the total vnodes on a cluster is the sum total of all the vnodes across all the nodes. One can always imagine a Cassandra cluster as a ring of lots of vnodes.
How Cassandra works
Diving into various components of Cassandra without having any context is a frustrating experience. It does not make sense why you are studying SSTable, MemTable, and log structured merge (LSM) trees without being able to see how they fit into the functionality and performance guarantees that Cassandra gives. So first we will see Cassandra’s write and read mechanism. It is possible that some of the terms that we encounter during this discussion may not be immediately understandable.
A rough overview of the Cassandra components is as shown in the following figure:
Main components of the Cassandra service
The main class of Storage Layer is StorageProxy. It handles all the requests. The messaging layer is responsible for inter-node communications, such as gossip. Apart from this, process-level structures keep a rough idea about the actual data containers and where they live.
There are four data buckets that you need to know. MemTable is a hash table-like structure that stays in memory. It contains actual cell data. SSTable is the disk version of MemTables. When MemTables are full, they are persisted to hard disk as SSTable. Commit log is an append only log of all the mutations that are sent to the Cassandra cluster.
Mutations can be thought of as update commands. So, insert, update, and delete operations are mutations, since they mutate the data.
Commit log lives on the disk and helps to replay uncommitted changes. These three are basically core data. Then there are bloom filters and index. The bloom filter is a probabilistic data structure that lives in the memory. They both live in memory and contain information about the location of data in the SSTable. Each SSTable has one bloom filter and one index associated with it. The bloom filter helps Cassandra to quickly detect which SSTable does not have the requested data, while the index helps to find the exact location of the data in the SSTable file.
With this primer, we can start looking into how write and read works in Cassandra. We will see more explanation later.
Write in action
To write, clients need to connect to any of the Cassandra nodes and send a write request. This node is called the coordinator node. When a node in a Cassandra cluster receives a write request, it delegates the write request to a service called StorageProxy. This node may or may not be the right place to write the data. StorageProxy’s job is to get the nodes (all the replicas) that are responsible for holding the data that is going to be written. It utilizes a replication strategy to do this. Once the replica nodes are identified, it sends the RowMutation message to them, the node waits for replies from these nodes, but it does not wait for all the replies to come. It only waits for as many responses as are enough to satisfy the client’s minimum number of successful writes defined by ConsistencyLevel.
ConsistencyLevel is basically a fancy way of saying how reliable a read or write you want to be. Cassandra has tunable consistency, which means you can define how much reliability is wanted. Obviously, everyone wants a hundred percent reliability, but it comes with latency as the cost. For instance, in a thrice-replicated cluster (replication factor = 3), a write time consistency level TWO, means the write will become successful only if it is written to at least two replica nodes. This request will be faster than the one with the consistency level THREE or ALL, but slower than the consistency level ONE or ANY.
The following figure is a simplistic representation of the write mechanism. The operations on node N2 at the bottom represent the node-local activities on receipt of the write request:
The following steps show everything that can happen during a write mechanism:
- If the failure detector detects that there aren’t enough live nodes to satisfy ConsistencyLevel, the request fails.
- If the failure detector gives a green signal, but writes time-out after the request is sent due to infrastructure problems or due to extreme load, StorageProxy writes a local hint to replay when the failed nodes come back to life. This is called hinted hand off.
One might think that hinted handoff may be responsible for Cassandra’s eventual consistency. But it’s not entirely true. If the coordinator node gets shut down or dies due to hardware failure and hints on this machine cannot be forwarded, eventual consistency will not occur. The anti-entropy mechanism is responsible for consistency, rather than hinted hand-off. Anti-entropy makes sure that all replicas are in sync.
- If the replica nodes are distributed across data centers, it will be a bad idea to send individual messages to all the replicas in other data centers. Rather, it sends the message to one replica in each data center with a header, instructing it to forward the request to other replica nodes in that data center.
- Now the data is received by the node which should actually store that data. The data first gets appended to the commit log, and pushed to a MemTable appropriate column family in the memory.
- When the MemTable becomes full, it gets flushed to the disk in a sorted structure named SSTable. With lots of flushes, the disk gets plenty of SSTables. To manage SSTables, a compaction process runs. This process merges data from smaller SSTables to one big sorted file.
Read in action
Similar to a write case, when StorageProxy of the node that a client is connected to gets the request, it gets a list of nodes containing this key based on the replication strategy. The node’s StorageProxy then sorts the nodes based on their proximity to itself. The proximity is determined by the snitch function that is set up for this cluster. Basically, the following types of snitches exist:
- SimpleSnitch: A closer node is the one that comes first when moving clockwise in the ring. (A ring is when all the machines in the cluster are placed in a circular fashion with each machine having a token number. When you walk clockwise, the token value increases. At the end, it snaps back to the first node.)
- PropertyFileSnitch: This snitch allows you to specify how you want your machines’ location to be interpreted by Cassandra. You do this by assigning a data center name and rack name for all the machines in the cluster in the $CASSANDRA_HOME/conf/cassandra-topology.properties file. Each node has a copy of this file and you need to alter this file each time you add or remove a node. This is what the file looks like:
# Cassandra Node IP=Data Center:Rack 192.168.1.100=DC1:RAC1 192.168.2.200=DC2:RAC2 10.0.0.10=DC1:RAC1 10.0.0.11=DC1:RAC1 10.0.0.12=DC1:RAC2 10.20.114.10=DC2:RAC1 10.20.114.11=DC2:RAC1
- GossipingPropertyFileSnitch: The PropertyFileSnitch is kind of a pain, even when you think about it. Each node has the locations of all nodes manually written and updated every time a new node joins or an old node retires. And then, we need to copy it on all the servers. Wouldn’t it be better if we just specify each node’s data center and rack on just that one machine, and then have Cassandra somehow collect this information to understand the topology? This is exactly what GossipingPropertyFileSnitch does. Similar to PropertyFileSnitch, you have a file called $CASSANDRA_HOME/conf/cassandra-rackdc.properties, and in this file you specify the data center and the rack name for that machine. The gossip protocol makes sure that this information gets spread to all the nodes in the cluster (and you do not have to edit properties of files on all the nodes when a new node joins or leaves). Here is what a cassandra-rackdc.properties file looks like:
# indicate the rack and dc for this node dc=DC13 rack=RAC42
- RackInferringSnitch: This snitch infers the location of a node based on its IP address. It uses the third octet to infer rack name, and the second octet to assign data center. If you have four nodes 10.110.6.30, 10.110.6.4, 10.110.7.42, and 10.111.3.1, this snitch will think the first two live on the same rack as they have the same second octet (110) and the same third octet (6), while the third lives in the same data center but on a different rack as it has the same second octet but the third octet differs. Fourth, however, is assumed to live in a separate data center as it has a different second octet than the three.
- EC2Snitch: This is meant for Cassandra deployments on Amazon EC2 service. EC2 has regions and within regions, there are availability zones. For example, us-east-1e is an availability zone in the us-east region with availability zone named 1e. This snitch infers the region name (us-east, in this case) as the data center and availability zone (1e) as the rack.
- EC2MultiRegionSnitch: The multi-region snitch is just an extension of EC2Snitch where data centers and racks are inferred the same way. But you need to make sure that broadcast_address is set to the public IP provided by EC2 and seed nodes must be specified using their public IPs so that inter-data center communication can be done.
- DynamicSnitch: This Snitch determines closeness based on a recent performance delivered by a node. So, a quick responding node is perceived as being closer than a slower one, irrespective of its location closeness, or closeness in the ring. This is done to avoid overloading a slow performing node. DynamicSnitch is used by all the other snitches by default. You can disable it, but it is not advisable.
Now, with knowledge about snitches, we know the list of the fastest nodes that have the desired row keys, it’s time to pull data from them. The coordinator node (the one that the client is connected to) sends a command to the closest node to perform a read (we’ll discuss local reads in a minute) and return the data. Now, based on ConsistencyLevel, other nodes will send a command to perform a read operation and send just the digest of the result. If we have read repairs (discussed later) enabled, the remaining replica nodes will be sent a message to compute the digest of the command response.
Let’s take an example. Let’s say you have five nodes containing a row key K (that is, RF equals five), your read ConsistencyLevel is three; then the closest of the five nodes will be asked for the data and the second and third closest nodes will be asked to return the digest. If there is a difference in the digests, full data is pulled from the conflicting node and the latest of the three will be sent. These replicas will be updated to have the latest data. We still have two nodes left to be queried. If read repairs are not enabled, they will not be touched for this request. Otherwise, these two will be asked to compute the digest. Depending on the read_repair_chance setting, the request to the last two nodes is done in the background, after returning the result. This updates all the nodes with the most recent value, making all replicas consistent.
Let’s see what goes on within a node. Take a simple case of a read request looking for a single column within a single row. First, the attempt is made to read from MemTable, which is rapid fast, and since there exists only one copy of data, this is the fastest retrieval. If all required data is not found there, Cassandra looks into SSTable. Now, remember from our earlier discussion that we flush MemTables to disk as SSTables and later when the compaction mechanism wakes up, it merges those SSTables. So, our data can be in multiple SSTables.
The following figure represents a simplified representation of the read mechanism. The bottom of the figure shows processing on the read node. The numbers in circles show the order of the event. BF stands for bloom filter.
Each SSTable is associated with its bloom filter built on the row keys in the SSTable. Bloom filters are kept in the memory and used to detect if an SSTable may contain (false positive) the row data. Now, we have the SSTables that may contain the row key. The SSTables get sorted in reverse chronological order (latest first).
Apart from the bloom filter for row keys, there exists one bloom filter for each row in the SSTable. This secondary bloom filter is created to detect whether the requested column names exist in the SSTable. Now, Cassandra will take SSTables one by one from younger to older, and use the index file to locate the offset for each column value for that row key and the bloom filter associated with the row (built on the column name). On the bloom filter being positive for the requested column, it looks into the SSTable file to read the column value. Note that we may have a column value in other yet-to-be-read SSTables, but that does not matter, because we are reading the most recent SSTables first, and any value that was written earlier to it does not matter. So, the value gets returned as soon as the first column in the most recent SSTable is allocated.
By now, you should be familiar with all the nuts and bolts of Cassandra. We have discussed how the pressure to make data stores to web scale inspired a rather not-so-common database mechanism to become mainstream, and how the CAP theorem governs the behavior of such databases. We have seen that Cassandra shines out among its peers. Then, we dipped our toes into the big picture of Cassandra read and write mechanisms. This left us with lots of fancy terms.
It is understandable that it may be a lot to take in for someone new to NoSQL systems. It is okay if you do not have complete clarity at this point. As you start working with Cassandra, tweaking it, experimenting with it, and going through the Cassandra mailing list discussions or talks, you will start to come across stuff that you have read in this article and it will start to make sense, and perhaps you may want to come back and refer to this article to improve your clarity.
It is not required that you understand this article fully to be able to write queries, set up clusters, maintain clusters, or do anything else related to Cassandra. A general sense of this article will take you far enough to work extremely well with Cassandra-based projects.
How does this knowledge help us in building an application? Isn’t it just about learning Thrift or CQL API and getting going? You might be wondering why you need to know about the compaction and storage mechanism, when all you need to do is to deliver an application that has a fast backend. It may not be obvious at this point why you are learning this, but as we move ahead with developing an application, we will come to realize that knowledge about underlying storage mechanism helps. Later, if you will deploy a cluster, performance tuning, maintenance, and integrating with other tools such as Apache Hadoop, you may find this article useful.