

Key/value pairs

Here we will explain why some operations process data and provide their output in terms of key/value pairs.

What it means

Firstly, we will clarify just what we mean by key/value pairs by highlighting similar concepts in the Java standard library. The java.util.Map interface is the parent of commonly used classes such as HashMap and (through some library backward reengineering) even the original Hashtable.

For any Java Map object, its contents are a set of mappings from a given key of a specified type to a related value of a potentially different type. A HashMap object could, for example, contain mappings from a person’s name (String) to his or her birthday (Date).

In the context of Hadoop, we are referring to data that also comprises keys that relate to associated values. This data is stored in such a way that the various values in the data set can be sorted and rearranged across a set of keys. If we are using key/value data, it will make sense to ask questions such as the following:

  • Does a given key have a mapping in the data set?
  • What are the values associated with a given key?
  • What is the complete set of keys?
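
To make this concrete, here is a minimal Java sketch (the names and values are invented purely for illustration) that uses a HashMap in the way described above; the three lookups at the end correspond to the three questions in the preceding list:

import java.util.Date;
import java.util.HashMap;
import java.util.Map;

public class BirthdayMap
{
    public static void main(String[] args)
    {
        // A mapping from a person's name (key) to his or her birthday (value)
        Map<String, Date> birthdays = new HashMap<String, Date>();
        birthdays.put("Alice", new Date()); // placeholder dates, purely illustrative
        birthdays.put("Bob", new Date());

        // Does a given key have a mapping in the data set?
        System.out.println(birthdays.containsKey("Alice"));

        // What are the values associated with a given key?
        System.out.println(birthdays.get("Alice"));

        // What is the complete set of keys?
        System.out.println(birthdays.keySet());
    }
}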

We will go into WordCount in detail shortly, but the output of the program is clearly a set of key/value relationships; for each word (the key), there is a count (the value) of its number of occurrences. Think about this simple example and some important features of key/value data will become apparent, as follows:

  • Keys must be unique but values need not be
  • Each value must be associated with a key, but a key could have no values (though not in this particular example)
  • Careful definition of the key is important; deciding on whether or not the counts are applied with case sensitivity will give different results

Note that we need to define carefully what we mean by keys being unique here. This does not mean the key occurs only once; in our data set we may see a key occur numerous times and, as we shall see, the MapReduce model has a stage where all values associated with each key are collected together. The uniqueness of keys guarantees that if we collect together every value seen for any given key, the result will be an association from a single instance of the key to every value mapped in such a way, and none will be omitted.

Why key/value data?

Using key/value data as the foundation of MapReduce operations allows for a powerful programming model that is surprisingly widely applicable, as can be seen by the adoption of Hadoop and MapReduce across a wide variety of industries and problem scenarios. Much data is either intrinsically key/value in nature or can be represented in such a way. It is a simple model with broad applicability and semantics straightforward enough that programs defined in terms of it can be applied by a framework like Hadoop.

Of course, the data model itself is not the only thing that makes Hadoop useful; its real power lies in how it uses the techniques of parallel execution and divide and conquer. We can have a large number of hosts on which we can store and process data, and even use a framework that manages the division of the larger task into smaller chunks and the combination of partial results into the overall answer. But we need this framework to provide us with a way of expressing our problems that doesn't require us to be experts in the execution mechanics; we want to express the transformations required on our data and then let the framework do the rest. MapReduce, with its key/value interface, provides such a level of abstraction, whereby the programmer only has to specify these transformations and Hadoop handles the complex process of applying them to arbitrarily large data sets.

Some real-world examples

To become less abstract, let's think of some real-world data that takes the form of key/value pairs:

  • An address book relates a name (key) to contact information (value)
  • A bank account uses an account number (key) to associate with the account details (value)
  • The index of a book relates a word (key) to the pages on which it occurs (value)
  • On a computer filesystem, filenames (keys) allow access to any sort of data, such as text, images, and sound (values)

These examples are intentionally broad in scope, to help you see that key/value data is not some very constrained model used only in high-end data mining, but a very common model that is all around us.

We would not be having this discussion if this was not important to Hadoop. The bottom line is that if the data can be expressed as key/value pairs, it can be processed by MapReduce.

MapReduce as a series of key/value transformations

You may have come across MapReduce described in terms of key/value transformations, in particular the somewhat intimidating one that looks like this:

{K1,V1} -> {K2, List<V2>} -> {K3,V3}


We are now in a position to understand what this means:

  • The input to the map method of a MapReduce job is a series of key/value pairs that we’ll call K1 and V1.
  • The output of the map method (and hence the input to the reduce method) is a series of keys and associated lists of values that we'll call K2 and V2. Note that each mapper simply emits a series of individual key/value pairs; these are combined into a key and a list of values during the shuffle stage.
  • The final output of the MapReduce job is another series of key/value pairs, called K3 and V3.

These sets of key/value pairs don’t have to be different; it would be quite possible to input, say, names and contact details and output the same, with perhaps some intermediary format used in collating the information. Keep this three-stage model in mind as we explore the Java API for MapReduce next. We will first walk through the main parts of the API you will need and then do a systematic examination of the execution of a MapReduce job.
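
To make this three-stage model concrete, here is a hypothetical trace of a small two-line input through WordCount (the first-stage keys are the byte positions of the lines within the file; the words are chosen to be consistent with the sample output shown later in this article):

Input to map (K1, V1):            (0, "This is a test")  (15, "yes this is")
Output of map (K2, V2):           ("This", 1) ("is", 1) ("a", 1) ("test", 1) ("yes", 1) ("this", 1) ("is", 1)
After the shuffle (K2, List<V2>): ("This", [1]) ("a", [1]) ("is", [1, 1]) ("test", [1]) ("this", [1]) ("yes", [1])
Output of reduce (K3, V3):        ("This", 1) ("a", 1) ("is", 2) ("test", 1) ("this", 1) ("yes", 1)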

The Hadoop Java API for MapReduce

Hadoop underwent a major API change in its 0.20 release, and that newer API is the primary interface in the 1.0 version. Though the prior API was certainly functional, the community felt it was unwieldy and unnecessarily complex in some regards.

The new API, sometimes referred to as the context objects API for reasons we'll see later, is the future of MapReduce development in Java. Note one caveat: there are parts of the pre-0.20 MapReduce libraries that have not been ported to the new API, so we will use the old interfaces when we need to examine any of these.

The 0.20 MapReduce Java API

In versions 0.20 and above of the MapReduce API, most of the key classes and interfaces live either in the org.apache.hadoop.mapreduce package or in its subpackages.

In most cases, the implementation of a MapReduce job will provide job-specific subclasses of the Mapper and Reducer base classes found in this package.

We'll stick to the commonly used K1/K2/K3 terminology, though more recently the Hadoop API has, in places, used names such as KEYIN/VALUEIN and KEYOUT/VALUEOUT instead; K1/K2/K3 makes the end-to-end data flow easier to follow.

The Mapper class

This is a cut-down view of the base Mapper class provided by Hadoop. For our own mapper implementations, we will subclass this base class and override the specified method as follows:

class Mapper<K1, V1, K2, V2>
{
    void map(K1 key, V1 value, Mapper.Context context)
        throws IOException, InterruptedException
    {..}
}


Although the use of Java generics can make this look a little opaque at first, there is actually not that much going on. The class is defined in terms of the key/value input and output types, and then the map method takes an input key/value pair in its parameters. The other parameter is an instance of the Context class that provides various mechanisms to communicate with the Hadoop framework, one of which is to output the results of a map or reduce method.

Notice that the map method only refers to a single instance of K1 and V1 key/value pairs. This is a critical aspect of the MapReduce paradigm: you write classes that process single records, and the framework is responsible for all the work required to turn an enormous data set into a stream of key/value pairs. You will never have to write map or reduce classes that try to deal with the full data set. Hadoop also provides mechanisms through its InputFormat and OutputFormat classes that supply implementations of common file formats and likewise remove the need to write file parsers for anything but custom file types.
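
As a simple illustration of this single-record model, here is a minimal, hypothetical mapper (not part of the WordCount example that follows) that upper-cases each line it is given; the framework, not our code, loops over the input records:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class UppercaseMapper
    extends Mapper<LongWritable, Text, LongWritable, Text>
{
    @Override
    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException
    {
        // Each call sees exactly one key/value pair; emit the transformed record
        context.write(key, new Text(value.toString().toUpperCase()));
    }
}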

There are three additional methods that may sometimes need to be overridden.

protected void setup(Mapper.Context context) throws IOException, InterruptedException


This method is called once before any key/value pairs are presented to the map method. The default implementation does nothing.

protected void cleanup(Mapper.Context context) throws IOException, InterruptedException


This method is called once after all key/value pairs have been presented to the map method. The default implementation does nothing.

protected void run(Mapper.Context context) throws IOException, InterruptedException


This method controls the overall flow of task processing within the JVM. The default implementation calls the setup method once before repeatedly calling the map method for each key/value pair in the split, and then finally calls the cleanup method.
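
Conceptually, the body of the default run method looks something like the following simplified sketch (the real Hadoop source differs in detail):

protected void run(Mapper.Context context)
    throws IOException, InterruptedException
{
    setup(context);
    // The framework drives the loop; map sees one record per call
    while (context.nextKeyValue())
    {
        map(context.getCurrentKey(), context.getCurrentValue(), context);
    }
    cleanup(context);
}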

The Reducer class

The Reducer base class works very similarly to the Mapper class, and usually requires its subclasses to override only a single reduce method. Here is the cut-down class definition:

public class Reducer<K2, V2, K3, V3>
{
    void reduce(K2 key, Iterable<V2> values, Reducer.Context context)
        throws IOException, InterruptedException
    {..}
}


Again, notice the class definition in terms of the broader data flow (the reduce method accepts K2/V2 as input and provides K3/V3 as output) while the actual reduce method takes only a single key and its associated list of values. The Context object is again the mechanism to output the result of the method.

This class also has setup, run, and cleanup methods, with default implementations similar to those of the Mapper class, which can optionally be overridden:

protected void setup(Reducer.Context context) throws IOException, InterruptedException


This method is called once before any key/lists of values are presented to the reduce method. The default implementation does nothing.

protected void cleanup(Reducer.Context context) throws IOException, InterruptedException


This method is called once after all key/lists of values have been presented to the reduce method. The default implementation does nothing.

protected void run(Reducer.Context context) throws IOException, InterruptedException


This method controls the overall flow of task processing within the JVM. The default implementation calls the setup method once before repeatedly calling the reduce method for each key (and its list of values) provided to the Reducer class, and then finally calls the cleanup method.
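
As with the Mapper class, the default implementation is conceptually similar to this simplified sketch:

protected void run(Reducer.Context context)
    throws IOException, InterruptedException
{
    setup(context);
    // One call to reduce per distinct key, passing all of that key's values
    while (context.nextKey())
    {
        reduce(context.getCurrentKey(), context.getValues(), context);
    }
    cleanup(context);
}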

The Driver class

Although our mapper and reducer implementations are all we need to perform the MapReduce job, there is one more piece of code required: the driver that communicates with the Hadoop framework and specifies the configuration elements needed to run a MapReduce job. This involves aspects such as telling Hadoop which Mapper and Reducer classes to use, where to find the input data and in what format, and where to place the output data and how to format it.

There is no default parent Driver class to subclass; the driver logic usually exists in the main method of the class written to encapsulate a MapReduce job. Take a look at the following code snippet as an example driver. Don't worry about how each line works, though you should be able to work out generally what each is doing:

public class ExampleDriver
{
    ...
    public static void main(String[] args) throws Exception
    {
        // Create a Configuration object that is used to set other options
        Configuration conf = new Configuration();
        // Create the object representing the job
        Job job = new Job(conf, "ExampleJob");
        // Set the name of the main class in the job jarfile
        job.setJarByClass(ExampleDriver.class);
        // Set the mapper class
        job.setMapperClass(ExampleMapper.class);
        // Set the reducer class
        job.setReducerClass(ExampleReducer.class);
        // Set the types for the final output key and value
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Set input and output file paths
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Execute the job and wait for it to complete
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}


Given our previous talk of jobs, it is not surprising that much of the setup involves operations on a Job object. This includes setting the job name and specifying which classes are to be used for the mapper and reducer implementations.

Certain input/output configurations are set and, finally, the arguments passed to the main method are used to specify the input and output locations for the job. This is a very common model that you will see often.

There are a number of default values for configuration options, and we are implicitly using some of them in the preceding class. Most notably, we don't say anything about the file format of the input files or how the output files are to be written. These are defined through the InputFormat and OutputFormat classes mentioned earlier; we will explore them in detail later. The default input and output formats are text files, which suit our WordCount example. There are multiple ways of expressing data within text files, as well as binary formats optimized for particular uses.
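
If we wanted to state these defaults explicitly, the driver could include something like the following two lines (TextInputFormat and TextOutputFormat are the classes used when nothing else is specified):

job.setInputFormatClass(org.apache.hadoop.mapreduce.lib.input.TextInputFormat.class);
job.setOutputFormatClass(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.class);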

A common model for less complex MapReduce jobs is to have the Mapper and Reducer classes as inner classes within the driver. This allows everything to be kept in a single file, which simplifies the code distribution.

Writing MapReduce programs

We have been using and talking about WordCount for quite some time now; let’s actually write an implementation, compile, and run it, and then explore some modifications.

Time for action – setting up the classpath

To compile any Hadoop-related code, we will need to refer to the standard Hadoop-bundled classes.

Add the hadoop-core-1.0.4.jar file from the distribution to the Java classpath as follows:

$ export CLASSPATH=.:${HADOOP_HOME}/hadoop-core-1.0.4.jar:${CLASSPATH}


What just happened?

This adds the hadoop-core-1.0.4.jar file explicitly to the classpath alongside the current directory and the previous contents of the CLASSPATH environment variable.

Once again, it would be good to put this in your shell startup file or a standalone file to be sourced.

We will later also need many of the third-party libraries that are supplied with Hadoop on our classpath, and there is a shortcut to do this. For now, the explicit addition of the core JAR file will suffice.

Time for action – implementing WordCount

We will explore our own Java implementation by performing the following steps:

  1. Enter the following code into the WordCount1.java file:

    import java.io.*;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class WordCount1
    {
        public static class WordCountMapper
            extends Mapper<Object, Text, Text, IntWritable>
        {
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();

            public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException
            {
                String[] words = value.toString().split(" ");

                for (String str : words)
                {
                    word.set(str);
                    context.write(word, one);
                }
            }
        }

        public static class WordCountReducer
            extends Reducer<Text, IntWritable, Text, IntWritable>
        {
            public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException
            {
                int total = 0;

                for (IntWritable val : values)
                {
                    total++;
                }
                context.write(key, new IntWritable(total));
            }
        }

        public static void main(String[] args) throws Exception
        {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "word count");
            job.setJarByClass(WordCount1.class);
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    
    
  2. Now compile it by executing the following command:

    $ javac WordCount1.java

    
    

What just happened?

This is our first complete MapReduce job. Look at the structure and you should recognize the elements we have previously discussed: the class encapsulating the overall job with the driver configuration in its main method, and the Mapper and Reducer implementations defined as inner classes.

We’ll do a more detailed walkthrough of the mechanics of MapReduce in the next section, but for now let’s look at the preceding code and think of how it realizes the key/value transformations we talked about earlier.

The input to the Mapper class is arguably the hardest to understand, as the key is not actually used. The job specifies TextInputFormat as the format of the input data and, by default, this delivers to the mapper data where the key is the byte offset of the line within the file and the value is the text of that line. In practice, you may never write a mapper that actually uses that offset key, but it is provided.

The mapper is executed once for each line of text in the input source, and each time it breaks the line into words. It then uses the Context object to output (more commonly known as emitting) each new key/value pair of the form <word, 1>. These are our K2/V2 values.

We said before that the input to the reducer is a key and a corresponding list of values; some machinery between the map and reduce methods collects together the values for each key to make this possible, and we'll not describe it right now. Hadoop executes the reducer once for each key, and the preceding reducer implementation simply counts the entries in the Iterable object and emits output for each word in the form of <word, count>. These are our K3/V3 values.

Take a look at the signatures of our mapper and reducer classes: the WordCountMapper class takes Object and Text as input and produces Text and IntWritable as output, while the WordCountReducer class takes Text and IntWritable as both input and output. This is again quite a common pattern, where the map method ignores or transforms the incoming key and emits a series of data pairs on which the reducer then performs aggregation.

The driver is more meaningful here, as we have real values for the parameters. We use arguments passed to the class to specify the input and output locations.

Time for action – building a JAR file

Before we run our job in Hadoop, we must collect the required class files into a single JAR file that we will submit to the system.

Create a JAR file from the generated class files.

$ jar cvf wc1.jar WordCount1*.class


What just happened?

We must always package our class files into a JAR file before submitting them to Hadoop, be it a local cluster or Elastic MapReduce.

Be careful with the jar command and file paths. If you include in a JAR file the class files from a subdirectory, the classes may not be stored with the path you expect. This is especially common when using a catch-all classes directory into which all source code gets compiled. It may be useful to write a script that changes into that directory, adds the required class files to the JAR file, and moves the JAR file to the required location.

Time for action – running WordCount on a local Hadoop cluster

Now we have generated the class files and collected them into a JAR file, we can run the application by performing the following steps:

  1. Submit the new JAR file to Hadoop for execution.

    $ hadoop jar wc1.jar WordCount1 test.txt output

    
    
  2. Check the output file; it should be as follows:

    $ hadoop fs -cat output/part-r-00000
    This 1
    a 1
    is 2
    test 1
    this 1
    yes 1

    
    

What just happened?

This is the first time we have used the hadoop jar command with our own code. There are four arguments:

  1. The name of the JAR file.
  2. The name of the driver class within the JAR file.
  3. The location, on HDFS, of the input file (a relative reference to the /user/hadoop home directory, in this case).
  4. The desired location of the output folder (again, a relative path).

The name of the driver class is only required if a main class has not (as in this case) been specified within the JAR file manifest.
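
As an aside, if you prefer to record the main class in the manifest while building the JAR file, the standard jar tool's e flag can do so; something like the following (shown purely as an alternative, not required for these examples) would then let you omit the class name from the hadoop jar command line:

$ jar cvfe wc1.jar WordCount1 WordCount1*.class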
