Hadoop provides a very rich API for developing and running MapReduce jobs in Java, but that isn't everybody's preference. Hadoop Streaming makes it possible to write MapReduce jobs in any language that can read from the standard stream STDIN and write to STDOUT.
Hadoop Streaming creates all of the plumbing required to run a full MapReduce job on your cluster, so all you need to supply is a mapper and a reducer that read their input from STDIN and write their output to STDOUT.
In today's example, we'll re-implement the word count example in Python using streaming.
The mapper
In this case, the mapper's job is to take a line of text (the input) and break it into words. We then write out each word along with the number 1 to denote that we've counted it once.
#!/usr/bin/env python
import sys

def read_input(file):
    '''Splits the lines given to it into words and
    produces a generator'''
    for line in file:
        yield line.split()

def main():
    '''Produces (word, 1) pairs for every word
    encountered on the input'''
    data = read_input(sys.stdin)
    for words in data:
        for word in words:
            print('%s,%d' % (word, 1))

if __name__ == "__main__":
    main()
The reducer
The reducer's job is to process the output of the map function, perform some aggregate operation over the set, and produce an output set from this information. In this example, it takes each word and its stream of 1s, accumulating them to form a word count.
#!/usr/bin/env python
from itertools import groupby
from operator import itemgetter
import sys

def parse_output(file):
    '''Parses a single line of output produced
    by the mapper function'''
    for line in file:
        yield line.rstrip().split(',', 1)

def main():
    data = parse_output(sys.stdin)

    # produce grouped pairs to count
    for current_word, group in groupby(data, itemgetter(0)):
        try:
            # produce the total count
            total_count = sum(int(count) for _, count in group)
            # send it out to the output
            print('%s,%d' % (current_word, total_count))
        except ValueError:
            # ignore casting errors
            pass

if __name__ == "__main__":
    main()
input | map | sort | reduce
Before we run this job at full scale, we can simulate the work that the Hadoop cluster would do for us by using the shell and pipe redirection to test it out. This approach doesn't scale, so make sure you're only giving it a small set of data. We can treat the process as:
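In shell terms, and assuming the two scripts above have been saved as mapper.py and reducer.py and marked executable (the file names here are just placeholders), that simulation might look like this:

$ cat some-small-sample.txt | ./mapper.py | sort | ./reducer.py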
We can now submit this job to the Hadoop cluster. Remember that the source data, mapper and reducer all need to be accessible from the node where we submit the job.
Submitting your job on the cluster
First, we need to get our input data in an accessible spot on the cluster.
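The exact commands vary with your installation; the following is only a sketch, where the HDFS paths, the input file name and the location of the hadoop-streaming jar are all placeholders:

# put the source text into HDFS
$ bin/hadoop fs -mkdir -p /user/hadoop/wordcount
$ bin/hadoop fs -put input.txt /user/hadoop/wordcount

# submit the streaming job, shipping the mapper and reducer scripts with it
$ bin/hadoop jar share/hadoop/tools/lib/hadoop-streaming-*.jar \
    -input /user/hadoop/wordcount \
    -output /user/hadoop/wordcount-out \
    -mapper mapper.py \
    -reducer reducer.py \
    -file mapper.py \
    -file reducer.py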
So far, the only limitation that I've come across with this method of creating MapReduce jobs is that the mapper only works line by line; you can't treat a single record as information spanning multiple lines. Records that span multiple lines in your data file should be a rare case, though.
Creating and executing timer jobs has traditionally been a task for cron. With the arrival of systemd, this responsibility has been shifted onto services and timers. In today’s post, I’ll walk you through creating a service and timer schedule.
Setup
To accomplish this task, we need two files and a couple of shell commands. The basic method to do this is as follows:
Create a service definition
Create a timer definition
Start and enable the timer
In the example today, I'm going to schedule s3cmd to run each week over a mounted drive, syncing it up to S3.
As we’re working with systemd, everything that we’ll do is a unit file.
Create a service definition
The service definition is a unit file which defines the actual work to be done. The following is placed at /etc/systemd/system/sync-to-s3.service.
[Unit]
Description=Runs the sync script for local file shares to s3

[Service]
Type=oneshot
ExecStart=/usr/bin/sh -c 's3cmd sync --check-md5 --follow-symlinks --verbose /mnt/share/ s3://my-s3-bucket/'
Create a timer definition
The timer definition is another unit file, this one defining a schedule. It's named the same as the service above, only with a .timer extension: /etc/systemd/system/sync-to-s3.timer.
[Unit]
Description=Schedules the sync of local file shares out to s3

[Timer]
OnCalendar=weekly
OnBootSec=10min

[Install]
WantedBy=multi-user.target
OnCalendar takes a calendar expression that needs to be understood by systemd's time parser, so make sure it's valid according to the systemd.time reference.
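To check how systemd will interpret an expression before committing to it, systemd-analyze (on reasonably recent systemd releases) can normalise it and show when it would next elapse:

$ systemd-analyze calendar weekly
$ systemd-analyze calendar "Mon *-*-* 02:00:00"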
Start and enable the timer
Now that the service and schedule definitions have been created, we can start up the timer:
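A typical sequence looks like this (run as root or with sudo; the unit names match the files created above):

# pick up the new unit files
$ systemctl daemon-reload

# start the timer now and have it come back after a reboot
$ systemctl start sync-to-s3.timer
$ systemctl enable sync-to-s3.timer

# confirm that the timer has been scheduled
$ systemctl list-timers --all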
Now that you’ve got your job up and running, you get the full feature set that systemd offers, including journald. You can use this to inspect the current or historical run logs from invocations:
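For example, using the service unit defined above:

# show the logs from previous and current runs of the sync
$ journalctl -u sync-to-s3.service

# or follow them live while a run is in progress
$ journalctl -u sync-to-s3.service -f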
Pig is a data mining and analysis language that you can use to reason about large data sets. It works on top of Hadoop's MapReduce framework, which lets it crunch datasets at cluster scale.
First of all, we need our source data. For the purposes of this article, I have a very simple data set consisting of four fields (id, firstname, lastname, age) in a CSV file. The file people.csv looks like this:
1,John,Smith,25
2,Mary,Brown,27
3,Paul,Green,21
4,Sally,Taylor,30
I’ll assume that your source file is sitting on a node in your Hadoop cluster that has access to HDFS. We now create an area for us to store our input data as well as upload our source data to HDFS with the following:
# make a directory under the /user/hadoop folder
# to hold our data, called "demo"
$ bin/hadoop fs -mkdir -p /user/hadoop/demo

# perform the upload into the "demo" folder
$ bin/hadoop fs -put /src/people.csv /user/hadoop/demo
We can now confirm that the data is actually sitting there, using the familiar ls command:
$ bin/hadoop fs -ls /user/hadoop/demo
HDFS should respond, showing people.csv in place.
Now that our source data has been deployed to HDFS and is available to us, we can fire up Pig. There are two modes that you can run Pig in:
Local which will operate on local data and not submit map reduce jobs to complete its process
MapReduce which will use the cluster to perform its work
Local mode is quite handy in testing scenarios where your source data set is small and you're just looking to try something out quickly. MapReduce mode is for when your workload needs to scale to the size of your cluster.
# start up Pig so that it's in mapreduce mode
$ pig -x mapreduce
Now that you've got Pig started, you'll be presented with the grunt> prompt. It's at this prompt that we can enter our queries for processing. The following query will load our data set, extract the first (id) column and pump it into an output set.
grunt> A = load '/user/hadoop/demo/people.csv' using PigStorage(',');
grunt> B = foreach A generate $0 as id;
grunt> store B into 'id.out';
The source data set is loaded into A. B then takes all of the values in the first column and writes these to id.out.
Pig will now send your question (or query) off into the compute cluster in the form of a map reduce job. You’ll see the usual fanfare scrolling up the screen from the output of this job submission, and you should be able to follow along on the job control web application for your cluster.
Viewing the result
Once the query has finished processing, you'll be able to take a look at the result. As this invoked a MapReduce job, you'll find the familiar _SUCCESS file in your output folder to indicate that your query ran successfully.
$ bin/hadoop fs -ls id.out
You’ll also be given the result in the file part-m-00000.
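To look at the data itself, you can cat the part file straight out of HDFS (the part file names can vary with the number of tasks that produced the output):

$ bin/hadoop fs -cat id.out/part-m-00000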
This is a very simple example of how to run a Pig query on your Hadoop cluster, but you can see how these ideas will scale as your dataset grows. The example query isn't complex by any stretch, so now that you know how to execute queries you can read up on Pig Latin to hone your query writing craft.
SQL Server and PostgreSQL are both relational database systems and as such share similarities that should allow you to migrate your data structures between them. In today’s post, I’m going to go through the process of migrating SQL Server data schema objects over to PostgreSQL.
Immediate differences
Whilst both relational database systems implement a common core of standard SQL, there are implementation-specific features which need special consideration. As long as you're capable of wrangling text in your favorite editor, the conversion task shouldn't be that hard.
The batch terminator GO gets replaced by the much more familiar ;.
Tables
First thing to do for tables is to generate your create scripts. Make sure that you:
Turn off DROP statement generation for your objects
Turn on index and keys generation
To safely qualify the names of objects within the database, SQL Server will surround its object names with square brackets [], so you’ll see definitions like this:
-- generated from SQL Server
CREATE TABLE [dbo].[Table1] (
    [ID]   INT         IDENTITY(1,1) NOT NULL,
    [Name] VARCHAR(50) NOT NULL,
    CONSTRAINT [PK_Table1] PRIMARY KEY CLUSTERED ([ID] ASC)
)
PostgreSQL uses double-quotes on object names and doesn’t use the owner (in the above case [dbo]) to qualify names.
In the above example, Table1 is using IDENTITY on its primary key field ID. This gives us the auto-increment functionality that’s so natural in relational database systems. There is a little extra work in PostgreSQL to emulate this behavior through the use of CREATE SEQUENCE and nextval.
The SQL Server definition above now looks like this for PostgreSQL:
-- migrated for PostgreSQL
CREATE SEQUENCE Table1Serial;

CREATE TABLE Table1 (
    ID   INT         NOT NULL DEFAULT nextval('Table1Serial'),
    Name VARCHAR(50) NOT NULL,
    CONSTRAINT PK_Table1 PRIMARY KEY (ID)
);
Stored Procedures
Stored procedures are much more of a first-class citizen in SQL Server than they are in PostgreSQL. If your database design hinges on extensive use of stored procedures, you'll be in for a bit of redevelopment.
Both stored procedures and functions are created using the same syntax in PostgreSQL. The actions that either can perform differ though:
                         Stored Procedure   Function
Used in an expression    No                 Yes
Return a value           No                 Yes
Output parameters        Yes                No
Return result set        Yes                Yes
Multiple result sets     Yes                No
A simple function that squares its input value might look something like the following PL/pgSQL sketch:
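-- a minimal sketch; the function name "square" is just for illustration
CREATE OR REPLACE FUNCTION square(val INTEGER)
RETURNS INTEGER AS $$
BEGIN
    RETURN val * val;
END;
$$ LANGUAGE plpgsql;

-- call it like any other function
SELECT square(4);   -- 16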
A more in-depth example involves returning a result set, as you would from a stored procedure in SQL Server. You can do this in an unnamed fashion, meaning you won't control the name of the cursor coming back. A sketch of this, against a hypothetical people table, might look like the following:
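-- a sketch only: "people" is a hypothetical table and "get_people" an illustrative name
CREATE OR REPLACE FUNCTION get_people()
RETURNS refcursor AS $$
DECLARE
    -- an unbound refcursor is given an auto-generated portal name when opened
    result refcursor;
BEGIN
    OPEN result FOR SELECT * FROM people;
    RETURN result;
END;
$$ LANGUAGE plpgsql;

-- usage (inside a transaction): the SELECT returns the generated cursor name,
-- something like "<unnamed portal 1>", which you then FETCH from
BEGIN;
SELECT get_people();
FETCH ALL IN "<unnamed portal 1>";
COMMIT;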
There are quite a few times where I’ve run a command on a remote machine and needed to get out of that machine but leave my command running.
I’ll normally start a job that I know is going to take a while using an ampersand like so:
$ long-running-prog &
Ideally, the nohup command should also be put on the command line so that the program you execute will ignore the SIGHUP signal sent when your session ends.
$ nohup long-running-prog &
$ exit
If you're already part-way through a running process, you can get it to continue running in the background (while you make your getaway) by doing the following:
$ long-running-prog
CTRL-Z
$ bg
$ disown pid
You use CTRL-Z to suspend the running process. The bg command then gets the program running again in the background. You can confirm that it's running in the background with the jobs command. Lastly, disown detaches the background process from your terminal, so that when you exit your session the process will continue to run.