
MapReduce is a technique used to take large quantities of data and farm it out for processing. A somewhat trivial example might be: given 1TB of HTTP log data, count the number of hits that come from each country and report those numbers. For example, if you have the following log entries:

204.12.226.2 - - [09/Jun/2013:09:12:24 -0700] "GET /who-we-are HTTP/1.0" 404 471 "-" "Mozilla/5.0 (compatible; MJ12bot/v1.4.3; http://www.majestic12.co.uk/bot.php?+)"
174.129.187.73 - - [09/Jun/2013:10:58:22 -0700] "GET /robots.txt HTTP/1.1" 404 452 "-" "CybEye.com/2.0 (compatible; MSIE 9.0; Windows NT 5.1; Trident/4.0; GTB6.4)"
157.55.35.37 - - [02/Jun/2013:23:31:01 -0700] "GET / HTTP/1.1" 200 483 "-" "Mozilla/5.0 (compatible; bingbot/2.0; +http://www.bing.com/bingbot.htm)"
206.183.1.74 - - [02/Jun/2013:18:24:35 -0700] "GET / HTTP/1.1" 200 482 "-" "Mozilla/4.0 (compatible; http://search.thunderstone.com/texis/websearch/about.html)"
1.202.218.21 - - [02/Jun/2013:17:38:20 -0700] "GET /robots.txt HTTP/1.1" 404 471 "-" "Mozilla/5.0 (compatible; JikeSpider; +http://shoulu.jike.com/spider.html)"

Then the answer to the question would be as follows:

US: 4
China: 1

Clearly this example dataset does not warrant distributing the data processing among multiple machines, but imagine if instead of five rows of log data we had twenty-five billion rows. If it took a single computer half a second to process five records, it would take a little short of eighty years to process twenty-five billion records. To solve this, we could break the data up into smaller chunks, process those chunks, and rejoin the results when we are finished.
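To sanity-check that figure, here is a quick back-of-the-envelope calculation in Ruby (the numbers simply restate the assumptions above):

records            = 25_000_000_000
seconds_per_record = 0.5 / 5                 # half a second for five records
total_seconds      = records * seconds_per_record
puts total_seconds / (60 * 60 * 24 * 365)    # => a little over 79 years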

To apply this to a slightly larger dataset, imagine you extrapolated these five records to one hundred records and then split those one hundred records into five groups, each containing twenty records. From those five groups we might compute the following results:

Group 1: US: 5, Greece: 4, Ireland: 8, Canada: 3
Group 2: Mexico: 2, Scotland: 6, Canada: 9, Ireland: 3
Group 3: US: 15, China: 2, Finland: 3
Group 4: Italy: 1, Greece: 4, Scotland: 10, US: 5
Group 5: Finland: 5, China: 5, US: 10
If we then combine these per-group counts into a single map, using the country name as the key and adding each value to any existing value for that key, we get the count per country across all one hundred records.
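As a minimal sketch of that merge step (the group_counts array below is hypothetical, hard-coded from the table above purely for illustration):

group_counts = [
  { "US" => 5, "Greece" => 4, "Ireland" => 8, "Canada" => 3 },
  { "Mexico" => 2, "Scotland" => 6, "Canada" => 9, "Ireland" => 3 },
  { "US" => 15, "China" => 2, "Finland" => 3 },
  { "Italy" => 1, "Greece" => 4, "Scotland" => 10, "US" => 5 },
  { "Finland" => 5, "China" => 5, "US" => 10 }
]
# Merge the per-group hashes, summing counts when a country appears in
# more than one group.
totals = group_counts.reduce({}) do |acc, counts|
  acc.merge(counts) { |_country, existing, added| existing + added }
end
puts totals.inspect

The simple program later in this article does the same thing with ||= and +=.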

Using Ruby, we can write a simple program to do this, first without using Gearman, and then with it.

To demonstrate this, we will write the following:

  • A simple library that we can use in our non-distributed program and in our Gearman-enabled programs

  • An example program that demonstrates using the library

  • A client that uses the library to split up our data and submit jobs to our manager

  • A worker that uses the library to process the job requests and return the results

The shared library

First we will develop a library that we can reuse. This demonstrates how quickly existing logic can be adapted to take advantage of Gearman, and keeping the logic in one shared library ensures the following:

  • The program, client, and worker stay much simpler, so we can see what's going on in each of them

  • The behavior of our program, client, and worker is guaranteed to be consistent

The shared library will have two methods, map_data and reduce_data. The map_data method will be responsible for splitting up the data into chunks to be processed, and the reduce_data method will process those chunks of data and return something that can be merged together into an accurate answer. Take the following example, and save it to a file named functions.rb for later use:

#!/usr/bin/env ruby

# Generate sub-lists of the data;
# each sub-list has size = blocksize
def map_data(lines, blocksize)
  blocks = []
  counter = 0
  block = []
  lines.each do |line|
    if counter >= blocksize
      blocks << block
      block = []
      counter = 0
    end
    block << line
    counter += 1
  end
  blocks << block if block.size > 0
  blocks
end

# Extract the number of times we see a unique line.
# Result is a hash with key = line, value = count
def reduce_data(lines)
  results = {}
  lines.each do |line|
    results[line] ||= 0
    results[line] += 1
  end
  results
end

A simple program

To use this library, we can write a very simple program that demonstrates the functionality:

require './functions.rb'

countries = ["china", "us", "greece", "italy"]
lines = []
results = {}

(1..100).each { |i| lines << countries[i % 4] }

blocks = map_data(lines, 20)
blocks.each do |block|
  reduce_data(block).each do |k, v|
    results[k] ||= 0
    results[k] += v
  end
end
puts results.inspect

Put the contents of this example into a Ruby source file named mapreduce.rb, in the same directory as your functions.rb file, and execute it with the following:

[user@host:$] ruby ./mapreduce.rb

This script generates a list with one hundred elements. Since there are four distinct elements, each will appear 25 times, as the following output shows:

{"us"=>25, "greece"=>25, "italy"=>25, "china"=>25}

Continuing in this vein, we can add Gearman to extend our example: a client will submit jobs, and a single worker will process them serially to generate the same results. The reason we wrote these methods in a separate file from the driver application was to make them reusable in this fashion.

The client

The client in this example is responsible for the mapping phase: it splits apart the data and submits a job for each block that needs to be processed. In this worker/client setup, we use JSON as a simple way to serialize and deserialize the data being sent back and forth:

require 'rubygems'
require 'gearman'
require 'json'
require './functions.rb'

client = Gearman::Client.new('localhost:4730')
taskset = Gearman::TaskSet.new(client)

countries = ["china", "us", "greece", "italy"]
jobcount = 1
lines = []
results = {}

(1..100).each { |i| lines << countries[i % 4] }

blocks = map_data(lines, 20)
blocks.each do |block|
  # Generate a task with a unique id
  uniq = rand(36**8).to_s(36)
  task = Gearman::Task.new('count_countries',
                           JSON.dump(block),
                           :uniq => uniq)
  # When the task is complete, add its results into ours
  task.on_complete do |d|
    # We are passing data back and forth as JSON, so
    # decode it to a hash and then iterate over the
    # k=>v pairs
    JSON.parse(d).each do |k, v|
      results[k] ||= 0
      results[k] += v
    end
  end
  taskset.add_task(task)
  puts "Submitted job #{jobcount}"
  jobcount += 1
end

puts "Submitted all jobs, waiting for results."
start_time = Time.now
taskset.wait(100)
time_diff = (Time.now - start_time).to_i
puts "Took #{time_diff} seconds: #{results.inspect}"

This client uses a few concepts that were not used in the introductory examples, namely task sets and unique identifiers. In the Ruby client, a task set is a group of tasks that are submitted together and can be waited upon collectively. To create a task set, you construct it with the client that will submit it:

taskset = Gearman::TaskSet.new(client)

Then you can create and add tasks to the task set:

task = Gearman::Task.new('count_countries',
                         JSON.dump(block), :uniq => uniq)
taskset.add_task(task)

Finally, you tell the task set how long you want to wait for the results:

taskset.wait(100)

This will block the program until the timeout passes or all the tasks in the task set complete (again, complete does not necessarily mean that the worker succeeded at the task, only that it saw it through to completion). In this example, it will wait 100 seconds for all the tasks to complete before giving up on them. This doesn't mean that the jobs won't complete if the client disconnects, just that the client won't see the end results (which may or may not be acceptable).
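If you want the client to notice an incomplete run without relying on the return value of the wait call, one option (not part of the original client, added here only as a sketch) is to check the merged counts yourself after waiting:

# After taskset.wait(100) returns, verify that every record we submitted
# is accounted for in the merged results.
expected = lines.size
actual   = results.values.reduce(0, :+)
warn "Only #{actual} of #{expected} records were counted" if actual < expected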

The worker

To complete the distributed MapReduce example, we need to implement the worker that is responsible for performing the actual data processing. The worker will perform the following tasks:

  • Receive a list of countries serialized as JSON from the manager

  • Decode that JSON data into a Ruby structure

  • Perform the reduce operation on the data, converting the list of countries into a corresponding hash of counts

  • Serialize the hash of counts as a JSON string

  • Return the JSON string to the manager (to be passed on to the client)

require 'rubygems'
require 'gearman'
require 'json'
require './functions.rb'

Gearman::Util.logger.level = Logger::DEBUG

@servers = ['localhost:4730']

w = Gearman::Worker.new(@servers)
w.add_ability('count_countries') do |json_data, job|
  puts "Received: #{json_data}"
  data = JSON.parse(json_data)
  result = reduce_data(data)
  puts "Result: #{result.inspect}"
  returndata = JSON.dump(result)
  puts "Returning #{returndata}"
  sleep 4
  returndata
end
loop { w.work }

Notice that we have introduced a slight delay by instructing our worker to sleep for four seconds before returning the data. This simulates a job that takes a while to process.

To run this example, we will repeat the exercise from the first section. Save the contents of the client to a file named mapreduce_client.rb, and the contents of the worker to a file named mapreduce_worker.rb, in the same directory as the functions.rb file. Then, start the worker first by running the following:

ruby mapreduce_worker.rb

And then start the client by running the following:

ruby mapreduce_client.rb

When you run these scripts, the worker will be waiting to pick up jobs, and then the client will generate five jobs, each with a block containing a list of countries to be counted, and submit them to the manager. These jobs will be picked up by the worker and processed, one at a time, until they are all complete. As a result, there will be a twenty-second gap between when the jobs are submitted and when they are all completed.

Parallelizing the pipeline

Implementing the solution this way clearly doesn't gain us much performance over the original example. In fact, it is going to be slower (even ignoring the four-second sleep inside each job execution), because of the time spent serializing and deserializing the data and transmitting the data and results between the actors. The goal of this exercise is to demonstrate building a system that can increase the number of workers and parallelize the processing of data, which we will see in the following exercise.

To demonstrate the power of parallel processing, we can now run two copies of the worker. Simply open a new shell and execute the worker via ruby mapreduce_worker.rb; this will spin up a second copy of the worker, ready to process jobs.

Now, run the client a second time and observe the behavior. You will see that the client completes in twelve seconds instead of twenty. Why not ten? Remember that we submitted five jobs, and each takes four seconds. Five jobs cannot be divided evenly between two workers, so one worker will pick up three jobs instead of two, which takes it an additional four seconds to complete:

[user@host]% ruby mapreduce_client.rb
Submitted job 1
Submitted job 2
Submitted job 3
Submitted job 4
Submitted job 5
Submitted all jobs, waiting for results.
Took 12 seconds: {"us"=>25, "greece"=>25, "italy"=>25, "china"=>25}
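
The timing follows directly from how the jobs are distributed; here is a quick check of the arithmetic (the four seconds per job is simply the simulated sleep in our worker):

jobs    = 5
per_job = 4    # seconds of simulated work per job
workers = 2
# The busiest worker picks up ceil(5 / 2) = 3 jobs, so the run takes 3 * 4 = 12 seconds.
puts (jobs.to_f / workers).ceil * per_job    # => 12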

Feel free to experiment with the various parameters of the system, such as running more workers, increasing the number of records being processed, or adjusting the amount of time that the worker sleeps during a job. While this example does not involve processing enormous quantities of data, hopefully you can see how it can be expanded for future growth.
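For example, one way to make the record count and block size easy to vary is to read them from the command line in mapreduce_client.rb. The ARGV handling below is purely illustrative and not part of the original client:

# Hypothetical tweak to mapreduce_client.rb: take the number of synthetic
# records and the block size from the command line, keeping the original
# values as defaults.
total_records = (ARGV[0] || 100).to_i
block_size    = (ARGV[1] || 20).to_i

(1..total_records).each { |i| lines << countries[i % 4] }
blocks = map_data(lines, block_size)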

Summary

In this article, we discussed the MapReduce technique and used Gearman to distribute the work across multiple workers. We hope this article gives you a glimpse of how the book flows.
