This post walks through a few aggregations on streaming data using Spark Streaming and Kafka. We will set up a local environment for the tutorial. If you already have Spark and Kafka running on a cluster, you can skip the setup steps.

### The Challenge of Stream Computations

Computations on streams are challenging for several reasons, chief among them the size of the dataset: a stream keeps growing, so we cannot assume that all of it fits in memory or that we can iterate over it again.

Certain metrics, such as quantiles, need to iterate over the entire dataset in sorted order. Even simpler metrics become a problem when computed with the standard formula. Take the mean, for example: mean = sum of values / count. Recomputing this over the whole dataset every time does not scale for a stream. Instead, suppose we store the running sum and count. Each new item is added to the sum and increments the count, and whenever we need the average, we divide the sum by the count. That gives us the mean at that instant.
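The running-mean idea can be sketched in a few lines of plain Python. The class name `RunningMean` and the sample values are illustrative assumptions, not part of the original code:

```python
class RunningMean(object):
    """Keeps a running sum and count so the mean is available at any time."""

    def __init__(self):
        self.total = 0.0
        self.count = 0

    def add(self, value):
        # Each new item is added to the sum and increments the count.
        self.total += value
        self.count += 1

    def mean(self):
        if self.count == 0:
            raise ValueError("no values seen yet")
        return self.total / self.count


stream = [4, 8, 6, 2]  # illustrative values standing in for a stream
running = RunningMean()
means = []
for item in stream:
    running.add(item)
    means.append(running.mean())  # mean of everything seen so far
```

Only two numbers are stored no matter how many items arrive, which is what makes this approach suitable for a stream.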

## Calculating Percentile

Computing a percentile requires finding the position of an item in a large dataset. For example, the 90th percentile is the value that is greater than 90 percent of the values in the sorted dataset. To illustrate, in [9, 1, 8, 7, 6, 5, 2, 4, 3, 0], the 80th percentile is 8. This means we need to sort the dataset and then find an item by its position, which clearly does not scale.

Scaling this operation involves an algorithm called t-digest, which approximates percentiles at scale. A t-digest summarizes the data as a set of centroids positioned so that the quantiles can be estimated from them. Digests can be added together to get a complete digest that estimates the quantiles of the whole dataset.

Spark allows us to do computations on partitions of data, unlike traditional MapReduce. So we calculate a digest for every partition and add the digests in the reduce phase to get a complete digest. This reduce operation is the only point at which the data needs to converge in one place. We then use Spark’s broadcast feature to broadcast the resulting percentile value. This value is used to filter the dataset, leaving us with an RDD that matches our criteria (the top 5 percent). Finally, we use foreachPartition to send the values of each partition to Kafka (this could be any message handler, post, and so on).
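For reference, here is a minimal sketch of the exact, sort-based calculation described above, using the same example list. The function name `exact_percentile` is an assumption for illustration, and it expects a non-empty list and an integer percentile:

```python
def exact_percentile(values, p):
    """Return the value that sits above p percent of the sorted values."""
    ordered = sorted(values)  # the costly step: needs the full dataset
    # Position of the first item with p percent of the items below it,
    # clamped to the last index so that p = 100 returns the maximum.
    index = min(len(ordered) * p // 100, len(ordered) - 1)
    return ordered[index]


data = [9, 1, 8, 7, 6, 5, 2, 4, 3, 0]
p80 = exact_percentile(data, 80)
```

The sort has to see every value at once, which is exactly what a streaming or partitioned computation tries to avoid.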

### Nature of the Data

We are using fictitious data with two columns: user_id and activity_type. From this data we are going to compute popular users. The activity can be of the following types: profile.picture.like, profile.view, and message.private. Each of these activities has a different score.

### Metrics We Would Like to Compute

We would like to compute the most popular users, that is, the users whose scores fall in the top 5 percent (the score threshold and the list of users).

### Prerequisites

You must have Docker, Python 2.7, JRE 1.7, and Scala installed. You should also have basic familiarity with Spark and the concept of RDDs.

### Getting Set Up with Kafka

For the purpose of this tutorial, we will run Kafka as a Docker container. On a Mac (using boot2docker), the container can be run with:

```
docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=`boot2docker ip` --env ADVERTISED_PORT=9092 spotify/kafka
```

On Linux (with Docker installed directly on the machine):

```
docker run -p 2181:2181 -p 9092:9092 --env ADVERTISED_HOST=127.0.0.1 --env ADVERTISED_PORT=9092 spotify/kafka
```

This gives you a running Kafka instance that we will use for this tutorial. We also download the Kafka binaries locally to test the Kafka consumer, create topics, and so on. Kafka binaries can be found here. Download and extract the latest version. The directory containing the Kafka binaries will be referred to as $KAFKA_HOME.

### Getting Set Up with Spark

The next step is to install Spark. We have two options for running Spark:

• Run it on a Docker container
• Run it locally

#### Running Spark Locally

Download the Spark binaries:

``wget http://apache.claz.org/spark/spark-1.4.0/spark-1.4.0-bin-hadoop2.6.tgz``

Extract the binaries:

```
tar -xvf spark-1.4.0-bin-hadoop2.6.tgz
```

Run the pyspark shell using:

```
cd spark-1.4.0-bin-hadoop2.6
./bin/pyspark
```

If you have IPython installed, you can also use IPython with pyspark by using the following line:

``IPYTHON=1 ./bin/pyspark``

#### Running Spark as a Docker Container

```
docker run -i -t -h sandbox -p 8888:8888 -v my_code:/app anantasty/ubuntu_spark_ipython:1.0 bash
```

This will mount a directory named my_code on your local system to the /app directory on the Docker container. The Spark shell starts with the SparkContext available as ``sc`` and the HiveContext available as ``sqlContext``.

Here is a simple Spark job for testing the installation:

```
rdd = sc.parallelize(range(1, 100))
# Square every element, then add the squares together.
res = rdd.map(lambda v: v * v).reduce(lambda x, y: x + y)
print(res)
```

This job calculates the sum of squares of the integers from 1 to 99.

### Spark Streaming Basics

Spark Streaming is an extension of the core Spark API. It provides high-throughput, fault-tolerant processing of data streams. These data streams can be ingested from various sources, such as ZeroMQ, Flume, Twitter, Kafka, and so on.

Spark Streaming breaks the data into small batches, and these batches are then processed by Spark to generate the stream of results, again in batches. The core abstraction for this is called a DStream, which represents a continuous stream of data. A DStream is a sequence of RDDs loaded incrementally. More information on Spark Streaming can be found in the Spark Streaming Programming Guide.
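The batching idea can be illustrated without Spark. The sketch below is only an analogy in plain Python: the helper `micro_batches` is hypothetical, and it cuts batches by item count, whereas Spark Streaming cuts them by a time interval and each batch becomes an RDD.

```python
from itertools import islice


def micro_batches(stream, batch_size):
    """Yield successive lists of up to batch_size items from an iterable."""
    iterator = iter(stream)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return  # the stream is exhausted
        yield batch


events = range(1, 8)  # stand-in for incoming events
# Each batch is processed on its own, producing a stream of per-batch results.
batch_sums = [sum(batch) for batch in micro_batches(events, 3)]
```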

### Kafka Basics

Kafka is a publish-subscribe messaging system. It is distributed, partitioned, and replicated.

Terminology:

• A category of feeds is called a topic; for example, weather data from two different stations could be different topics.
• The publishers are called Producers.
• The subscribers of these topics are called Consumers.
• The Kafka cluster has one or more servers, each of which is called a broker.

More details can be found here.

### Generating Mock Data

We can generate data in two ways:

• Statically generated data
• Continuous data generation

With the static approach, we generate a dataset up front and use it in our Kafka producers. The following function generates random data:

```
from numpy.random import randint

MESSAGE_TYPES = ['profile.picture.like', 'profile.view', 'message.private']
START_INDEX = 1000
NUM_USERS = 100000
NUM_ROWS = 10000000


def generate_data(start_index=START_INDEX, num_users=NUM_USERS,
                  num_rows=NUM_ROWS):
    # randint excludes the upper bound, so add 1 to keep the user range
    # inclusive (random_integers, used originally, is deprecated in NumPy).
    users = randint(start_index, start_index + num_users + 1, num_rows)
    activity = randint(0, len(MESSAGE_TYPES), num_rows)
    activity_name = [MESSAGE_TYPES[i] for i in activity]
    user_activity = zip(users, activity_name)
    return user_activity
```

We can also generate data on the fly using this code:

```
import random

MESSAGE_TYPES = ['profile.picture.like', 'profile.view', 'message.private']
START_INDEX = 1000
END_INDEX = 101000


def gen_random_message(start_index=START_INDEX, end_index=END_INDEX):
    # Returns one (user_id, activity) pair.
    return (random.randint(start_index, end_index),
            random.choice(MESSAGE_TYPES))
```
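The consumer code later in this post reads `message['user_id']` and `message['activity']` from a JSON payload, so a producer would need to serialize each generated tuple accordingly. The following is a minimal sketch of that step; the helper name `encode_message` is an assumption, and the actual producer in the full source may differ.

```python
import json
import random

MESSAGE_TYPES = ['profile.picture.like', 'profile.view', 'message.private']


def gen_random_message(start_index=1000, end_index=101000):
    return (random.randint(start_index, end_index),
            random.choice(MESSAGE_TYPES))


def encode_message(user_id, activity):
    """Serialize one event as JSON; field names are assumed from load_msg."""
    return json.dumps({'user_id': user_id, 'activity': activity})


payload = encode_message(*gen_random_message())
decoded = json.loads(payload)  # what a consumer would see after decoding
```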

The full source code can be found at the GitHub repo. Now we can start the producer and watch the topic with the Kafka console consumer:

``$KAFKA_HOME/bin/kafka-console-consumer.sh --zookeeper 127.0.0.1:2181 --topic messages``

We can see the Kafka messages being printed to the console. At this point, we have our producer ready.

### Aggregation and Processing Using Spark Streaming

This process can be broken down into the following steps:

• Reading the message from the Kafka queue.
• Decoding the message.
• Converting the message type text to its numeric score.
• Updating the score counts for incoming data.
• Filtering for the most popular users.

### Reading Messages from the Kafka Queue

Reading messages in pyspark is possible using the KafkaUtils module to create a stream from a Kafka queue.

```
# The remaining arguments (the Kafka parameters) are cut off in the source.
kvs = KafkaUtils.createDirectStream(ssc, ["messages"], ...)
```

Next, we load the message and convert the activity type text to its numeric score. This is done by using Python’s built-in json module and returning a tuple of the relevant values. Notice that the load_msg function, shown further down, uses this lookup:

``scores_b.value[message['activity']]``

Here, scores is a dictionary that maps the message type text to a numeric value. We broadcast this dictionary to all the nodes as scores_b, using the following lines:

```
scores = {'profile.picture.like': 2, 'profile.view': 1, 'message.private': 3}
scores_b = sc.broadcast(scores)
```

We then access the dictionary through scores_b.value, which returns the original dictionary. Spark uses a BitTorrent-style broadcast, in which the master sends the value to a few nodes and the other nodes replicate it from those nodes.

```
import json


def load_msg(msg):
    # msg is a (key, value) pair; the JSON payload is the value.
    message = json.loads(msg[1])
    return message['user_id'], scores_b.value[message['activity']]
```
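To try load_msg outside of Spark, a small stand-in for the broadcast variable is enough, since the function only reads its `.value` attribute. The class `LocalBroadcast` and the sample record below are hypothetical and exist only for this sketch:

```python
import json


class LocalBroadcast(object):
    """Hypothetical stand-in for a Spark broadcast variable."""

    def __init__(self, value):
        self.value = value


scores_b = LocalBroadcast(
    {'profile.picture.like': 2, 'profile.view': 1, 'message.private': 3})


def load_msg(msg):
    # msg is a (key, value) pair; the JSON payload is the value.
    message = json.loads(msg[1])
    return message['user_id'], scores_b.value[message['activity']]


# A Kafka record arrives as a (key, value) pair; the key is unused here.
record = (None, json.dumps({'user_id': 1042, 'activity': 'message.private'}))
result = load_msg(record)
```

The result is a (user_id, score) tuple, which is the key-value shape the next step relies on.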

Now we count incoming messages and update the score count. For this step, we use the updateStateByKey function on the DStream. It returns a new DStream by applying the provided function to the previous state of each key and the new values for that key, much like a reduce function. The function we provide receives the new values and the value accumulated from previous batches, and we can aggregate or combine these as we see fit. Note that the first element of each tuple is used as the key by default, so in this case the user_id is the key, which is ideal, and the score is the value.

```
def update_scorecount(new_scores, score_sum):
    # score_sum is None the first time a key is seen.
    if not score_sum:
        score_sum = 0
    return sum(new_scores) + score_sum
```
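The behavior of updateStateByKey with this function can be imitated in plain Python. The sketch below is a simplified model, not Spark's implementation: the helper `apply_batch` and the sample batches are assumptions, and state is kept in an ordinary dictionary.

```python
def update_scorecount(new_scores, score_sum):
    if not score_sum:
        score_sum = 0
    return sum(new_scores) + score_sum


def apply_batch(state, batch):
    """Fold one batch of (key, value) pairs into the per-key state."""
    grouped = {}
    for key, value in batch:
        grouped.setdefault(key, []).append(value)
    new_state = {}
    # The update function runs for every key that has state or new values;
    # keys with no new values receive an empty list.
    for key in set(state) | set(grouped):
        new_state[key] = update_scorecount(grouped.get(key, []),
                                           state.get(key))
    return new_state


batches = [
    [(1001, 2), (1002, 1), (1001, 3)],  # first micro-batch
    [(1002, 3), (1003, 1)],             # second micro-batch
]
state = {}
for batch in batches:
    state = apply_batch(state, batch)
```

After both batches, each user's entry holds the total score accumulated across batches.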

Now we can filter the most popular users. We compute the desired percentile and filter based on it. To calculate the percentile, we use the t-digest algorithm. This algorithm allows us to estimate the percentile value in a single pass, which makes it very useful and efficient for streaming data. The original t-digest repo from Ted Dunning can be found here. An open source Python implementation of this algorithm was used; it can be found here.

We create a digest_partitions function that takes values from a given partition and adds them to the digest. In the reduce step, these digests are added to provide a final digest that can provide us the percentile value. We then broadcast this percentile value, which we later use in our filter. We could have also performed the computation of the digest within the filter_most_popular function, but this way we can easily add some form of output such as a Kafka producer to publish the percentile value, if needed.

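```
from operator import add
from tdigest import TDigest  # assumes the Python tdigest package

# Latest broadcast percentile; None until the first batch is processed.
percentile_broadcast = None

def digest_partitions(values):
    digest = TDigest()
    digest.batch_update(values)
    return [digest]

def compute_percentile(rdd):
    global percentile_broadcast
    # Merge the per-partition digests, then read off the percentile.
    # args_broadcast is defined elsewhere in the full source.
    percentile_limit = rdd.map(lambda row: row[1]).mapPartitions(
        digest_partitions).reduce(add).percentile(args_broadcast.value.limit)
    percentile_broadcast = rdd.context.broadcast(percentile_limit)

def filter_most_popular(rdd):
    # Reconstructed guard: empty RDD until a percentile has been broadcast.
    if percentile_broadcast is not None:
        return rdd.filter(lambda row: row[1] > percentile_broadcast.value)
    return rdd.context.parallelize([])
```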
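The summarize-per-partition, merge, then filter pattern can be tried without Spark or t-digest. The sketch below swaps in `collections.Counter` for the digest: like a t-digest it is a summary that can be merged by addition, but it is exact and only practical when there are few distinct values. The function names and sample rows are assumptions for illustration, and the percentile follows the definition used earlier in this post (the value that is above p percent of the values).

```python
from collections import Counter
from functools import reduce
from operator import add


def summarize_partition(scores):
    """Build a mergeable summary (value -> count) for one partition."""
    return Counter(scores)


def percentile_from_summary(summary, p):
    """Smallest value that has at least p percent of observations below it."""
    total = sum(summary.values())
    below = 0
    for value in sorted(summary):
        if below * 100 >= p * total:
            return value
        below += summary[value]
    return max(summary)


# Hypothetical (user_id, score) rows split across three partitions.
partitions = [
    [(1, 5), (2, 1), (3, 2)],
    [(4, 9), (5, 3), (6, 4)],
    [(7, 7), (8, 6), (9, 8), (10, 10)],
]

# "mapPartitions" step: one summary per partition. "reduce" step: add them.
merged = reduce(add, (summarize_partition(score for _, score in part)
                      for part in partitions))
limit = percentile_from_summary(merged, 80)
# "filter" step: keep only rows scoring above the percentile value.
popular = [row for part in partitions for row in part if row[1] > limit]
```

Only the small summaries travel to one place; the rows themselves stay in their partitions and are filtered there.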

This filtered RDD can now be published to Kafka. To publish the values, we use a keyed producer and key on the timestamp. We use the foreachPartition function to publish each partition of the RDD as a batch, instead of publishing each value individually, to avoid the overhead of creating a huge number of network connections to Kafka.

```
import json
from time import time

# KafkaClient, KeyedProducer, and RoundRobinPartitioner come from the
# kafka-python package; args_broadcast is defined elsewhere in the full source.
def publish_popular_users(popular_rdd):
    key = 'popular_{}'.format(int(time()))
    message_key = popular_rdd.context.broadcast(key)

    def publish_partition(partition):
        # One client and producer per partition, not per record.
        kafka = KafkaClient(args_broadcast.value.kafka_hosts)
        # Note: async is a reserved word from Python 3.7; this targets 2.7.
        producer = KeyedProducer(kafka, partitioner=RoundRobinPartitioner,
                                 async=True, batch_send=True)
        producer.send_messages('popular_users', message_key.value,
                               *[json.dumps(user) for user in partition])

    popular_rdd.foreachPartition(publish_partition)
```
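To see why publishing per partition matters, the sketch below counts how many producers get created under each strategy. `CountingProducer` is a hypothetical stand-in that opens no network connection, and the partitions are made-up data.

```python
class CountingProducer(object):
    """Hypothetical stand-in for a Kafka producer; counts instantiations."""
    created = 0

    def __init__(self):
        CountingProducer.created += 1
        self.sent = []

    def send_messages(self, topic, key, *messages):
        self.sent.extend(messages)


def publish_per_record(partitions):
    for partition in partitions:
        for record in partition:
            # A new producer (and connection) for every single record.
            CountingProducer().send_messages('popular_users', 'key', record)


def publish_per_partition(partitions):
    for partition in partitions:
        # One producer per partition, sending all of its records together.
        CountingProducer().send_messages('popular_users', 'key', *partition)


partitions = [['a', 'b', 'c'], ['d', 'e'], ['f']]

CountingProducer.created = 0
publish_per_record(partitions)
per_record = CountingProducer.created

CountingProducer.created = 0
publish_per_partition(partitions)
per_partition = CountingProducer.created
```

With real data the gap is much larger, since the number of records dwarfs the number of partitions.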

The complete code can be found here.

The code can be run using:

```
$SPARK_HOME/spark/bin/spark-submit --master local --jars \
  $SPARK_KAFKA_JARS/target/spark-streaming-kafka-assembly_2.10-1.5.0-SNAPSHOT.jar \
  --executor-cores 8 streaming_percentile.py
```