In this post I walk through how to set up a MapReduce job that writes to Cassandra. Use cases for this include streaming analytics into Cassandra.

I am assuming you have a Cassandra cluster and a Hadoop cluster available before we start; a single instance of each, or even localhost, will suffice. The code used for this example is available at https://github.com/manum/mr-cassandra.

Let’s create the Cassandra Keyspace and Table we are going to use. You can run the following in cqlsh (the command line utility that lets you talk to Cassandra).

The table keytable has a single column, key, which is where we will store the data.

CREATE KEYSPACE keytest WITH REPLICATION = { 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 3 };

CREATE TABLE keytest.keytable (
  key varchar,
  PRIMARY KEY (key)
);

Here is what the table will look like after CassandraTester (described below) has run:

cqlsh> USE keytest;
cqlsh:keytest> select * from keytable;
 key
----------
 test1234

(1 rows)

We can start by looking at CassandraHelper.java and CassandraTester.java.

CassandraHelper Methods:

getSession(): retrieves the current session object so that no additional ones are created

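public Session getSession() {
        LOG.info("Starting getSession()");
        if (this.cluster == null || this.cluster.isClosed()) {
            // Without an open cluster there is nothing to connect with,
            // so the current session (possibly null) is returned as-is.
            LOG.info("Cluster not started or closed");
        } else if (this.session == null || this.session.isClosed()) {
            // Checking for null first avoids a NullPointerException on isClosed().
            LOG.info("Session is missing or closed. Creating a session");
            this.session = this.cluster.connect();
        }

        return this.session;
    }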

createConnection(String): connects to the cluster; pass it the host of a Cassandra node.

public void createConnection(String node)  {

        this.cluster = Cluster.builder().addContactPoint(node).build();

        Metadata metadata = cluster.getMetadata();
            
        System.out.printf("Connected to cluster: %s\n", metadata.getClusterName());
        
        for ( Host host : metadata.getAllHosts() ) {
            System.out.printf("Datacenter: %s; Host: %s; Rack: %s\n", host.getDatacenter(), host.getAddress(), host.getRack());
        }
        this.session = cluster.connect();

        
        this.prepareQueries();

    }

closeConnection(): closes the connection after everything is completed.

public void closeConnection() {
        cluster.close();
    }

prepareQueries(): This method prepares queries so that they are optimized on the server side. Prepared queries are recommended when you run the same query often, or when the query text stays the same but the bound data changes, e.g. when doing several inserts.

private void prepareQueries()  {
        LOG.info("Starting prepareQueries()");
        this.preparedStatement = this.session.prepare(this.query);
    }

addKey(String): Adds data to the cluster. Its try/catch blocks catch the exceptions the driver can throw and report what went wrong.

public void addKey(String key) {
        Session session = this.getSession();
        
        if(key.length()>0) {
            try {
                session.execute(this.preparedStatement.bind(key) );
                //session.executeAsync(this.preparedStatement.bind(key));
            } catch (NoHostAvailableException e) {
                System.out.printf("No host in the %s cluster can be contacted to execute the query.\n", 
                        session.getCluster());
                Session.State st = session.getState();
                for ( Host host : st.getConnectedHosts() ) {
                    System.out.println("In flight queries::"+st.getInFlightQueries(host));
                    System.out.println("open connections::"+st.getOpenConnections(host));
                }

            } catch (QueryExecutionException e) {
                System.out.println("An exception was thrown by Cassandra because it cannot " +
                        "successfully execute the query with the specified consistency level.");
            }  catch (IllegalStateException e) {
                System.out.println("The BoundStatement is not ready.");
            }
        }
    }

CassandraTester: This class has a main method in which you provide the host you want to connect to; running it writes the value “test1234” into Cassandra.

MapReduceExample.java is the interesting file here. It has a Mapper class, a Reducer class and a main method that initializes the job. In the Mapper you will find setup() and cleanup() methods, which the MapReduce framework calls automatically before and after the map calls, and which you will use to connect to Cassandra and to close the connection afterwards.
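The setup/map/cleanup lifecycle can be illustrated without a cluster. The following is a minimal, framework-free sketch and not the code from the repo: KeySink, InMemorySink and runTask are hypothetical names standing in for CassandraHelper and for the Hadoop task runner, which calls setup() once, map() once per input record, and cleanup() once at the end.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Mirrors the shape of a Hadoop Mapper: setup(), map() per record, cleanup().
class LifecycleMapperSketch {

    // Hypothetical stand-in for CassandraHelper: open once, write many, close once.
    interface KeySink {
        void open(String host);
        void addKey(String key);
        void close();
    }

    // In-memory sink so the sketch runs without a Cassandra cluster.
    static class InMemorySink implements KeySink {
        final List<String> keys = new ArrayList<>();
        int opens = 0;
        int closes = 0;

        public void open(String host) { opens++; }

        // Skips empty keys, like the length check in addKey(String).
        public void addKey(String key) {
            if (!key.isEmpty()) {
                keys.add(key);
            }
        }

        public void close() { closes++; }
    }

    private final KeySink sink;
    private final String host;

    LifecycleMapperSketch(KeySink sink, String host) {
        this.sink = sink;
        this.host = host;
    }

    void setup() { sink.open(host); }          // connect once per task
    void map(String line) { sink.addKey(line); } // one write per record
    void cleanup() { sink.close(); }           // release the connection

    // Hypothetical task runner: cleanup() runs even if a map() call throws.
    static void runTask(LifecycleMapperSketch mapper, List<String> records) {
        mapper.setup();
        try {
            for (String record : records) {
                mapper.map(record);
            }
        } finally {
            mapper.cleanup();
        }
    }

    public static void main(String[] args) {
        InMemorySink sink = new InMemorySink();
        runTask(new LifecycleMapperSketch(sink, "localhost"), Arrays.asList("a", "", "b"));
        System.out.println(sink.opens + " open, " + sink.keys.size() + " keys, " + sink.closes + " close");
    }
}
```

Opening the connection in setup() rather than in map() means each task pays the connection cost once, however many records it processes.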

I modified the standard word count example for this, so the program now counts lines instead of words and writes every line to Cassandra. The output of the reducer is each line together with its count.
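To make the change from word count concrete, here is a minimal, framework-free sketch of the counting logic; it is not the code from the repo. The names LineCountSketch and countLines are hypothetical, and skipping empty lines is an assumption borrowed from the length check in addKey(String). In MapReduce terms the map phase emits (line, 1) for each line and the reduce phase sums the ones per distinct line; the sketch collapses both into a single grouping step.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class LineCountSketch {

    // Map phase equivalent: key = the whole line, value = 1.
    // Reduce phase equivalent: sum the values for each distinct key.
    static Map<String, Integer> countLines(List<String> lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            if (line.isEmpty()) {
                continue; // assumption: empty lines are ignored
            }
            counts.merge(line, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "KING\tWe lost a jewel of her; and our esteem",
                "ROSALIND\tAy, but when?",
                "ROSALIND\tAy, but when?");
        // Print each distinct line with the number of times it occurred.
        countLines(input).forEach((line, count) -> System.out.println(count + "\t" + line));
    }
}
```

Because the whole line is the key, identical lines collapse into a single row, which matches a Cassandra table whose only column is the primary key.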

To run this example here is what you need to do:

  1. Clone the repo from https://github.com/manum/mr-cassandra
  2. Run mvn install to create a jar in the target/ folder
  3. scp the jar to your Hadoop cluster
  4. Copy over the test input (for this test I used the complete works of Shakespeare, all-shakespeare.txt, which is in the git repo)
  5. Run the jar with the following command:
     hadoop jar mr_cassandra-0.0.1-SNAPSHOT-jar-with-dependencies.jar com.example.com.mr_cassandra.MapReduceExample /user/ubuntu/all-shakespeare.txt /user/ubuntu/output/

If you run the above steps, it should kick off the job. After the job is complete go to cqlsh and run select * from keytable limit 10;

cqlsh:keytest> select * from keytable limit 10;

 key
-----------------------------------------------------------------
          REGAN\tGood sir, no more; these are unsightly tricks:
                   KING\tWe lost a jewel of her; and our esteem
                                        ROSALIND\tAy, but when?
                                              \tNow leaves him.
                           \tThy brother by decree is banished:
 DUCHESS OF YORK\tI had a Richard too, and thou didst kill him;
             JULIET\tWho is't that calls? is it my lady mother?
           ARTHUR\tO, save me, Hubert, save me! my eyes are out
                 \tFull of high feeding, madly hath broke loose
                     \tSwift-winged with desire to get a grave,

(10 rows)

cqlsh:keytest>

About the author

Manu Mukerji has a background in cloud computing and big data, handling billions of transactions per day in real time. He enjoys building and architecting scalable, highly available data solutions, and has extensive experience working in online advertising and social media.

Twitter: @next2manu

LinkedIn: https://www.linkedin.com/in/manumukerji/
