Cogs and Levers A blog full of technical stuff

Hadoop job setup

In today’s post, I’m going to refresh the information in some previous articles that I have written to bring them up to date.

Job, Mapper and Reducer

It’s pretty easy to bundle your job, mapper and reducer together. If they’re small enough, it makes sense to do so.

public class MyJobJob 
  extends Configured implements Tool {

  public static class MyMapper 
    extends Mapper<LongWritable, Text, Text, Text> {

    @Override 
    public void setup(Context context) {
      /* setup any configs from the command line */
      this.val = context.getConfiguration().get("some.value");
    }    

    public void map(LongWritable key, Text value, Context context) 
      throws IOException, InterruptedException {      
        /* data selection and source filtering here */
    }
    
  }
  
  public static class MyReducer 
    extends Reducer<Text, Text, NullWritable, Text> {
    
    public void reduce(Text key, Iterable<Text> values, Context context) 
      throws IOException, InterruptedException {
        /* data aggregators here */
    }
    
  }
}

There isn’t much that has changed here. Reading the type annotations can be a little hairy. You can always lookup the documentation for Mapper and for Reducer. The type definitions are uniform:

class Mapper<KEYIN,VALUEIN,KEYOUT,VALUEOUT>
class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT>

ToolRunner

The ToolRunner class simplifies the execution management of a MapReduce job using the interface, Tool. Tool is a very simple interface, only providing implementing classes with a contract to run. The run method looks like this:

public abstract int run(java.lang.String[] args) 
  throws java.lang.Exception;

args are supplied as usual from the main method of the job. A typical implementation of the run method will retrieve configuration information, setup the job and execute.

public int run(String[] allArgs) throws Exception {
  
  Job job = Job.getInstance(getConf());

  job.setJarByClass(MyJob.class);

  // basic I/O shape setup 
  job.setInputFormatClass(TextInputFormat.class);
  job.setOutputFormatClass(TextOutputFormat.class);
  job.setOutputKeyClass(NullWritable.class);
  job.setOutputValueClass(Text.class);

  // map, combine, partition, reduce setup 
  job.setMapperClass(MyMapper.class);
  job.setCombinerClass(MyCombiner.class);
  job.setReducerClass(MyReducer.class);
  job.setNumReduceTasks(1);
  
  // parse options passed to the job      
  String[] args = new GenericOptionsParser(
    getConf(), allArgs
  ).getRemainingArgs();
  
  // set the files (from arguments)
  FileInputFormat.setInputPaths(job, new Path(args[0]));
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  // wait for the jobs to finish
  boolean status = job.waitForCompletion(true);
  return status ? 0 : 1;
}

A special part of the magic here is wrapped up in the GenericOptionsParser which takes in the standard set of command line parameters and plumbs them directly into the job’s configuration.

Finishing up

So there are a couple of features that are provided for you with this wrapper around the run function. Your main method ends up very simple:

public static void main(String[] args) throws Exception {
  Configuration conf = new Configuration();
  ToolRunner.run(new MyJob(), args);
}

Your job is then invoked from the command line using the hadoop command:

$HADOOP_PREFIX/bin/hadoop jar my-jobs-0.0.1-SNAPSHOT.jar \
  org.example.MyMapReduceJob \
  -D arbitrary.config.value=xyz \
  /user/root/input-file.csv \
  /user/root/output-dir

Make a symbolic link

As a small reminder to myself on creating symbolic links

ls -s /path/to/the/thing /path/to/the/link

Encoding information in prime numbers

An interesting part of encryption theory is the ability to encode a message using prime numbers. It’s not the most efficient way to represent a message, but it does exhibit some interesting properties.

Hello

Take the message “HELLO” for instance. Here it is along with the ASCII values for each character.

H  E  L  L  O
72 69 76 76 79 

If we assign each character of our message a prime (as they ascend in sequence):

2  3  5  7  11
H  E  L  L  O
72 69 76 76 79 

We can encode this message using these prime numbers like so:

(2^72) * (3^69) * (5^76) * (7^76) * (11^79) =

1639531486723067852359816964623169016543137549
4122401687192804219102815235735638642399170444
5066082282398711507312101674742952521828622795
1778467808618104090241918575825850806280956250
0000000000000000000000000000000000000000000000
0000000000000000000000000 

That massive number is our encoded message.

Adjusting the message

You can add a letter to the message, just by multiplying in another value:

 H        E        L        L        O         O
(2^72) * (3^69) * (5^76) * (7^76) * (11^79) * (13^79) 

Commutatively, we can remove a character from our message just by dividing the encoded message. To remove the E from our message, we’d divide the encoded message by 3^69.

The guessing game

As there’s no encryption involved with this process, it’s purely encoding; all someone needs to do is factor out your message. From there they can gain the ASCII codes and positions to be able to read your message.

Custom text delimiters in Hadoop jobs

Hadoop will process your data, line for line, splitting it on the \n newline character to send out to your mappers. In today’s post, I’ll demonstrate the ussage of the textinputformat.record.delimiter setting so that your Hadoop jobs can process different data structures.

Configuration

When you’re first setting up your job, you’ll create a Configuration object. This object has arbitrary settings that can be applied to use through the use of the set method. To make a job work on a delimiter of ---, you’d use the following:

Configuration conf = new Configuration();
conf.set("textinputformat.record.delimiter", "---");

From here, there’s no change to your code. Here’s a very simple map reduce module that is using the custom format.

Mining data with Hive

Apache Hive is a database analytics technology that can be used to mine, structured, well formatted data. From the website:

The Apache Hive™ data warehouse software facilitates querying and managing large datasets residing in distributed storage. Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL. At the same time this language also allows traditional map/reduce programmers to plug in their custom mappers and reducers when it is inconvenient or inefficient to express this logic in HiveQL.

In today’s post, I’m going to walk through getting up and running to your first query with Hive.

CSV

Probably the easiest place to start, is a CSV file. Information in the file has its fields terminated by a comma , and lines by a newline \n. The example that I’ll use to day contains the following data:

id,first_name,last_name,age,country
1,John,Smith,24,ZA
2,Katie,Brown,27,AU
3,Stacey,Green,21,NZ
4,Joe,Taylor,34,US
5,Bob,Smith,20,US

Before Hive can get its hands on this information, we’ll need to make it available to cluster by uploading it to HDFS.

bin/hadoop fs -put people.csv /user/root/people.csv

Now we can startup Hive and create the table structure that we’ll be working with.

hive> CREATE TABLE people (
    > id INT,
    > first_name STRING,
    > last_name STRING,
    > age INT,
    > country STRING
    > ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    > STORED AS TEXTFILE
    > TBLPROPERTIES ("skip.header.line.count"="1");
OK
Time taken: 1.195 seconds

There’s a fair bit in this data definition. The full documentation on Hive’s DDL can be found here. There are so many ways that you can accomplish things, and the example that I’ve listed is very simple.

ROW FORMAT DELIMITED tells Hive to use the default SerDe. We could have specified a regular expression here to interpret a line of the file, or specified our own custom SerDe but because this is so standard we only needed a field delimiter which is denoted by the FIELDS TERMINATED BY. There is also a LINES TERMINATED BY should you need to specify something other than \n as the terminator.

STORED AS TEXTFILE is the default. Our data is being stored in textfiles. Finally, TBLPROPERTIES allows arbitrary information to be applied to the create. We just wanted to tell the table that the first line in the files that it’ll encounter should be discarded as it’s the header line.

Load in the data

Now that we’ve built a data structure, we can now put some data in it.

hive> LOAD DATA INPATH '/user/root/people.csv' OVERWRITE INTO TABLE people;
Loading data to table default.people
Table default.people stats: [numFiles=1, numRows=0, totalSize=133, rawDataSize=0]

We’re good to run that first query now!

hive> SELECT * FROM people;
OK
1 John  Smith 24  ZA
2 Katie Brown 27  AU
3 Stacey  Green 21  NZ
4 Joe Taylor  34  US
5 Bob Smith 20  US
Time taken: 0.277 seconds, Fetched: 5 row(s)

Once we involve aggregates, these queries start to get submitted at MapReduce jobs:

hive> SELECT country, AVG(age)
    > FROM people
    > GROUP BY country;
Query ID = root_20151122062444_77533aaa-5c95-4d2e-8742-b3891226c393
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1448188576608_0002, Tracking URL = http://d62d018a5b3f:8088/proxy/application_1448188576608_0002/
Kill Command = /usr/local/hadoop/bin/hadoop job  -kill job_1448188576608_0002
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2015-11-22 06:24:49,989 Stage-1 map = 0%,  reduce = 0%
2015-11-22 06:24:56,214 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.08 sec
2015-11-22 06:25:02,440 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 2.68 sec
MapReduce Total cumulative CPU time: 2 seconds 680 msec
Ended Job = job_1448188576608_0002
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 2.68 sec   HDFS Read: 8041 HDFS Write: 32 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 680 msec
OK
AU  27.0
NZ  21.0
US  27.0
ZA  24.0
Time taken: 19.166 seconds, Fetched: 4 row(s)

Next steps

The examples page on the Hive site has some more complex data definitions, including being able to specify your own SerDe using python as well as processing an Apache web server log.