

Counting distinct IPs in weblog data using MapReduce and Combiners

This recipe will walk you through creating a MapReduce program to count distinct IPs in weblog data. We will demonstrate the application of a combiner to optimize data transfer overhead between the map and reduce stages. The code is implemented in a generic fashion and can be used to count distinct values in any tab-delimited dataset.

Getting ready

This recipe assumes that you have a basic familiarity with the Hadoop 0.20 MapReduce API. You will need access to the weblog_entries dataset supplied with this book and stored in an HDFS folder at the path /input/weblog.

You will need access to a pseudo-distributed or fully-distributed cluster capable of running MapReduce jobs using the newer MapReduce API introduced in Hadoop 0.20.

You will also need to package this code inside a JAR file to be executed by the Hadoop JAR launcher from the shell. Only the core Hadoop libraries are required to compile and run this example.
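
If you need a starting point, the following is a minimal sketch of compiling and packaging the job from the shell. The build directory, the location of the Hadoop core JAR, and the myJobs.jar name are placeholders; adjust them to your installation.

    # Placeholder paths; point -classpath at your installation's Hadoop core JAR.
    mkdir -p build
    javac -classpath "$HADOOP_HOME/hadoop-core.jar" -d build DistinctCounterJob.java
    jar cvf myJobs.jar -C build .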

How to do it…

Perform the following steps to count distinct IPs using MapReduce:

  1. Open a text editor/IDE of your choice, preferably one with Java syntax highlighting.
  2. Create a class named DistinctCounterJob in a source file DistinctCounterJob.java, under whatever source package is appropriate; it will be packaged into your JAR file.
  3. The following code will serve as the Tool implementation for job submission:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import java.io.IOException;
    import java.util.regex.Pattern;
    public class DistinctCounterJob implements Tool {

      private Configuration conf;

      public static final String NAME = "distinct_counter";
      public static final String COL_POS = "col_pos";

      public static void main(String[] args) throws Exception {
        // Pass the job's exit status back to the shell
        System.exit(ToolRunner.run(new Configuration(), new DistinctCounterJob(), args));
      }

    
    
  4. The run() method is where we set the input/output formats, mapper class configuration, combiner class, and key/value class configuration:

      public int run(String[] args) throws Exception {
        if (args.length != 3) {
          System.err.println("Usage: distinct_counter <input> <output> <element_position>");
          System.exit(1);
        }
        conf.setInt(COL_POS, Integer.parseInt(args[2]));

        Job job = new Job(conf, "Count distinct elements at position");
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setMapperClass(DistinctMapper.class);
        job.setReducerClass(DistinctReducer.class);
        job.setCombinerClass(DistinctReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setJarByClass(DistinctCounterJob.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Return 0 on success so the shell sees a conventional exit code
        return job.waitForCompletion(true) ? 0 : 1;
      }

      public void setConf(Configuration conf) {
        this.conf = conf;
      }

      public Configuration getConf() {
        return conf;
      }
    }

    
    
  5. The map() function is implemented in the following code by extending mapreduce.Mapper:

    public static class DistinctMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

      private static int col_pos;
      // Split each record on tab characters
      private static final Pattern pattern = Pattern.compile("\t");
      private Text outKey = new Text();
      private static final IntWritable outValue = new IntWritable(1);

      @Override
      protected void setup(Context context)
          throws IOException, InterruptedException {
        col_pos = context.getConfiguration()
            .getInt(DistinctCounterJob.COL_POS, 0);
      }

      @Override
      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String field = pattern.split(value.toString())[col_pos];
        outKey.set(field);
        context.write(outKey, outValue);
      }
    }

    
    
  6. The reduce() function is implemented in the following code by extending mapreduce.Reducer:

    public static class DistinctReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

      private IntWritable count = new IntWritable();

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values,
          Context context) throws IOException, InterruptedException {
        int total = 0;
        // Sum the counts for this key (1s from the mapper or partial sums from the combiner)
        for (IntWritable value : values) {
          total += value.get();
        }
        count.set(total);
        context.write(key, count);
      }
    }

    
    
  7. The following command shows the sample usage against weblog data with column position number 4, which is the IP column:

    hadoop jar myJobs.jar distinct_counter /input/weblog/ /output/weblog_distinct_counter 4

    
    

How it works…

First we set up DistinctCounterJob to implement the Tool interface for remote submission. The static constant NAME is of potential use in a Hadoop Driver class, which supports launching different jobs from the same JAR file. The static constant COL_POS is initialized from the third required command-line argument, <element_position>. This value is set within the job configuration, and should match the position of the column whose distinct values you wish to count. Supplying 4 will match the IP column for the weblog data.
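
To make that concrete, the following is a minimal sketch of such a Driver class built on Hadoop's ProgramDriver. The ExampleDriver class name and description string are hypothetical; the class assumes it lives in the same package as DistinctCounterJob and would typically be declared as the JAR's main class so that hadoop jar myJobs.jar distinct_counter … dispatches to this job.

    // Hypothetical Driver: registers DistinctCounterJob under its NAME constant so
    // that several jobs can be launched by name from the same JAR.
    import org.apache.hadoop.util.ProgramDriver;

    public class ExampleDriver {
      public static void main(String[] args) {
        ProgramDriver driver = new ProgramDriver();
        try {
          driver.addClass(DistinctCounterJob.NAME, DistinctCounterJob.class,
              "Counts distinct values at a given column position");
          driver.driver(args); // invokes the main() of the job named in args[0]
        } catch (Throwable t) {
          t.printStackTrace();
          System.exit(1);
        }
      }
    }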

Since we are reading and writing text, we can use the supplied TextInputFormat and TextOutputFormat classes. We set the Mapper and Reducer classes to our DistinctMapper and DistinctReducer implementations respectively. We also supply DistinctReducer as the combiner class; this decision is explained in more detail later in this section.

It’s also very important to call setJarByClass() so that the TaskTrackers can properly unpack and find the Mapper and Reducer classes. The job uses the static helper methods on FileInputFormat and FileOutputFormat to set the input and output directories respectively. Now we’re set up and ready to submit the job.

The Mapper class sets up a few member variables as follows:

  • col_pos: This is initialized to a value supplied in the configuration. It allows users to change which column to parse and apply the count distinct operation on.
  • pattern: This defines the column’s split point for each row based on tabs.
  • outKey: This is a class member that holds the output key. Reusing a single Text instance avoids creating a new object for every record that is written.
  • outValue: This is an integer representing one occurrence of the given key. It is similar to the WordCount example.

The map() function splits each incoming line's value and extracts the string located at col_pos. We reset outKey to the string found at that position, which for our example is the row's IP address. We then emit the newly set outKey together with outValue to mark one occurrence of that IP address.

Without the assistance of the combiner, this would present the reducer with an iterable collection of 1s to be counted.

The following is an example of the {key, value:[]} collection a reducer would receive without a combiner:

{10.10.1.1, [1,1,1,1,1,1]} = six occurrences of the IP “10.10.1.1”.


The implementation of the reduce() method will sum the integers and arrive at the correct total, but there’s nothing that requires the integer values to be limited to the number 1. We can use a combiner to process the intermediate key-value pairs as they are output from each mapper and help improve the data throughput in the shuffle phase. Since the combiner is applied against the local map output, we may see a performance improvement as the amount of data we need to transfer for an intermediate key/value can be reduced considerably.

Instead of seeing {10.10.1.1, [1,1,1,1,1,1]}, the combiner can add the 1s and replace the intermediate value for that key with {10.10.1.1, [6]}. The reducer can then sum the various combined values for the intermediate key and arrive at the same correct total. This is possible because addition is both a commutative and associative operation. In other words:

  • Commutative: The order of the operands has no effect on the final result. For example, 1 + 2 + 3 = 3 + 1 + 2.
  • Associative: The way the operands are grouped has no effect on the final result. For example, (1 + 2) + 3 = 1 + (2 + 3).

For counting the occurrences of distinct IPs, we can use the same code in our reducer as a combiner for output in the map phase.

When applied to our problem, the normal output with no combiner from two separate, independently running map tasks might look like the following, where {key, value:[]} denotes the intermediate key-value collection:

  • Map Task A = {10.10.1.1, [1,1,1]} = three occurrences
  • Map Task B = {10.10.1.1, [1,1,1,1,1,1]} = six occurrences

Without the aid of a combiner, this will be merged in the shuffle phase and presented to a single reducer as the following key-value collection:

  • {10.10.1.1, [1,1,1,1,1,1,1,1,1]} = nine total occurrences

Now let’s revisit what would happen when using a Combiner against the exact same sample output:

Map Task A = {10.10.1.1, [1,1,1]} = three occurrences

  • Combiner = {10.10.1.1, [3]} = still three occurrences, but reduced locally for this mapper

Map Task B = {10.10.1.1, [1,1,1,1,1,1]} = six occurrences

  • Combiner = {10.10.1.1, [6]} = still six occurrences

Now the reducer will see the following for that key-value collection:

  • {10.10.1.1, [3,6]} = nine total occurrences

We arrived at the same total count for that IP address, but we used a combiner to limit the amount of network I/O during the MapReduce shuffle phase by pre-reducing the intermediate key-value output from each mapper.

There’s more…

The combiner can be confusing to newcomers. Here are some useful tips:

The Combiner does not always have to be the same class as your Reducer

The previous recipe and the default WordCount example show the Combiner class being initialized to the same implementation as the Reducer class. This is not enforced by the API, but it ends up being common for many distributed aggregate operations such as sum(), min(), and max(). One basic example is a min() operation where the Reducer formats its final output in a particular way for readability; the Combiner computes the same minimum but does not care about the output formatting, so it takes a slightly different form.
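
As a hedged illustration (not taken from this recipe), the following sketch pairs a Combiner that emits the raw minimum with a Reducer that formats its final output. The MinCombiner and MinReducer class names are hypothetical; both would sit inside your job class alongside the existing Mapper and reuse the recipe's imports (Text, IntWritable, Reducer, IOException).

    // The combiner keeps the map output types (Text, IntWritable) so it can run
    // any number of times without changing the result.
    public static class MinCombiner
        extends Reducer<Text, IntWritable, Text, IntWritable> {

      private final IntWritable min = new IntWritable();

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int smallest = Integer.MAX_VALUE;
        for (IntWritable value : values) {
          smallest = Math.min(smallest, value.get());
        }
        min.set(smallest);
        context.write(key, min); // emit the raw numeric minimum
      }
    }

    // Only the final reducer formats the value for readability.
    public static class MinReducer
        extends Reducer<Text, IntWritable, Text, Text> {

      private final Text formatted = new Text();

      @Override
      protected void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        int smallest = Integer.MAX_VALUE;
        for (IntWritable value : values) {
          smallest = Math.min(smallest, value.get());
        }
        formatted.set("minimum=" + smallest);
        context.write(key, formatted);
      }
    }

The job setup would then call job.setCombinerClass(MinCombiner.class), job.setReducerClass(MinReducer.class), job.setMapOutputValueClass(IntWritable.class), and job.setOutputValueClass(Text.class).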

Combiners are not guaranteed to run

Whether or not the framework invokes your combiner during execution depends on the intermediate spill file size from each map output, and the combiner is not guaranteed to run for every intermediate key. Your job should not depend on the combiner for correct results; use it only as an optimization.

You can control how many map-side spill files must accumulate before MapReduce tries to combine intermediate values with the configuration property min.num.spills.for.combine.
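
For example, a hedged sketch of adjusting this from the Tool's run() method, before the Job is constructed (the value 9 is purely illustrative):

    // Require more map-side spill files to accumulate before the combiner is
    // applied during the merge; the usual default is 3.
    conf.setInt("min.num.spills.for.combine", 9);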

Using Hive date UDFs to transform and sort event dates from geographic event data

This recipe will illustrate the efficient use of the Hive date UDFs to list the 20 most recent events and the number of days between the event date and the current system date.

Getting ready

Make sure you have access to a pseudo-distributed or fully-distributed Hadoop cluster with Apache Hive 0.7.1 installed on your client machine and on the environment path for the active user account.

This recipe depends on having the Nigera_ACLED_cleaned.tsv dataset loaded into a Hive table named acled_nigeria_cleaned with the fields mapped to the respective datatypes.

Issue the following command to the Hive client to see the mentioned fields:

describe acled_nigeria_cleaned;


You should see the following response:

OK
loc string
event_date string
event_type string
actor string
latitude double
longitude double
source string
fatalities int


How to do it…

Perform the following steps to utilize Hive UDFs for sorting and transformation:

  1. Open a text editor of your choice, ideally one with SQL syntax highlighting.
  2. Add the inline creation and transform syntax:

    SELECT event_type, event_date, days_since FROM (
      SELECT event_type, event_date,
             datediff(to_date(from_unixtime(unix_timestamp())),
                      to_date(from_unixtime(unix_timestamp(event_date,
                        'yyyy-MM-dd')))) AS days_since
      FROM acled_nigeria_cleaned) date_differences
    ORDER BY event_date DESC LIMIT 20;

    
    
  3. Save the file as top_20_recent_events.sql in the active folder.
  4. Run the script from the operating system shell by supplying the -f option to the Hive client. You should see the following five rows appear first in the output console:

    OK
    Battle-No change of territory 2011-12-31 190
    Violence against civilians 2011-12-27 194
    Violence against civilians 2011-12-25 196
    Violence against civilians 2011-12-25 196
    Violence against civilians 2011-12-25 196

    
    

How it works…

Let’s start with the nested SELECT subquery. We select three fields from our Hive table acled_nigeria_cleaned: event_type, event_date, and the result of calling the UDF datediff(), which takes an end date and a start date as arguments, both expected in the form yyyy-MM-dd. The first argument to datediff() is the end date, which we want to represent the current system date. Calling unix_timestamp() with no arguments returns the current system time in seconds since the Unix epoch. We pass that return value to from_unixtime() to get a formatted timestamp representing the current system date in the default Java 1.6 format (yyyy-MM-dd HH:mm:ss). We only care about the date portion, so calling to_date() on the output of this function strips the HH:mm:ss. The result is the current date in the yyyy-MM-dd form.

The second argument to datediff() is the start date, which for our query is event_date. The series of function calls operates in almost exactly the same manner as for the previous argument, except that when we call unix_timestamp(), we must tell the function that our argument is in the SimpleDateFormat pattern yyyy-MM-dd. Now we have both the start and end dates in the yyyy-MM-dd format and can perform the datediff() operation for the given row. We alias the output column of datediff() as days_since for each row.

The outer SELECT statement takes these three columns per row and sorts the entire output by event_date in descending order to get reverse chronological ordering. We arbitrarily limit the output to only the first 20.

The net result is the 20 most recent events along with the number of days that have passed since each occurred. For instance, if the query were run on 2012-07-08, an event dated 2011-12-31 would show days_since = 190, which matches the first row of the sample output.

There’s more…

The date UDFs can help tremendously in performing string date comparisons. Here are some additional pointers:

Date format strings follow Java SimpleDateFormat guidelines

Check out the Javadocs for SimpleDateFormat to learn how your custom date strings can be used with the date transform UDFs.
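
For instance, the following hypothetical query applies an output pattern that includes day and month names to the table used in this recipe; the pattern letters come straight from SimpleDateFormat.

    -- Produces values such as 'Saturday, 31 Dec 2011' (output shown is illustrative).
    SELECT event_date,
           from_unixtime(unix_timestamp(event_date, 'yyyy-MM-dd'),
                         'EEEE, dd MMM yyyy') AS formatted_date
    FROM acled_nigeria_cleaned
    LIMIT 5;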

Default date and time formats

  • Many of the UDFs operate under a default format assumption.
  • For UDFs requiring only a date, your column values must be in the form yyyy-MM-dd.
  • For UDFs that require both a date and a time, your column values must be in the form yyyy-MM-dd HH:mm:ss. The example after this list shows both defaults in action.
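
As a quick, hypothetical check of these defaults, the following query compares two yyyy-MM-dd literals and strips the time portion from a yyyy-MM-dd HH:mm:ss literal; Hive 0.7 needs a FROM clause even for constant expressions, so it runs against the recipe's table with LIMIT 1.

    -- Both functions accept the default forms directly; no reformatting is needed.
    SELECT datediff('2012-01-31', '2012-01-01'),   -- returns 30
           to_date('2012-01-01 10:30:00')          -- returns '2012-01-01'
    FROM acled_nigeria_cleaned
    LIMIT 1;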

Using Hive to build a per-month report of fatalities over geographic event data

This recipe will show a very simple analytic that uses Hive to count fatalities for every month appearing in the dataset and print the results to the console.

Getting ready

Make sure you have access to a pseudo-distributed or fully-distributed Hadoop cluster with Apache Hive 0.7.1 installed on your client machine and on the environment path for the active user account.

This recipe depends on having the Nigera_ACLED_cleaned.tsv dataset loaded into a Hive table named acled_nigeria_cleaned with the following fields mapped to the respective datatypes.

Issue the following command to the Hive client:

describe acled_nigeria_cleaned;


You should see the following response:

OK
loc string
event_date string
event_type string
actor string
latitude double
longitude double
source string
fatalities int


How to do it…

Follow the steps to use Hive for report generation:

  1. Open a text editor of your choice, ideally one with SQL syntax highlighting.
  2. Add the inline creation and transformation syntax:

    SELECT from_unixtime(unix_timestamp(event_date, 'yyyy-MM-dd'), 'yyyy-MMM'),
           COALESCE(CAST(sum(fatalities) AS STRING), 'Unknown')
    FROM acled_nigeria_cleaned
    GROUP BY from_unixtime(unix_timestamp(event_date, 'yyyy-MM-dd'), 'yyyy-MMM');

    
    
  3. Save the file as monthly_violence_totals.sql in the active folder.
  4. Run the script from the operating system shell by supplying the -f option to the Hive client. You should see the following three rows appear first in the output console. Note that the output is sorted lexicographically, not chronologically.

    OK
    1997-Apr 115
    1997-Aug 4
    1997-Dec 26

    
    

How it works…

The SELECT statement uses unix_timestamp() and from_unixtime() to reformat each row's event_date as a concatenated year-month field. The same expression appears in the GROUP BY clause so that fatalities can be totaled per month using sum().

The coalesce() method returns the first non-null argument passed to it. We pass, as the first argument, the fatalities summed for that given year-month, cast as a string. If that value is NULL for any reason, the constant Unknown is returned; otherwise, the string representing the total fatalities counted for that year-month combination is returned. Everything is printed to the console over stdout.

There’s more…

The following are some additional helpful tips related to the code in this recipe:

The coalesce() method can take a variable number of arguments

As mentioned in the Hive documentation, coalesce() supports one or more arguments. The first non-null argument will be returned. This can be useful for evaluating several different expressions for a given column before deciding the right one to choose.

coalesce() returns NULL if no argument is non-null, so it's not uncommon to provide a literal of the appropriate type as the final argument to serve as a default when all other arguments are NULL.
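
The following hypothetical query sketches such a fallback chain against the recipe's table: report the fatalities value when present, otherwise the row's source field, otherwise the literal 'Unknown'.

    -- CAST keeps all three arguments as strings so their types are compatible.
    SELECT event_date,
           COALESCE(CAST(fatalities AS STRING), source, 'Unknown') AS fatalities_or_source
    FROM acled_nigeria_cleaned
    LIMIT 10;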

Date reformatting code template

Having to reformat dates stored in your raw data is very common. Proper use of from_unixtime() and unix_timestamp() can make your life much easier.

Remember this general code template for concise date format transformation in Hive:

from_unixtime(unix_timestamp(<col>,<in-format>),<out-format>);
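
For example, assuming a hypothetical raw_events table whose event_date column is stored as MM/dd/yyyy, the template fills in as follows:

    -- <col> = event_date, <in-format> = 'MM/dd/yyyy', <out-format> = 'yyyy-MM-dd'
    SELECT from_unixtime(unix_timestamp(event_date, 'MM/dd/yyyy'), 'yyyy-MM-dd')
    FROM raw_events;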

