
Hadoop job setup

In today’s post, I’m going to revisit some previous articles that I’ve written and bring their information up to date.

Job, Mapper and Reducer

It’s pretty easy to bundle your job, mapper and reducer together. If they’re small enough, it makes sense to do so.

public class MyJob 
  extends Configured implements Tool {

  public static class MyMapper 
    extends Mapper<LongWritable, Text, Text, Text> {

    // holds a value passed in with -D on the command line
    private String val;

    @Override 
    public void setup(Context context) {
      /* set up any configs from the command line */
      this.val = context.getConfiguration().get("some.value");
    }    

    public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException {      
        /* data selection and source filtering here */
    }
    
  }
  
  public static class MyReducer 
    extends Reducer<Text, Text, NullWritable, Text> {
    
    public void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException {
        /* data aggregators here */
    }
    
  }
}

There isn’t much that has changed here. Reading the generic type parameters can be a little hairy, but you can always look up the documentation for Mapper and for Reducer. The type definitions are uniform:

class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
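
To make those parameters concrete, here’s a rough sketch of how context.write lines up with them. The bodies below (and the "some-key" literal) are purely illustrative fillers for the empty map and reduce methods above, not part of the job itself:

// inside MyMapper.map -- emits (Text, Text), matching the mapper's
// KEYOUT/VALUEOUT and the reducer's KEYIN/VALUEIN
public void map(LongWritable key, Text value, Context context) 
  throws IOException, InterruptedException {
    // key is the byte offset of the line, value is the line itself
    context.write(new Text("some-key"), value);
}

// inside MyReducer.reduce -- receives (Text, Iterable<Text>) and
// writes (NullWritable, Text), the job's final output types
public void reduce(Text key, Iterable<Text> values, Context context) 
  throws IOException, InterruptedException {
    for (Text v : values) {
      context.write(NullWritable.get(), v);
    }
}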

ToolRunner

The ToolRunner class simplifies running a MapReduce job through the Tool interface. Tool is a very simple interface; all it asks of an implementing class is a run method, which looks like this:

public abstract int run(java.lang.String[] args) 
  throws java.lang.Exception;

args are supplied, as usual, from the job’s main method. A typical implementation of run retrieves the configuration, sets up the job and executes it.

public int run(String[] allArgs) throws Exception {
  
  Job job = Job.getInstance(getConf());

  job.setJarByClass(MyJob.class);

  // basic I/O shape setup 
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(Text.class);

  // the mapper emits (Text, Text), which differs from the job's final
  // output types, so the map output types must be set explicitly
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);

  // map, combine, partition, reduce setup
  // (the combiner must consume and produce the map output types)
  job.setMapperClass(MyMapper.class);
  job.setCombinerClass(MyCombiner.class);
  job.setReducerClass(MyReducer.class);
  job.setNumReduceTasks(1);
  
  // parse options passed to the job      
  String[] args = new GenericOptionsParser(
    getConf(), allArgs
  ).getRemainingArgs();
  
  // set the files (from arguments)
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  // wait for the job to finish
  boolean status = job.waitForCompletion(true);
  return status ? 0 : 1;
}

A special part of the magic here is wrapped up in GenericOptionsParser, which takes the standard set of command-line parameters and plumbs them directly into the job’s configuration.
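
As a rough sketch of that round trip (re-using the some.value key that MyMapper.setup already reads; nothing here is new API, just the pieces above laid end to end):

// 1. passed on the command line:
//      -D some.value=xyz

// 2. inside run(), once the generic options have been parsed,
//    the value is sitting in the configuration:
String someValue = getConf().get("some.value");   // "xyz"

// 3. the same configuration ships with the job, which is how
//    MyMapper.setup reads it on the cluster:
//      this.val = context.getConfiguration().get("some.value");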

Finishing up

So there are a couple of features that this wrapper around the run function provides for you. Your main method ends up very simple:

public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  int exitCode = ToolRunner.run(conf, new MyJob(), args);
  System.exit(exitCode);
}

Your job is then invoked from the command line using the hadoop command:

$HADOOP_PREFIX/bin/hadoop jar my-jobs-0.0.1-SNAPSHOT.jar \
  org.example.MyJob \
  -D arbitrary.config.value=xyz \
  /user/root/input-file.csv \
  /user/root/output-dir