Monday, February 1, 2016

Executing Python and Python3 scripts in Oozie workflows

It is not completely obvious, but you can certainly run Python scripts within Oozie workflows.
Here’s a sample file, nothing special about it.
Here’s a sample workflow that will look for a script called inside scripts folder
<workflow-app xmlns="uri:oozie:workflow:0.4" name="python-wf">
    <start to="python-node"/>
    <action name="python-node">
        <shell xmlns="uri:oozie:shell-action:0.2">
        </shell>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Python action failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
    </kill>
    <end name="end"/>
</workflow-app>
here’s my sample
#! /usr/bin/env python
import os, pwd, sys
print "who am I? " + pwd.getpwuid(os.getuid())[0]
print "this is a Python script"
print "Python Interpreter Version: " + sys.version
The directory tree for my workflow, assuming the workflow directory is called python, looks like this:
[root@sandbox python]# tree
├── scripts
│   └──
└── workflow.xml

1 directory, 3 files
now you can execute the workflow like any other Oozie workflow.
If you wanted to leverage Python3, make sure Python3 is installed on every node. My Python3 looks like this
#! /usr/bin/env /usr/local/bin/python3.3
import os, pwd, sys
print("who am I? " + pwd.getpwuid(os.getuid())[0])
print("this is a Python script")
print("Python Interpreter Version: " + sys.version)
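If Python3 is not installed on every node, one option is to write the script so that it runs under both interpreters. The following is a minimal sketch under that assumption; the `describe()` helper is a hypothetical name introduced only so the output can be checked, and the fallback to the numeric uid is my own addition for nodes where the user has no passwd entry.

```python
#!/usr/bin/env python
# Runs unchanged under Python 2.7 and Python 3.x.
from __future__ import print_function

import os
import pwd
import sys


def describe():
    """Return the lines the script prints, so they can be checked in a test."""
    try:
        user = pwd.getpwuid(os.getuid())[0]
    except KeyError:
        # No passwd entry for this uid; fall back to the numeric id.
        user = str(os.getuid())
    return [
        "who am I? " + user,
        "this is a Python script",
        "Python Interpreter Version: " + sys.version,
    ]


if __name__ == "__main__":
    for line in describe():
        print(line)
```

With this variant the shebang can stay as `/usr/bin/env python`, and the reported interpreter version tells you which Python the Oozie launcher actually picked up on that node.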

Tuesday, January 5, 2016

Apache Hive Groovy UDF examples

One of many reasons to be part of a vibrant and rich open source community is access to a treasure trove of information. One evening, I was reading through the Hive user mailing list and noticed one user suggesting Groovy as a way to parse JSON. It seemed a strange suggestion when there are at least three built-in ways to do so in Hive! It was also astonishing, because this feature is not very well documented. I decided to dig into it and wrote a couple of examples myself; the last two examples were contributed by Gopal from Hortonworks on that same mailing list. Now for the main event:

Groovy UDF example

Can be compiled at run time

Currently only works in "hive" shell, does not work in beeline
su guest
paste the following code into the hive shell
this will use Groovy String replace function to replace all instances of lower case 'e' with 'E'
compile `import org.apache.hadoop.hive.ql.exec.UDF \;
import \;
public class Replace extends UDF {
  public Text evaluate(Text s){
    if (s == null) return null \; 
    return new Text(s.toString().replace('e', 'E')) \;
  }
} ` AS GROOVY NAMED Replace.groovy;
now create a temporary function to leverage the Groovy UDF
CREATE TEMPORARY FUNCTION Replace as 'Replace';
now you can use the function in your SQL
SELECT Replace(description) FROM sample_08 limit 5;
full example
hive> compile `import org.apache.hadoop.hive.ql.exec.UDF \;
    > import \;
    > public class Replace extends UDF {
    >   public Text evaluate(Text s){
    >     if (s == null) return null \;
    >     return new Text(s.toString().replace('e', 'E')) \;
    >   }
    > } ` AS GROOVY NAMED Replace.groovy;
Added [/tmp/0_1452022176763.jar] to class path
Added resources: [/tmp/0_1452022176763.jar]
hive> CREATE TEMPORARY FUNCTION Replace as 'Replace';
Time taken: 1.201 seconds
hive> SELECT Replace(description) FROM sample_08 limit 5;
All Occupations
ManagEmEnt occupations
ChiEf ExEcutivEs
GEnEral and opErations managErs
Time taken: 6.373 seconds, Fetched: 5 row(s)

Another example

this will duplicate any String passed to the function
compile `import org.apache.hadoop.hive.ql.exec.UDF \;
import \;
public class Duplicate extends UDF {
  public Text evaluate(Text s){
    if (s == null) return null \; 
    return new Text(s.toString() * 2) \;
  }
} ` AS GROOVY NAMED Duplicate.groovy;

CREATE TEMPORARY FUNCTION Duplicate as 'Duplicate';
SELECT Duplicate(description) FROM sample_08 limit 5;

All OccupationsAll Occupations
Management occupationsManagement occupations
Chief executivesChief executives
General and operations managersGeneral and operations managers

JSON Parsing UDF

compile `import org.apache.hadoop.hive.ql.exec.UDF \;
import groovy.json.JsonSlurper \;
import \;
public class JsonExtract extends UDF {
  public int evaluate(Text a){
    def jsonSlurper = new JsonSlurper() \;
    def obj = jsonSlurper.parseText(a.toString())\;
    return  obj.val1\;
  }
} ` AS GROOVY NAMED json_extract.groovy;

CREATE TEMPORARY FUNCTION json_extract as 'JsonExtract';
SELECT json_extract('{"val1": 2}') from date_dim limit 1;
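To sanity-check what the three UDFs should return before compiling them in Hive, their logic can be mirrored in plain Python. This is only a sketch of the expected behaviour, not Hive or Groovy code; the function names are hypothetical and simply echo the UDF names above.

```python
import json


def replace_e(s):
    """Mirror of the Replace UDF: swap every lower-case 'e' for 'E'."""
    if s is None:
        return None
    return s.replace("e", "E")


def duplicate(s):
    """Mirror of the Duplicate UDF: repeat the string twice."""
    if s is None:
        return None
    return s * 2


def json_extract(s):
    """Mirror of the JsonExtract UDF: parse JSON and return its val1 field."""
    return json.loads(s)["val1"]


print(replace_e("Management occupations"))  # ManagEmEnt occupations
print(duplicate("Chief executives"))        # Chief executivesChief executives
print(json_extract('{"val1": 2}'))          # 2
```

Note that the Groovy `JsonExtract` above has no null check, so a NULL column value would fail there, just as `json_extract(None)` fails here.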


Tuesday, December 29, 2015

Apache Hive CSV SerDe example

I’m going to show you a neat way to work with CSV files and Apache Hive. Usually, you’d have to do some preparatory work on CSV data before you can consume it with Hive, but I’d like to show you a built-in SerDe (Serializer/Deserializer) for Hive that makes it a lot more convenient to work with CSV. This work was merged in Hive 0.14, and there are no additional steps necessary to work with CSV from Hive.
Suppose you have a CSV file with the following entries 
id first_name last_name email gender ip_address 
1 James Coleman Male 
2 Lillian Lawrence Female 
3 Theresa Hall Female 
4 Samuel Tucker Male 
5 Emily Dixon Female
to consume it from within Hive, you’ll need to upload it to hdfs
hdfs dfs -put sample.csv /tmp/serdes/
now all it takes is to create a table schema on top of the file
drop table if exists sample;
create external table sample(id int,first_name string,last_name string,email string,gender string,ip_address string)
  row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
  stored as textfile
location '/tmp/serdes/';
now you can query the table as is
select * from sample limit 10;
But what if your file is tab-delimited rather than comma-delimited?
Well, the SerDe has you covered there too:
drop table if exists sample;
create external table sample(id int,first_name string,last_name string,email string,gender string,ip_address string)
  row format serde 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
with serdeproperties (
  "separatorChar" = "\t"
)
  stored as textfile
location '/tmp/serdes/';
Notice the separatorChar property. The SerDe accepts two more properties: a custom escape character (escapeChar) and a custom quote character (quoteChar).
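To get a feel for what the separator, quote and escape characters do, the same three knobs exist in Python's standard csv module. The snippet below is only an analogy under that assumption; it does not touch Hive, and the sample row (including the "Coleman\tJr" value) is made up for illustration.

```python
import csv
import io

# Two tab-delimited rows; the last field of the second row is quoted
# because it contains a tab itself (hypothetical data).
data = 'id\tfirst_name\tlast_name\n1\tJames\t"Coleman\tJr"\n'

reader = csv.reader(
    io.StringIO(data),
    delimiter="\t",    # plays the role of separatorChar
    quotechar='"',     # plays the role of quoteChar
    escapechar="\\",   # plays the role of escapeChar
)
rows = list(reader)
print(rows)
```

The quoted field comes back as a single value with the embedded tab intact, which is the behaviour you would want from a quote character when fields can contain the separator.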
Take a look at the wiki for more info.
The code is available at my github repo.

Thursday, December 24, 2015

Pig Dynamic Invoker

I must’ve been living under a rock because I’ve only just learned about Pig’s dynamic invokers. What if I told you that besides UDFs, you have another option to run your Java code, without compiling anything? I will let you read the docs on your own, but even though I find it quite handy, it is pretty limited in features. You’re limited to passing primitives, and only static methods work. There’s an example of using a non-static method “StringConcat” but I haven’t been able to make it work.
So for the demo:
suppose you have a file with numbers 4, 9, 16, etc, one on each line 
upload the file to hdfs
hdfs dfs -put numbers /user/guest/
Then suppose you’d like to use Java Math’s sqrt function to get the square root of each number. You can of course use the built-in SQRT function, but for the purposes of the example bear with me. The code to make it work with Pig and Java would look like so:
DEFINE Sqrt InvokeForDouble('java.lang.Math.sqrt', 'double');
numbers = load '/user/guest/numbers' using PigStorage() as (num:double);
sqrts = FOREACH numbers GENERATE Sqrt($0);
dump sqrts;
Then all that’s left to do is execute the script
pig -x tez scriptname.pig
I think this feature has a lot of promise, especially if it can be opened up to non-primitive types and, of course, to more than just static methods. It was introduced in Pig 0.8 and I haven't seen any changes since to extend the feature set. It's unfortunate, but then again, you can extend Pig's functionality with UDFs.
Another thing to keep in mind: since this relies on reflection, your Pig scripts will run slower than if you wrote the same functions as UDFs. I do see value in this though, when you're too lazy to develop, compile and test your UDFs and need something quick!
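The idea behind a dynamic invoker, resolving a function from its name as a string instead of compiling a wrapper for it, can be illustrated with a small Python analogy. This is not how Pig implements InvokeForDouble; `invoke_for_double` is a hypothetical helper written only to show the lookup-by-name pattern.

```python
import importlib


def invoke_for_double(qualified_name, value):
    """Resolve 'module.function' from a string at call time and apply it."""
    module_name, func_name = qualified_name.rsplit(".", 1)
    func = getattr(importlib.import_module(module_name), func_name)
    return float(func(float(value)))


numbers = [4, 9, 16]
sqrts = [invoke_for_double("math.sqrt", n) for n in numbers]
print(sqrts)  # [2.0, 3.0, 4.0]
```

The name lookup happens on every call in this sketch, which is the same kind of overhead that makes reflection-based invocation slower than a purpose-built UDF.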
as usual, you can find the code at my github repo.

Wednesday, December 9, 2015

Hadoop with Python Book Review

O'Reilly recently released a free ebook called Hadoop with Python by the author of MapReduce Design Patterns, Donald Miner. Needless to say that caught my eye. The book is a short read, I was able to run through it within two lunch hours. It has five chapters tackling different angles of Hadoop. It is an easy read with an excellent overview of each product discussed.

1st chapter discusses HDFS and Snakebite, Spotify's Python library that lets Python shops interact with HDFS in a native way. This is pretty use-case specific because I don't see a reason to use the library unless you're a Python-heavy shop. The other drawback is that it's not Python3 compliant, which may be an issue going forward. The cool thing about Snakebite is that the library does not require loading any Java libraries and promises to be really fast to load. It leverages RPC to speak to the Namenode and uses protobuf, so interaction is native.

2nd chapter is on writing MapReduce in Python, either with Hadoop Streaming or MRJOB. In my own opinion, I just don't see the purpose of yet another framework for writing MapReduce on Hadoop; Apache Pig and Hive already cover that at a high enough level. Probably the one thing I found interesting is that with MRJOB you can run MR against an S3 bucket directly.

3rd chapter is on Apache Pig and extending Pig with Python UDFs. As a Pig aficionado, I personally enjoyed this chapter very much and learned a few things about UDFs with Python. I will certainly use this chapter a lot going forward when evangelizing Pig.

4th chapter is on PySpark, and I feel the same way about it as about the previous one. It has an excellent overview of Spark and great examples in PySpark. I will be referring to this chapter a lot as well.

5th and last chapter is on Luigi, a workflow manager for Hadoop that is written in Python and uses Python for writing workflows, as opposed to Oozie. I personally saw a demo of Luigi at Spotify a few years back, and even though it is pretty enticing given the complex nature of Oozie, I don't see a point in using this tool unless you're in a Python shop and you stay away from Oozie (I don't blame you). Now I'm a purist, and I can't say I have much faith in a project maintained by one company. Again, opinions are my own.

In summary, the book introduces each concept very well with meaningful examples. I found the Spark and Pig chapters extremely useful and interesting. I highly recommend reading the book, especially since it is a free download (THANK YOU). Again, there's nothing wrong with any one of these projects; it's just that they serve a certain niche and I find little use for them myself. That said, the book is worth a read either way since it's so short, and the Pig and Spark chapters are worth their weight in gold! Happy reading.

Tuesday, November 24, 2015

Apache Pig Groovy UDF end to end examples

Apache Pig 0.11 added the ability to write Pig UDFs in Groovy. The other possible languages for Pig UDFs are Python, Ruby, Jython, Java and JavaScript. There are a lot of examples of UDFs in Python, but the documentation does not give beginners enough to get started with Groovy. I found the process of writing a Groovy UDF a lot more complicated than Python, for example. The first misconception is that you need to include groovy-all.jar in the Pig libraries; you don't, because Pig ships with Groovy by default. For the same reason, you don't need to install Groovy on the client or any other machine. The other issue I ran into was type mismatch errors. The tuples arrive as byte arrays, at least with the PigStorage loader function, and before applying your custom logic you need to cast the input to the appropriate class.

import org.apache.pig.scripting.groovy.OutputSchemaFunction;
import org.apache.pig.PigWarning;

class GroovyUDFs {
    public static allcaps(byte[] b) {
        // null or empty input yields null
        if(b==null || b.size() == 0) {
            return null;
        }
        try {
            // tuples arrive as byte arrays, so decode before upper-casing
            String s = new String(b, "UTF-8");
            return s.toUpperCase();
        } catch (ClassCastException e) {
            warn("casting error", PigWarning.UDF_WARNING_1);
        } catch ( Exception e) {
            warn("Error", PigWarning.UDF_WARNING_1);
        }
        return null;
    }
    public static toUpperSchema(input) {
        return input;
    }
}
Then in your Pig script

register 'udfs.groovy' using org.apache.pig.scripting.groovy.GroovyScriptEngine as myfuncs;

a = load '../input.txt' using PigStorage();
b = foreach a generate myfuncs.allcaps($0);
dump b;
And that's all she wrote. The sample code is on my github page, along with sample Python UDFs. Notice that in my Python UDF I'm passing the input through str() so I can use the .upper() method. WARNING: the Python script may need some more error handling, and it automatically assumes you're passing a String object. In my Groovy script, I'm checking for the most common errors.

return str(word).upper()
You can execute the sample pig scripts on Sandbox using tez_local mode.

pig -x tez_local example.pig
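As for the extra error handling the Python UDF may need, a more defensive version could look like the sketch below. It is an illustration rather than the code from my repo: it assumes the value may arrive as None, as raw bytes or as something else entirely, and it leaves out any output-schema declaration the real script would carry.

```python
def allcaps(word):
    """Upper-case the input, returning None for null or undecodable values."""
    if word is None:
        return None
    try:
        if isinstance(word, (bytes, bytearray)):
            # Raw bytes are decoded first, much like the Groovy version does.
            return bytes(word).decode("utf-8").upper()
        return str(word).upper()
    except (UnicodeError, TypeError, ValueError):
        # Mirror the Groovy UDF: swallow the error and return null.
        return None


print(allcaps("pig"))   # PIG
print(allcaps(None))    # None
```

Returning None on bad input keeps the job running and surfaces the problem as nulls in the output, which is the same trade-off the Groovy UDF makes.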

As usual, please provide feedback, Markdown was generated using Dillinger.

Friday, November 20, 2015

Using mongo-hadoop connector to interact with MongoDB using Pig, Hive and Spark (Update)

I published a set of Pig, Hive and Spark scripts to interact with MongoDB using the mongo-hadoop connector. Some of the published tutorials on Mongo and Hadoop on the Databricks and MongoDB sites are no longer working, so I decided to update them for HDP 2.3. Some things are still wonky, like Hive queries failing if you try to run anything other than select. Either way, give it a try and provide feedback.

One more thing: I'm using Sandbox with HDP 2.3.2, and mongo is installed as an Ambari service using the tutorial from github user nikunjness, which made my work so much easier.

The code is published on my github page as well as on Hortonworks Community Site.

Thanks and enjoy.

Sample tutorial on HDP integration with MongoDB using Ambari, Spark, Hive and Pig


HDP 2.3.2 Sandbox
Mongo 2.6.11
install MongoDB service as per


make sure you change directory to home after completing the mongo-ambari service install

install gradle

mv gradle-2.7 /opt/
export GRADLE_HOME=/opt/gradle-2.7/bin/

download mongo-hadoop

cd mongo-hadoop-master/

compile the connectors, should take between 2-10min

./gradlew jar

copy drivers to one directory

mkdir ~/drivers
cd ~/drivers

download mongodb java drivers or build your own


or build using the following pom

cp ~/mongo-hadoop-master/core/build/libs/mongo-hadoop-core-1.5.0-SNAPSHOT.jar ~/drivers
cp ~/mongo-hadoop-master/pig/build/libs/mongo-hadoop-pig-1.5.0-SNAPSHOT.jar ~/drivers
cp ~/mongo-hadoop-master/hive/build/libs/mongo-hadoop-hive-1.5.0-SNAPSHOT.jar ~/drivers
cp ~/mongo-hadoop-master/spark/build/libs/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar ~/drivers
cp ~/mongo-hadoop-master/flume/build/libs/flume-1.5.0-SNAPSHOT.jar ~/drivers

copy drivers to hdp libs, needs these on the classpath

cp -r ~/drivers/* /usr/hdp/current/hadoop-client/lib/

restart services in Ambari

create local user

sudo -u hdfs hdfs dfs -mkdir /user/root
sudo -u hdfs hdfs dfs -chown -R root:hdfs /user/root



load data into mongo

mongoimport mstf.csv --type csv --headerline -d marketdata -c minibars

check data is in mongo

[root@sandbox mongo-tutorial]# mongo
MongoDB shell version: 2.6.11
connecting to: test
> use marketdata
switched to db marketdata
> db.minibars.findOne()
{
    "_id" : ObjectId("564359756336db32f2b4e8ce"),
    "Symbol" : "MSFT",
    "Timestamp" : "2009-08-24 09:30",
    "Day" : 24,
    "Open" : 24.41,
    "High" : 24.42,
    "Low" : 24.31,
    "Close" : 24.31,
    "Volume" : 683713
}
> exit

login to beeline

if you get error jdbc:hive2://localhost:10000 (closed)> Error: Failed to open new session: java.lang.RuntimeException: java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException( User: hive is not allowed to impersonate root (state=,code=0)
go to core-site and replace “users” with “*” in proxyusers for hive group

make sure jars are copied to hdp libs otherwise will get the error in the jira below

hdfs dfs -put drivers/* /tmp/udfs

!connect jdbc:hive2://localhost:10000 "" ""

add jar hdfs://;
add jar hdfs://;
add jar hdfs://;
CREATE TABLE bars
(
    objectid STRING,
    Symbol STRING,
    TS STRING,
    Day INT,
    Open DOUBLE,
    High DOUBLE,
    Low DOUBLE,
    Close DOUBLE,
    Volume INT
)
STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"objectid":"_id",
 "Symbol":"Symbol", "TS":"Timestamp", "Day":"Day", "Open":"Open", "High":"High", "Low":"Low", "Close":"Close", "Volume":"Volume"}')
if you encounter error: Error while processing statement: FAILED: Hive Internal Error: com.sun.jersey.api.client.ClientHandlerException( Connection refused) (state=08S01,code=12)

shut down all services and restart the Sandbox, hive metastore ports most likely conflicting

query the table

select * from bars where bars.volume > 5000000 and bars.volume < 10000000;

|       bars.objectid       | bars.symbol  |      bars.ts      | bars.day  | bars.open  | bars.high  | bars.low  | bars.close  | bars.volume  |
| 564359756336db32f2b4f1f7  | MSFT         | 2009-08-31 16:00  | 31        | 24.64      | 24.65      | 24.64     | 24.65       | 5209285      |
| 564359756336db32f2b4ff6f  | MSFT         | 2009-09-14 16:00  | 14        | 25.0       | 25.0       | 24.99     | 25.0        | 9574088      |
| 564359756336db32f2b5027d  | MSFT         | 2009-09-16 16:00  | 16        | 25.21      | 25.22      | 25.18     | 25.2        | 7920502      |
| 564359756336db32f2b50eb5  | MSFT         | 2009-09-28 16:00  | 28        | 25.85      | 25.89      | 25.83     | 25.83       | 5487064      |
| 564359756336db32f2b5210a  | MSFT         | 2009-10-16 09:30  | 16        | 26.45      | 26.6       | 26.45     | 26.48       | 5092072      |
| 564359756336db32f2b52902  | MSFT         | 2009-10-23 10:55  | 23        | 28.55      | 28.56      | 28.3      | 28.35       | 5941372      |
| 564359766336db32f2b54721  | MSFT         | 2009-11-20 09:30  | 20        | 29.66      | 29.72      | 29.62     | 29.63       | 6859911      |
| 564359766336db32f2b59cba  | MSFT         | 2010-02-12 16:00  | 12        | 27.94      | 27.94      | 27.93     | 27.93       | 5076037      |
| 564359766336db32f2b5c14f  | MSFT         | 2010-03-19 16:00  | 19        | 29.6       | 29.61      | 29.58     | 29.59       | 8826314      |
| 564359766336db32f2b5cd17  | MSFT         | 2010-03-31 14:08  | 31        | 29.45      | 29.46      | 29.4      | 29.46       | 5314205      |
| 564359766336db32f2b5dccc  | MSFT         | 2010-04-15 16:00  | 15        | 30.87      | 30.87      | 30.87     | 30.87       | 5228182      |
| 564359766336db32f2b5dccd  | MSFT         | 2010-04-16 09:30  | 16        | 30.79      | 30.88      | 30.75     | 30.86       | 6267858      |
| 564359766336db32f2b5de53  | MSFT         | 2010-04-16 16:00  | 16        | 30.68      | 30.7       | 30.67     | 30.67       | 5014677      |
| 564359766336db32f2b5e77d  | MSFT         | 2010-04-26 16:00  | 26        | 31.1       | 31.11      | 31.09     | 31.11       | 5338985      |
| 564359776336db32f2b5fcd0  | MSFT         | 2010-05-14 16:00  | 14        | 28.93      | 28.93      | 28.93     | 28.93       | 5318496      |
| 564359776336db32f2b613b9  | MSFT         | 2010-06-07 16:00  | 7         | 25.3       | 25.31      | 25.29     | 25.29       | 6956406      |
| 564359776336db32f2b616c7  | MSFT         | 2010-06-09 16:00  | 9         | 24.79      | 24.81      | 24.78     | 24.79       | 7953364      |

order by or any select into won’t work, check status of


pyspark --jars drivers/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar

Paste the following in PySpark shell

# set up parameters for reading from MongoDB via Hadoop input format
config = {"mongo.input.uri": "mongodb://localhost:27017/marketdata.minibars"}
inputFormatClassName = "com.mongodb.hadoop.MongoInputFormat"
# these values worked but others might as well
keyClassName = ""
valueClassName = ""

# read the 1-minute bars from MongoDB into Spark RDD format
minBarRawRDD = sc.newAPIHadoopRDD(inputFormatClassName, keyClassName, valueClassName, None, None, config)

# configuration for output to MongoDB
config["mongo.output.uri"] = "mongodb://localhost:27017/marketdata.fiveminutebars"
outputFormatClassName = "com.mongodb.hadoop.MongoOutputFormat"

# takes the verbose raw structure (with extra metadata) and strips down to just the pricing data
minBarRDD = minBarRawRDD.values()
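The output collection above is named fiveminutebars, which suggests rolling the 1-minute bars up into 5-minute bars. The aggregation itself is not shown here, so the following is only a plain-Python sketch of one way that roll-up could work, with no Spark involved. The helper names `bucket_start` and `aggregate_bars` are hypothetical, and the input is assumed to be dicts shaped like the findOne() document shown earlier.

```python
from datetime import datetime


def bucket_start(timestamp, minutes=5):
    """Round a 'YYYY-MM-DD HH:MM' timestamp down to the start of its bucket."""
    ts = datetime.strptime(timestamp, "%Y-%m-%d %H:%M")
    floored = ts.replace(minute=ts.minute - ts.minute % minutes)
    return floored.strftime("%Y-%m-%d %H:%M")


def aggregate_bars(bars, minutes=5):
    """Combine 1-minute bars into wider bars keyed by (symbol, bucket start)."""
    result = {}
    # Zero-padded timestamps sort chronologically as plain strings.
    for bar in sorted(bars, key=lambda b: b["Timestamp"]):
        key = (bar["Symbol"], bucket_start(bar["Timestamp"], minutes))
        agg = result.get(key)
        if agg is None:
            # First bar in the bucket supplies the open price.
            result[key] = {
                "Open": bar["Open"],
                "High": bar["High"],
                "Low": bar["Low"],
                "Close": bar["Close"],
                "Volume": bar["Volume"],
            }
        else:
            agg["High"] = max(agg["High"], bar["High"])
            agg["Low"] = min(agg["Low"], bar["Low"])
            agg["Close"] = bar["Close"]  # last bar in the bucket wins
            agg["Volume"] += bar["Volume"]
    return result
```

In a Spark job the same two steps would typically become a map to the (symbol, bucket) key followed by a per-key reduction, but that wiring depends on the key and value classes used to read from MongoDB.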


cat the file in hdfs

hdfs dfs -cat spark-mongo-output3/part-00000 | head -n 5


download enron dataset


bzip2 -d enron_mongo.tar.bz2
tar -xvf enron_mongo.tar

restore database

mongorestore dump/enron_mail/messages.bson

# add user

use enron_mail
db.createUser(
   {
      user: "reportsUser",
      pwd: "12345678",
      roles: [
         { role: "readWrite", db: "enron_mail" },
         { role: "readWrite", db: "enron_processed" }
      ]
   }
)

# query mongodb, select all rows
use enron_mail

# create new mongodb database
use enron_processed
db.createUser(
   {
      user: "writesUser",
      pwd: "12345678",
      roles: [
         { role: "readWrite", db: "enron_processed" }
      ]
   }
)
> exit

Make sure you run the following Pig script in Tez mode; in MR mode it will run for approximately 15 minutes

pig -x tez load_store_mongodb.pig

make sure you run the following Pig script in tez_local mode, since we’re not working with HDFS here.

pig -x tez_local load_store_bson.pig

review the output

head -n 5 /tmp/enron_result.bson/part-v001-o000-r-00000.bson

load messages in mongo format and store in Pig format

hdfs dfs -put dump/enron_mail/messages.bson /tmp/

pig -x tez load_store_bson_hdfs.pig

check output

hdfs dfs -cat /tmp/enronoutputpig/part-v001-o000-r-00000 | head -n 5


hdfs dfs -rm -r spark-mongo-output*
hdfs dfs -rm -r /tmp/messages.bson
hdfs dfs -rm -r /tmp/enronoutputpig

Contributions welcome. Big thanks to nikunjness for the mongo-ambari service.
