Cogs and Levers A blog full of technical stuff

Custom text delimiters in Hadoop jobs

Hadoop will process your data line by line, splitting it on the \n newline character to send out to your mappers. In today’s post, I’ll demonstrate the usage of the textinputformat.record.delimiter setting so that your Hadoop jobs can process differently structured data.

Configuration

When you’re first setting up your job, you’ll create a Configuration object. This object holds arbitrary settings that you can apply through its set method. To make a job split records on a delimiter of ---, you’d use the following:

Configuration conf = new Configuration();
conf.set("textinputformat.record.delimiter", "---");

From here, there’s no change to the rest of your code. Here’s a very simple MapReduce job that uses the custom delimiter.
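This sketch is purely illustrative rather than production code: the class names, paths and the record-counting logic are placeholders, and the only line that really matters is the delimiter setting.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RecordCount {

  // each "---"-delimited record arrives here as a single value
  public static class RecordMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private static final Text KEY = new Text("records");

    @Override
    protected void map(LongWritable key, Text value, Context context)
        throws IOException, InterruptedException {
      context.write(KEY, ONE);
    }
  }

  // sums up the 1s emitted by the mapper
  public static class RecordReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
        throws IOException, InterruptedException {
      int total = 0;
      for (IntWritable value : values) {
        total += value.get();
      }
      context.write(key, new IntWritable(total));
    }
  }

  public static void main(String[] args) throws Exception {
    // records are separated by "---" rather than the default "\n"
    Configuration conf = new Configuration();
    conf.set("textinputformat.record.delimiter", "---");

    Job job = Job.getInstance(conf, "record count");
    job.setJarByClass(RecordCount.class);
    job.setMapperClass(RecordMapper.class);
    job.setReducerClass(RecordReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}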

Mining data with Hive

Apache Hive is a database analytics technology that can be used to mine structured, well-formatted data. From the website:

The Apache Hive™ data warehouse software facilitates querying and managing large datasets residing in distributed storage. Hive provides a mechanism to project structure onto this data and query the data using a SQL-like language called HiveQL. At the same time this language also allows traditional map/reduce programmers to plug in their custom mappers and reducers when it is inconvenient or inefficient to express this logic in HiveQL.

In today’s post, I’m going to walk through getting up and running with Hive, through to your first query.

CSV

Probably the easiest place to start is a CSV file. Information in the file has its fields terminated by a comma , and its lines by a newline \n. The example that I’ll use today contains the following data:

id,first_name,last_name,age,country
1,John,Smith,24,ZA
2,Katie,Brown,27,AU
3,Stacey,Green,21,NZ
4,Joe,Taylor,34,US
5,Bob,Smith,20,US

Before Hive can get its hands on this information, we’ll need to make it available to the cluster by uploading it to HDFS.

bin/hadoop fs -put people.csv /user/root/people.csv
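If you want to double-check that the file landed before moving on, a quick listing will show it (the sizes and timestamps will of course differ in your environment):

bin/hadoop fs -ls /user/root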

Now we can start up Hive and create the table structure that we’ll be working with.

hive> CREATE TABLE people (
    > id INT,
    > first_name STRING,
    > last_name STRING,
    > age INT,
    > country STRING
    > ) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
    > STORED AS TEXTFILE
    > TBLPROPERTIES ("skip.header.line.count"="1");
OK
Time taken: 1.195 seconds

There’s a fair bit in this data definition. The full documentation on Hive’s DDL can be found in the Hive language manual. There are many ways to accomplish the same thing; the example that I’ve listed here is deliberately simple.

ROW FORMAT DELIMITED tells Hive to use the default SerDe. We could have specified a regular expression to interpret each line of the file, or supplied our own custom SerDe, but because this format is so standard all we needed was a field delimiter, which is given by the FIELDS TERMINATED BY clause. There is also a LINES TERMINATED BY clause, although at the time of writing Hive only accepts \n as the line terminator.

STORED AS TEXTFILE is the default. Our data is being stored in text files. Finally, TBLPROPERTIES allows arbitrary key/value metadata to be attached to the table. Here we just tell Hive that the first line of each file it encounters should be discarded, as it’s the header line.
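To make those clauses a little more concrete, here’s a sketch of the same table with both delimiters spelled out explicitly; people_explicit is just an illustrative name, and the table behaves the same as the one above:

CREATE TABLE people_explicit (
  id INT,
  first_name STRING,
  last_name STRING,
  age INT,
  country STRING
) ROW FORMAT DELIMITED
  FIELDS TERMINATED BY ','
  LINES TERMINATED BY '\n'
STORED AS TEXTFILE
TBLPROPERTIES ("skip.header.line.count"="1");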

Load in the data

Now that we’ve built the table structure, we can put some data in it.

hive> LOAD DATA INPATH '/user/root/people.csv' OVERWRITE INTO TABLE people;
Loading data to table default.people
Table default.people stats: [numFiles=1, numRows=0, totalSize=133, rawDataSize=0]

We’re good to run that first query now!

hive> SELECT * FROM people;
OK
1 John  Smith 24  ZA
2 Katie Brown 27  AU
3 Stacey  Green 21  NZ
4 Joe Taylor  34  US
5 Bob Smith 20  US
Time taken: 0.277 seconds, Fetched: 5 row(s)

Once we involve aggregates, these queries start to get submitted as MapReduce jobs:

hive> SELECT country, AVG(age)
    > FROM people
    > GROUP BY country;
Query ID = root_20151122062444_77533aaa-5c95-4d2e-8742-b3891226c393
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks not specified. Estimated from input data size: 1
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1448188576608_0002, Tracking URL = http://d62d018a5b3f:8088/proxy/application_1448188576608_0002/
Kill Command = /usr/local/hadoop/bin/hadoop job  -kill job_1448188576608_0002
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2015-11-22 06:24:49,989 Stage-1 map = 0%,  reduce = 0%
2015-11-22 06:24:56,214 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.08 sec
2015-11-22 06:25:02,440 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 2.68 sec
MapReduce Total cumulative CPU time: 2 seconds 680 msec
Ended Job = job_1448188576608_0002
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 1   Cumulative CPU: 2.68 sec   HDFS Read: 8041 HDFS Write: 32 SUCCESS
Total MapReduce CPU Time Spent: 2 seconds 680 msec
OK
AU  27.0
NZ  21.0
US  27.0
ZA  24.0
Time taken: 19.166 seconds, Fetched: 4 row(s)

Next steps

The examples page on the Hive site has some more complex data definitions, including specifying your own SerDe, plugging in Python scripts, and processing an Apache web server log.

Hadoop Streaming with Python

Hadoop provides a very rich API for developing and running MapReduce jobs in Java, however this is not everybody’s preference. Hadoop Streaming makes it possible to run MapReduce jobs with any language that can access the standard streams STDIN and STDOUT.

Hadoop Streaming creates the plumbing required to ship a full MapReduce job out to your cluster, so that all you need to do is supply a mapper and a reducer that use STDIN for their input and STDOUT for their output.

In today’s example, we’ll re-implement the word count example in Python using streaming.

The mapper

In this case, the mapper’s job is to take a line of text (its input) and break it into words. We’ll then write out each word along with the number 1 to denote that we’ve counted it.

#!/usr/bin/env python

import sys

def read_input(file):
  '''Splits the lines given to it into words and
     produces a generator'''

  for line in file:
      yield line.split()

def main():
  '''Produces (word,1) pairs for every word 
     encountered on the input'''

  data = read_input(sys.stdin)

  for words in data:
      for word in words:
          print '%s,%d' % (word, 1)

if __name__ == "__main__":
  main()

The reducer

The reducer’s job is to process the output of the map function, perform some aggregate operation over the set, and produce an output set from this information. In this example, it’ll take each word and its stream of 1’s, accumulating them to form a word count.

#!/usr/bin/env python

from itertools import groupby
from operator import itemgetter
import sys

def parse_output(file):
  '''Parses a single line of output produced 
     by the mapper function'''

  for line in file:
      yield line.rstrip().split(',', 1)

def main():
  data = parse_output(sys.stdin)

  # produce grouped pairs to count
  for current_word, group in groupby(data, itemgetter(0)):
    try:

      # produce the total count      
      total_count = sum(int(count) for current_word, count in group)
    
      # send it out to the output
      print "%s,%d" % (current_word, total_count)
    except ValueError:
      # ignore casting errors
      pass

if __name__ == "__main__":
  main()

input | map | sort | reduce

Before we go full scale with this job, we can simulate the work that the Hadoop cluster would do for us by using our shell and some pipes to test it out. This is not a scalable solution, so make sure you’re only giving it a small set of data. We can treat the whole process as the pipeline named above: input | map | sort | reduce.

As a test input, The Zen and the Art of the Internet should do just fine.

$ cat zen10.txt | ./mapper.py | sort -k1,1 | ./reducer.py

We can now submit this job to the Hadoop cluster. Remember, we need access to our source data, mapper and reducer on the namenode that we’ll submit this job from.

Submitting your job on the cluster

First, we need to get our input data in an accessible spot on the cluster.

$ bin/hadoop fs -mkdir /user/hadoop
$ bin/hadoop fs -put /srv/zen10.txt /user/hadoop

Make sure it’s there:

$ bin/hadoop fs -ls /user/hadoop
Found 1 items
-rw-r--r--   1 root supergroup     176012 2015-11-20 23:03 /user/hadoop/zen10.txt

Now, we can run the job.

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \
             -mapper /src/mapper.py \
             -reducer /src/reducer.py \
             -input /user/hadoop/zen10.txt \
             -output /user/hadoop/zen10-output

The -mapper and -reducer switches refer to files on the local Linux filesystem of the node, whereas -input and -output refer to HDFS locations.
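If the scripts aren’t sitting at the same path on every node, streaming can ship them out for you with the generic -files option, in which case they’re referenced by their base names. I haven’t run this exact variant, so treat it as a sketch:

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \
             -files /src/mapper.py,/src/reducer.py \
             -mapper mapper.py \
             -reducer reducer.py \
             -input /user/hadoop/zen10.txt \
             -output /user/hadoop/zen10-output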

Results

The results are now available for you in /user/hadoop/zen10-output.

$ bin/hadoop fs -cat \
             /user/hadoop/zen10-output/part-00000

You should see the results start spraying down the page.

. . .
. . .

vernacular,1
version,10
versions,9
very,13
via,20
vic-20,2
vice,1

. . .
. . .

Limitation

So far, the only limitation that I’ve come across with this method of creating MapReduce jobs is that the mapper only works line by line; you can’t treat a single record as information that spans multiple lines. Having records span multiple lines in your data file should be a rare use case, though.
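In theory, the textinputformat.record.delimiter setting from the first post should apply here too, since streaming uses TextInputFormat by default. Something like the following sketch might let a record span multiple lines, though I haven’t verified the combination myself, and records.txt is only a placeholder input:

$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.7.0.jar \
             -D textinputformat.record.delimiter='---' \
             -mapper /src/mapper.py \
             -reducer /src/reducer.py \
             -input /user/hadoop/records.txt \
             -output /user/hadoop/records-output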

Creating timer jobs with systemd

Creating and executing timer jobs has traditionally been a task for cron. With the arrival of systemd, this responsibility has been shifted onto services and timers. In today’s post, I’ll walk you through creating a service and timer schedule.

Setup

To accomplish this task, we need two files and a couple of shell commands. The basic method to do this is as follows:

  • Create a service definition
  • Create a timer definition
  • Start and enable the timer

In the example today, I’m going to schedule s3cmd to run each week over a mounted drive, syncing it out to s3.

As we’re working with systemd, everything that we create will be a unit file.

Create a service definition

The service definition is a unit file which defines the actual work to be done. The following is placed at /etc/systemd/system/sync-to-s3.service.

[Unit]
Description=Runs the sync script for local file shares to s3

[Service]
Type=oneshot
ExecStart=/usr/bin/sh -c 's3cmd sync --check-md5 --follow-symlinks --verbose /mnt/share/ s3://my-s3-bucket/'

Full particulars on this file structure can be found in the documentation about service unit configuration.

Create a timer definition

The timer definition is also another unit file that defines a schedule. The following is named the same as the above, only it gets a .timer extension at /etc/systemd/system/sync-to-s3.timer.

[Unit]
Description=Schedules the sync of local file shares out to s3

[Timer]
OnCalendar=weekly
OnBootSec=10min

[Install]
WantedBy=multi-user.target

Again, the documentation gives the full definition of the timer unit configuration.

OnCalendar takes a calendar event expression that needs to be understood by systemd’s time parser, so make sure that it’s valid in accordance with the systemd.time reference.
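If the weekly shorthand ever needs to be more specific, a full calendar expression can pin down the day and time; for example, 3am every Monday would look like this (pick whatever window suits your share):

[Timer]
OnCalendar=Mon *-*-* 03:00:00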

Start and enable the timer

Now that the service and schedule definitions have been created, we can start up the timer:

sudo systemctl start sync-to-s3.timer
sudo systemctl enable sync-to-s3.timer
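To confirm that the timer has been registered and to see when it’s next due to fire, you can ask systemctl to list it:

sudo systemctl list-timers sync-to-s3.timer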

Now that you’ve got your job up and running, you get the full feature set that systemd offers, including journald. You can use it to inspect the logs from the current or any historical invocation:

sudo journalctl -u sync-to-s3

Using PIG

Pig is a data mining and analysis language that you can use to reason about large data sets. The language works with Hadoop’s MapReduce framework, which is what enables it to crunch large datasets.

In today’s post, I’ll walk you through:

  • Loading input data to HDFS
  • Writing and Executing your Pig query
  • Exploring output data sets

Get started

First of all, we need our source data. For the purposes of this article, I have a very simple data set consisting of 4 fields, sitting in a CSV file. The file people.csv looks like this:

id,firstname,lastname,age
1,John,Smith,25
2,Mary,Brown,27
3,Paul,Green,21
4,Sally,Taylor,30

I’ll assume that your source file is sitting on a node in your Hadoop cluster that has access to HDFS. We now create an area to hold our input data, and upload the source file to HDFS, with the following:

# make a directory under the /user/hadoop folder
# to hold our data, called "demo"
$ bin/hadoop fs -mkdir -p /user/hadoop/demo

# perform the upload into the "demo" folder
$ bin/hadoop fs -put /src/people.csv /user/hadoop/demo

We now confirm that the data is actually sitting there, using the familiar ls command:

$ bin/hadoop fs -ls /user/hadoop/demo

HDFS should respond, showing people.csv in place:

Found 1 items
-rw-r--r--   1 root supergroup         80 2015-11-10 06:27 /user/hadoop/demo/people.csv

Running your queries

Now that our source data has been deployed to HDFS and is available to us, we can fire up Pig. There are two modes that you can run Pig in:

  • Local which will operate on local data and not submit map reduce jobs to complete its process
  • MapReduce which will use the cluster to perform its work

The local mode is quite handy in testing scenarios where your source data set is small and you’re just looking to test something out quickly. The mapreduce mode is for when your work needs to scale to the size of your cluster.

# startup Pig so that it's in mapreduce mode
$ pig -x mapreduce
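If you just want to poke at a small file on the local filesystem instead, the same prompt can be started in local mode:

# startup Pig so that it's in local mode
$ pig -x local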

Now that you’ve got Pig started, you’ll be presented with the grunt> prompt. It’s at this prompt that we can enter our queries for processing. The following query will load our data set, extract the first (id) column and pump it into an output set.

grunt> A = load '/user/hadoop/demo/people.csv' using PigStorage(',');
grunt> B = foreach A generate $0 as id;
grunt> store B into 'id.out';

The source data set is loaded into A. B then takes all of the values in the first column, and the store statement writes these out to id.out.

Pig will now send your query off to the compute cluster in the form of a MapReduce job. You’ll see the usual fanfare scrolling up the screen from the output of this job submission, and you should be able to follow along in the job control web application for your cluster.

Viewing the result

Once the query has finished processing, you’ll be able to take a look at the result. As this invoked a MapReduce job, you’ll find the familiar _SUCCESS file in your output folder, indicating that your query ran successfully.

$ bin/hadoop fs -ls id.out

You’ll also be given the result in the file part-m-00000.

Found 2 items
-rw-r--r--   1 root supergroup          0 2015-11-10 06:54 id.out/_SUCCESS
-rw-r--r--   1 root supergroup         31 2015-11-10 06:54 id.out/part-m-00000

We can take a look at these results now:

$ bin/hadoop fs -cat id.out/part-m-00000

This is a very simple example of how to run a Pig query on your Hadoop cluster, but you can see how these ideas will scale as your dataset grows. The example query itself isn’t very complex by any stretch, so now that you know how to execute queries you can read up on Pig Latin to hone your query-writing craft.
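As a small taste of where that can go, here’s a sketch of a slightly richer query over the same file: it attaches a schema to the load, filters out anybody under 25, and keeps just their names. I haven’t run this exact script, so treat it as illustrative; the null check is there to skip the header row, and older.out is just a name I’ve picked.

grunt> A = load '/user/hadoop/demo/people.csv' using PigStorage(',') as (id:int, firstname:chararray, lastname:chararray, age:int);
grunt> older = filter A by age is not null and age >= 25;
grunt> names = foreach older generate firstname, lastname;
grunt> store names into 'older.out';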