Hadoop job setup
02 Dec 2015

In today’s post, I’m going to refresh the information in some previous articles that I have written to bring them up to date.
Job, Mapper and Reducer
It’s pretty easy to bundle your job, mapper and reducer together. If they’re small enough, it makes sense to do so.
import java.io.IOException;

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.util.Tool;

public class MyJob extends Configured implements Tool {

  public static class MyMapper
      extends Mapper<LongWritable, Text, Text, Text> {

    private String val;

    @Override
    public void setup(Context context) {
      /* setup any configs from the command line */
      this.val = context.getConfiguration().get("some.value");
    }

    public void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      /* data selection and source filtering here */
    }
  }

  public static class MyReducer
      extends Reducer<Text, Text, NullWritable, Text> {

    public void reduce(Text key, Iterable<Text> values, Context context)
        throws IOException, InterruptedException {
      /* data aggregators here */
    }
  }

  /* run() (required by the Tool interface) is covered below */
}
There isn’t much that has changed here. Reading the generic type parameters can be a little hairy, but you can always look up the documentation for Mapper and for Reducer. The type definitions are uniform:
class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
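As a concrete illustration of how those slots get filled, here is a rough sketch of what MyMapper’s map body might look like. The comma-separated record layout and the field positions are assumptions made for the example, not something taken from the job above:
public void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
  // key   : byte offset of the line in the split (KEYIN  = LongWritable)
  // value : the line itself                      (VALUEIN = Text)
  String[] fields = value.toString().split(",");  // assumes CSV-style input
  if (fields.length > 1 && fields[1].equals(this.val)) {
    // the emitted pair must match KEYOUT/VALUEOUT, which are both Text here
    context.write(new Text(fields[0]), value);
  }
}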
ToolRunner
The ToolRunner class simplifies the execution and management of a MapReduce job through the Tool interface. Tool is a very simple interface, providing implementing classes with nothing more than a contract to run. The run method looks like this:
public abstract int run(java.lang.String[] args)
    throws java.lang.Exception;
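ToolRunner itself does very little, which is part of its charm. Its run method amounts to roughly the following (a simplified sketch, not the actual implementation, which lives in org.apache.hadoop.util.ToolRunner):
public static int run(Configuration conf, Tool tool, String[] args)
    throws Exception {
  if (conf == null) {
    conf = new Configuration();
  }
  // fold the generic options (-D, -files, -libjars, ...) into the configuration
  GenericOptionsParser parser = new GenericOptionsParser(conf, args);
  tool.setConf(conf);
  // invoke the tool with only the arguments the parser hasn't consumed
  return tool.run(parser.getRemainingArgs());
}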
args are supplied as usual from the main method of the job. A typical implementation of the run method will retrieve the configuration, set up the job and execute it.
public int run(String[] allArgs) throws Exception {
  Job job = Job.getInstance(getConf());
  job.setJarByClass(MyJob.class);

  // basic I/O shape setup
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(Text.class);

  // the mapper emits <Text, Text>, which differs from the job's final
  // output types above, so the map output types need declaring explicitly
  job.setMapOutputKeyClass(Text.class);
  job.setMapOutputValueClass(Text.class);

  // map, combine, partition, reduce setup
  // (MyCombiner, not shown, must emit the same <Text, Text> pairs as the mapper)
  job.setMapperClass(MyMapper.class);
  job.setCombinerClass(MyCombiner.class);
  job.setReducerClass(MyReducer.class);
  job.setNumReduceTasks(1);

  // parse options passed to the job
  String[] args = new GenericOptionsParser(
      getConf(), allArgs
  ).getRemainingArgs();

  // set the input and output paths (from the remaining arguments)
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));

  // wait for the job to finish
  boolean status = job.waitForCompletion(true);
  return status ? 0 : 1;
}
A special part of the magic here is wrapped up in the GenericOptionsParser which takes in the standard set of command line parameters and plumbs them directly into the job’s configuration.
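To make that concrete, here is a small, hypothetical illustration of what GenericOptionsParser does with the sort of argument list this job gets invoked with below: the -D option lands in the configuration, and only the positional arguments come back.
String[] allArgs = {
  "-D", "arbitrary.config.value=xyz",
  "/user/root/input-file.csv",
  "/user/root/output-dir"
};
Configuration conf = new Configuration();
String[] remaining = new GenericOptionsParser(conf, allArgs).getRemainingArgs();
// remaining now holds just the two paths, and
// conf.get("arbitrary.config.value") returns "xyz"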
Finishing up
So there are a couple of features provided for you by this wrapper around the run function, and your main method ends up very simple:
public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  int result = ToolRunner.run(conf, new MyJob(), args);
  System.exit(result);
}
Your job is then invoked from the command line using the hadoop command:
$HADOOP_PREFIX/bin/hadoop jar my-jobs-0.0.1-SNAPSHOT.jar \
  org.example.MyJob \
  -D arbitrary.config.value=xyz \
  /user/root/input-file.csv \
  /user/root/output-dir
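Once the job finishes, the reducer output is written under the output directory given as the final argument; catting the part files is the quickest sanity check (one part-r-* file per reducer):
$HADOOP_PREFIX/bin/hadoop fs -cat /user/root/output-dir/part-r-*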