HPC MapReduce Exercise: Hands-On Lab


Introduction

In the MapReduce framework, a job reads the text from a set of text files distributed across a cluster's nodes via the distributed file system. A "mapper" process converts the input to a set of <key, value> pairs. A "shuffle/sorter" process sorts these pairs by their key values. A "reducer" process then combines these sorted pairs in a way that solves a problem, and typically produces a different set of <key, value> pairs, which is the output of the job.

The basic steps of a job are thus:
(input)
-> splitting -> <k1, v1>
-> mapping -> <k2, v2>
-> shuffling/sorting -> <k2, v2>
-> reducing -> <k3, v3>
-> (output)

To make this a bit less abstract, we will be creating a Java program that counts the number of occurrences of each word in a set of text files, which we might visualize as follows:
[Figure: Using MapReduce to count the words in a distributed set of files]
This WordCount problem is often used as the "hello world" example for MapReduce computations.
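
To make the flow of data concrete, suppose (purely hypothetically) that the input consisted of just the two lines "the cat sat" and "the dog sat". The job's phases would then transform the data roughly as follows (a simplified sketch; Hadoop's actual intermediate records carry more detail, such as byte offsets for the splitting keys):
(input) "the cat sat" / "the dog sat"
-> splitting -> (offset1, "the cat sat"), (offset2, "the dog sat")
-> mapping -> (the,1), (cat,1), (sat,1), (the,1), (dog,1), (sat,1)
-> shuffling/sorting -> (cat,[1]), (dog,[1]), (sat,[1,1]), (the,[1,1])
-> reducing -> (cat,1), (dog,1), (sat,2), (the,2)
-> (output)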

Getting Started

If necessary, log in to dahl.calvin.edu in the usual way. There, create a folder for this project; then copy the following files from /home/cs/374/exercises/12/ to your project folder:
      WordCounter.java
      Makefile
      simple1.txt
      simple2.txt

Using your favorite text editor, take a few minutes to open these files and familiarize yourself with their contents before proceeding.

Using Hadoop

  1. Create some directories in HDFS. Let's begin by creating some directories to store the input and output files our WordCount program will use. To do so, enter:
          hadoop fs -mkdir 01.wordcount
          hadoop fs -mkdir 01.wordcount/input 
    If you have worked through our HDFS Tutorial, it should be apparent that the first command creates a directory in the HDFS file system named 01.wordcount; and the second command creates a subdirectory of 01.wordcount named input.
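    If you would like to verify that both directories were created, you can list them; for example:
          hadoop fs -ls
          hadoop fs -ls 01.wordcount
    The first command lists your HDFS home directory (which should now contain 01.wordcount), and the second should show its input subdirectory.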
  2. Copy our program's input files into HDFS. Enter:
          hadoop fs -copyFromLocal simple*.txt 01.wordcount/input/
          hadoop fs -ls 01.wordcount/input/
    The first command copies both simple1.txt and simple2.txt from our host file system to the 01.wordcount/input/ folder we created in the previous step. We use the second command to verify that the first command succeeded.
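    If you want to be extra sure the files' contents arrived intact, you can also display one of them directly from HDFS; for example:
          hadoop fs -cat 01.wordcount/input/simple1.txt
    Its contents should match those of the local simple1.txt file.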
  3. Create our Hadoop program. Hadoop natively runs compiled Java programs, so we need to complete WordCounter.java in order to solve our problem. This involves three steps: (a) creating the mapper class that takes the input and produces <key, value> pairs; (b) creating the reducer class that takes the <key, value> pairs produced by the mapper and combines them as appropriate to solve the problem; and (c) creating WordCounter's main() method to use our mapper and reducer. Taking these one at a time:

    1. Creating the Mapper Class. Inside WordCounter.java, find the comment:
              // replace this comment with the mapper class
      and replace it with the following class:
           /* Our 'mapper' class emits (word, "1") pairs,
            *  for later counting by our 'reducer' class.
            */
           public static class TokenizerMapper
                                extends Mapper<Object, Text, Text, IntWritable>{
      
             public void map(Object key, Text value, Context context) 
                             throws IOException, InterruptedException {
      
               StringTokenizer itr = new StringTokenizer( value.toString() );
      
               while ( itr.hasMoreTokens() ) {
                  word.set( itr.nextToken() );
                  context.write(word, one);
               }
             }
      
             private final static IntWritable one = new IntWritable(1);
             private Text word = new Text();
           }
      As you can see, this class extends a Hadoop generic class named Mapper, and overrides its map() method. This map() method has three parameters: a key (which our method does not use, but could), a value (consisting of a line of text from a file), and a context (to which we will write our results), all of which are passed to it by Hadoop.

      The body of the method tokenizes the value parameter into words, and then uses a loop to iterate through those words. For each word, the loop writes the pair (word, "1") to the context. When the loop completes, our mapper has done its work!
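
      To make this concrete, suppose map() were passed the (hypothetical) line "the cat saw the dog" as its value. The loop would then write these five pairs to the context:
            (the, 1)   (cat, 1)   (saw, 1)   (the, 1)   (dog, 1)
      Note that the mapper does no counting itself; duplicate words (here, "the") simply produce duplicate pairs, which the shuffle/sort phase will group together for the reducer.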

    2. Creating the Reducer Class. Still inside WordCounter.java, find the comment:
              // replace this comment with the reducer class
      and replace it with the following class:
            /* Our 'reducer' class receives a sequence of (word, "1") pairs
            *  and for each word in the sequence, adds up its "1"s...
            */
            public static class IntSumReducer
                                  extends Reducer<Text,IntWritable,Text,IntWritable> {
      
              public void reduce(Text key, Iterable<IntWritable> values, Context context) 
                                   throws IOException, InterruptedException {
                  int sum = 0;
      
                  for (IntWritable val : values) {
                      sum += val.get();
                  }
                  result.set(sum);
                  context.write(key, result);
              }
      
              private IntWritable result = new IntWritable();
           }
      This class extends the Hadoop Reducer class and overrides its reduce() method. Like the map() method, the reduce() method takes three parameters: a key, which is one of the word values emitted by the mapper phase; a values parameter, which is an iterable sequence of all the values that were grouped with that key; and a context to which the reducer will write its results.

      The body of the method declares a sum variable to keep track of the number of occurrences of the key, and then uses a loop to iterate through the values. Since the mapper produces IntWritable values containing 1, each val in values wraps the int 1, so the loop extracts that int with val.get() and adds it to sum. When control leaves the loop, the method sets the instance variable result to sum and writes the pair (key, result) to the context.

      That completes our reducer!
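
      Continuing the (hypothetical) example from the mapper step: after the shuffle/sort phase, Hadoop calls reduce() once per distinct word, so the calls and the pairs they write would look roughly like this:
            reduce("cat", [1])     ->  writes (cat, 1)
            reduce("dog", [1])     ->  writes (dog, 1)
            reduce("saw", [1])     ->  writes (saw, 1)
            reduce("the", [1, 1])  ->  writes (the, 2)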

    3. Creating the main() method. Find the comment:
             // replace this line with the main() method
      and replace it with the following main() method:
             public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
      
                Job job = Job.getInstance(conf, "word count");
                job.setJarByClass(WordCounter.class);
                job.setMapperClass(TokenizerMapper.class);
                job.setCombinerClass(IntSumReducer.class);
                job.setReducerClass(IntSumReducer.class);
                job.setOutputKeyClass(Text.class);
                job.setOutputValueClass(IntWritable.class);
      
                FileInputFormat.addInputPath(job, new Path(args[0]));
                FileOutputFormat.setOutputPath(job, new Path(args[1]));
      
                System.exit( job.waitForCompletion(true) ? 0 : 1 );
             } 
      The main() method is responsible for configuring Hadoop as necessary to run the computation.

      It starts by creating a new Configuration object for the computation.

      It then creates a new Job for that Configuration and gives it a name. It then configures the Job, giving it our computation class (WordCounter), its mapper, combiner, and reducer classes, and the classes it uses for its output (key, value) pairs.

      It then tells Hadoop the paths to the input and output folders.

      Finally, it waits for the Job to complete, producing an exit status of 0 for normal termination and 1 for abnormal termination.

    With that, our program is complete and we are ready to compile.
  4. Compile our Java program into a .jar file for Hadoop. Hadoop natively runs compiled Java programs, so we need to compile our source code against the required Hadoop Core libraries and package it into a JAR file to submit to Hadoop. Before we can do this, though, we need to make sure we have correctly defined some necessary environment variables.

    1. Check environment variables. While most of the needed environment variables should be set by default, it is a good idea to double-check that they exist before continuing. Enter:
               env
      Check the output and make sure that JAVA_HOME and HADOOP_CLASSPATH are defined. If either is not set, run the appropriate one of the following commands to set it.
               export JAVA_HOME=/usr/java/jdk1.8.0_60
               export HADOOP_CLASSPATH=${JAVA_HOME}/lib/tools.jar
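       Since env produces a lot of output, it may be easier to check just these two variables; for example:
               echo $JAVA_HOME
               echo $HADOOP_CLASSPATH
       If either command prints a blank line, that variable needs to be set.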
    2. Compile your Java program into a JAR. This involves two steps: first, we need to compile the WordCounter.java file into its constituent .class files; second, we need to package all of these classes together into a single Java archive (JAR) file for Hadoop to run.

      To simplify this, we have provided a Makefile, so just run make to build the JAR file. You should see the following commands execute:

               hadoop com.sun.tools.javac.Main WordCounter.java
               jar cf WordCounter.jar WordCounter*.class
      Then use the ls command to view the files this created.

      Notice that in executing the first command, the compiler created multiple .class files: one for our WordCounter class, and one each for its nested TokenizerMapper and IntSumReducer classes.

      The second command then took those .class files and combined them into the WordCounter.jar JAR file.
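
      If you are curious about exactly what was packaged, you can (optionally) list the JAR file's contents; for example:
               jar tf WordCounter.jar
      which should list the same .class files that ls showed, now bundled into a single archive.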

  5. Submit your first Hadoop/MapReduce job. Now that we have our input files and our JAR file, we are ready to run our first Hadoop application! We will instruct Hadoop to: (i) run our WordCounter.jar file, (ii) use the WordCounter class it contains (i.e., run its main() method), (iii) read the input files from HDFS, and (iv) write the results to a new directory Hadoop will create. (If you instruct Hadoop to output to an existing HDFS directory, a Java exception will be thrown and your program will terminate.)

    1. Submit the job. Enter the following command, substituting your username:
                hadoop jar WordCounter.jar WordCounter /user/yourUserName/01.wordcount/input /user/yourUserName/01.wordcount/output
      Trace through Hadoop's output until the job completes, and make sure that some file output bytes have been written.
    2. Check the job output. We told Hadoop to write its output to the /user/.../01.wordcount/output/ directory in HDFS, so check what was created by entering:
                hadoop fs -ls 01.wordcount/output/
      If all is well, you should see two files listed: 01.wordcount/output/_SUCCESS and 01.wordcount/output/part-r-00000.

      To view the results of our computation, "cat" the part-r-00000 file:

                hadoop fs -cat 01.wordcount/output/part-r-00000
      The output in that file should be a count for each word, reflecting its combined number of occurrences in our two input files.

      Verify that the output is correct by looking at our simple1.txt and simple2.txt input files, and if the results make sense, you're done!
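
      One optional way to spot-check a single word is to compare Hadoop's count for it against a count computed directly from the local input files; for example, for the (hypothetical) word "the":
                hadoop fs -cat 01.wordcount/output/part-r-00000 | grep -w the
                grep -o -w the simple*.txt | wc -l
      The number reported by the second command should match the count shown by the first. Also, if you ever want to re-run the job, remember that Hadoop refuses to write to an existing output directory, so remove the old one first:
                hadoop fs -rm -r 01.wordcount/output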

Congratulations! You've just created, compiled, and run your first Hadoop job!

To summarize what we've done, we have seen how to create directories in HDFS and copy files into them; how to write a MapReduce program's mapper class, reducer class, and main() method; how to compile that program and package it into a JAR file; and how to submit that JAR to Hadoop as a job and view its results.

Hopefully this simple example is enough to get you started using Hadoop to process larger quantities of data.

To dig deeper, check out Hadoop Illuminated, a free, open source book on Hadoop.

Good luck!

There is no project this week, aside from working on your final project.




This page maintained by Joel Adams.