Tuesday, November 24, 2015

Apache Pig Groovy UDF end to end examples

Apache Pig 0.11 added the ability to write Pig UDFs in Groovy. The other languages you can write Pig UDFs in are Java, Jython, Python, JavaScript and Ruby. There are plenty of examples of Python UDFs, but the documentation does not give beginners enough to get started with Groovy, and I found writing a Groovy UDF a lot more complicated than writing one in Python. The first misconception is that you need to add groovy-all.jar to the Pig libraries: you don't, because Pig ships with Groovy by default. For the same reason, you don't need to install Groovy on the client or on any other machine. The other issue I ran into was type mismatch errors. The fields arrive as byte arrays, at least with the PigStorage load function, so before applying your custom logic you need to cast the input to the appropriate class.

import org.apache.pig.scripting.groovy.OutputSchemaFunction;
import org.apache.pig.PigWarning;

class GroovyUDFs {
    @OutputSchemaFunction('toUpperSchema')
    public static allcaps(byte[] b) {
        // PigStorage hands over untyped fields as byte arrays; skip null or empty input
        if (b == null || b.size() == 0) {
            return null;
        }
        try {
            // decode the raw bytes before applying String methods
            String s = new String(b, "UTF-8");
            return s.toUpperCase();
        } catch (ClassCastException e) {
            warn("casting error", PigWarning.UDF_WARNING_1);
        } catch (Exception e) {
            warn("Error", PigWarning.UDF_WARNING_1);
        }
        return null;
    }
    
    public static toUpperSchema(input) {
        return input;
    }
}
Then, in your Pig script:

register 'udfs.groovy' using org.apache.pig.scripting.groovy.GroovyScriptEngine as myfuncs;

a = load '../input.txt' using PigStorage();
b = foreach a generate myfuncs.allcaps($0);
dump b;
And that's all she wrote. The sample code is on my GitHub page, along with sample Python UDFs. Notice that in my Python UDF I wrap the input in str() so I can use the .upper() method. WARNING: the Python script may need more error handling, because it assumes you're passing a String object. In my Groovy script, I check for the most common errors.

return str(word).upper()
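
The one-line Python UDF above can be hardened along the same lines as the Groovy version. What follows is a minimal sketch, not the script from the repository: the name `allcaps` simply mirrors the Groovy UDF, the schema string `word:chararray` is illustrative, and the fallback `outputSchema` decorator is an assumption added so the file also loads in a plain Python interpreter, where Pig has not defined the decorator.

```python
# Hypothetical, more defensive variant of the one-line Python UDF.
# When the script is registered in Pig, an outputSchema decorator is expected
# to be available already; the fallback below is only for running outside Pig.
try:
    outputSchema
except NameError:
    def outputSchema(schema):
        def wrap(func):
            func.outputSchema = schema
            return func
        return wrap


@outputSchema("word:chararray")
def allcaps(word):
    """Return the input upper-cased, or None when it cannot be converted."""
    if word is None:
        return None
    try:
        # Untyped fields may arrive as raw bytes; decode them before upper().
        if isinstance(word, (bytes, bytearray)):
            word = bytes(word).decode("utf-8")
        return str(word).upper()
    except (UnicodeError, TypeError, ValueError):
        # Mirror the Groovy UDF: swallow conversion problems and return null.
        return None
```

Returning None on bad input keeps a single malformed row from failing the whole job, which is the same trade-off the Groovy UDF makes.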
You can execute the sample pig scripts on Sandbox using tez_local mode.

pig -x tez_local example.pig

As usual, please provide feedback. Markdown was generated using Dillinger.


Friday, November 20, 2015

Using mongo-hadoop connector to interact with MongoDB using Pig, Hive and Spark (Update)

I published a set of Pig, Hive and Spark scripts to interact with MongoDB using the mongo-hadoop connector. Some of the published tutorials on Mongo and Hadoop on the Databricks and MongoDB sites no longer work, so I decided to update them for HDP 2.3. Some things are still wonky, like Hive queries failing if you try to run anything other than a select. Either way, give it a try and provide feedback.

One more thing: I'm using the Sandbox with HDP 2.3.2, and Mongo is installed as an Ambari service using the tutorial from GitHub user nikunjness, which made my work so much easier.

The code is published on my github page as well as on Hortonworks Community Site.

Thanks and enjoy.

Sample tutorial on HDP integration with MongoDB using Ambari, Spark, Hive and Pig

Prerequisites

HDP 2.3.2 Sandbox
Mongo 2.6.11
install MongoDB service as per https://github.com/nikunjness/mongo-ambari

IMPORTANT

make sure you change directory to home after completing the mongo-ambari service install
cd

install gradle

wget https://services.gradle.org/distributions/gradle-2.7-bin.zip
unzip gradle-2.7-bin.zip
mv gradle-2.7 /opt/
export GRADLE_HOME=/opt/gradle-2.7
export PATH=$PATH:$GRADLE_HOME/bin

download mongo-hadoop

wget https://github.com/mongodb/mongo-hadoop/archive/master.zip
unzip master.zip
cd mongo-hadoop-master/

compile the connectors, should take between 2 and 10 minutes

./gradlew jar

copy drivers to one directory

mkdir ~/drivers
cd ~/drivers

download mongodb java drivers or build your own

wget https://oss.sonatype.org/content/repositories/releases/org/mongodb/mongodb-driver/3.0.4/mongodb-driver-3.0.4.jar

or build using the following pom

cp ~/mongo-hadoop-master/core/build/libs/mongo-hadoop-core-1.5.0-SNAPSHOT.jar ~/drivers
cp ~/mongo-hadoop-master/pig/build/libs/mongo-hadoop-pig-1.5.0-SNAPSHOT.jar ~/drivers
cp ~/mongo-hadoop-master/hive/build/libs/mongo-hadoop-hive-1.5.0-SNAPSHOT.jar ~/drivers
cp ~/mongo-hadoop-master/spark/build/libs/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar ~/drivers
cp ~/mongo-hadoop-master/flume/build/libs/flume-1.5.0-SNAPSHOT.jar ~/drivers

copy drivers to hdp libs, they need to be on the classpath

cp -r ~/drivers/* /usr/hdp/current/hadoop-client/lib/

restart services in Ambari

create an HDFS home directory for the root user

cd
sudo -u hdfs hdfs dfs -mkdir /user/root
sudo -u hdfs hdfs dfs -chown -R root:hdfs /user/root

HIVE

wget http://www.barchartmarketdata.com/data-samples/mstf.csv

load data into mongo

mongoimport mstf.csv --type csv --headerline -d marketdata -c minibars

check data is in mongo

[root@sandbox mongo-tutorial]# mongo
MongoDB shell version: 2.6.11
connecting to: test
> use marketdata
switched to db marketdata
> db.minibars.findOne()
{
    "_id" : ObjectId("564359756336db32f2b4e8ce"),
    "Symbol" : "MSFT",
    "Timestamp" : "2009-08-24 09:30",
    "Day" : 24,
    "Open" : 24.41,
    "High" : 24.42,
    "Low" : 24.31,
    "Close" : 24.31,
    "Volume" : 683713
}
> exit

login to beeline

if you get error jdbc:hive2://localhost:10000 (closed)> Error: Failed to open new session: java.lang.RuntimeException: java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: hive is not allowed to impersonate root (state=,code=0)
go to core-site in Ambari and replace "users" with "*" in the proxyuser groups setting for hive (hadoop.proxyuser.hive.groups)

make sure jars are copied to hdp libs otherwise will get the error in the jira below

hdfs dfs -mkdir -p /tmp/udfs
hdfs dfs -put drivers/* /tmp/udfs

beeline
!connect jdbc:hive2://localhost:10000 "" ""

add jar hdfs://sandbox.hortonworks.com:8020/tmp/udfs/mongo-hadoop-hive-1.5.0-SNAPSHOT.jar;
add jar hdfs://sandbox.hortonworks.com:8020/tmp/udfs/mongo-hadoop-core-1.5.0-SNAPSHOT.jar;
add jar hdfs://sandbox.hortonworks.com:8020/tmp/udfs/mongodb-driver-3.0.4.jar;
DROP TABLE IF EXISTS bars;
CREATE EXTERNAL TABLE bars
(
    objectid STRING,
    Symbol STRING,
    TS STRING,
    Day INT,
    Open DOUBLE,
    High DOUBLE,
    Low DOUBLE,
    Close DOUBLE,
    Volume INT
)
STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"objectid":"_id",
 "Symbol":"Symbol", "TS":"Timestamp", "Day":"Day", "Open":"Open", "High":"High", "Low":"Low", "Close":"Close", "Volume":"Volume"}')
TBLPROPERTIES('mongo.uri'='mongodb://localhost:27017/marketdata.minibars');
if you encounter error: Error while processing statement: FAILED: Hive Internal Error: com.sun.jersey.api.client.ClientHandlerException(java.io.IOException: java.net.ConnectException: Connection refused) (state=08S01,code=12)

shut down all services and restart the Sandbox, the Hive metastore ports are most likely conflicting
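
The 'mongo.columns.mapping' property in the CREATE TABLE statement pairs each Hive column with the MongoDB field it reads, and writing that JSON by hand inside a quoted string is easy to get wrong. The sketch below generates the clause from a list of pairs. The helper names `mapping_json` and `serde_properties` are hypothetical; only the property name and the column pairs come from the statement above.

```python
import json

# (Hive column, MongoDB field) pairs copied from the CREATE TABLE statement.
COLUMN_MAPPING = [
    ("objectid", "_id"),
    ("Symbol", "Symbol"),
    ("TS", "Timestamp"),
    ("Day", "Day"),
    ("Open", "Open"),
    ("High", "High"),
    ("Low", "Low"),
    ("Close", "Close"),
    ("Volume", "Volume"),
]


def mapping_json(pairs):
    """Serialize the column pairs as the JSON object the property expects."""
    return json.dumps(dict(pairs))


def serde_properties(pairs):
    """Render the WITH SERDEPROPERTIES clause as a single line of HiveQL."""
    return "WITH SERDEPROPERTIES('mongo.columns.mapping'='%s')" % mapping_json(pairs)


if __name__ == "__main__":
    print(serde_properties(COLUMN_MAPPING))
```

The printed line can be pasted between the STORED BY and TBLPROPERTIES lines of the statement.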

query the table

select * from bars where bars.volume > 5000000 and bars.volume < 10000000;

+---------------------------+--------------+-------------------+-----------+------------+------------+-----------+-------------+--------------+--+
|       bars.objectid       | bars.symbol  |      bars.ts      | bars.day  | bars.open  | bars.high  | bars.low  | bars.close  | bars.volume  |
+---------------------------+--------------+-------------------+-----------+------------+------------+-----------+-------------+--------------+--+
| 564359756336db32f2b4f1f7  | MSFT         | 2009-08-31 16:00  | 31        | 24.64      | 24.65      | 24.64     | 24.65       | 5209285      |
| 564359756336db32f2b4ff6f  | MSFT         | 2009-09-14 16:00  | 14        | 25.0       | 25.0       | 24.99     | 25.0        | 9574088      |
| 564359756336db32f2b5027d  | MSFT         | 2009-09-16 16:00  | 16        | 25.21      | 25.22      | 25.18     | 25.2        | 7920502      |
| 564359756336db32f2b50eb5  | MSFT         | 2009-09-28 16:00  | 28        | 25.85      | 25.89      | 25.83     | 25.83       | 5487064      |
| 564359756336db32f2b5210a  | MSFT         | 2009-10-16 09:30  | 16        | 26.45      | 26.6       | 26.45     | 26.48       | 5092072      |
| 564359756336db32f2b52902  | MSFT         | 2009-10-23 10:55  | 23        | 28.55      | 28.56      | 28.3      | 28.35       | 5941372      |
| 564359766336db32f2b54721  | MSFT         | 2009-11-20 09:30  | 20        | 29.66      | 29.72      | 29.62     | 29.63       | 6859911      |
| 564359766336db32f2b59cba  | MSFT         | 2010-02-12 16:00  | 12        | 27.94      | 27.94      | 27.93     | 27.93       | 5076037      |
| 564359766336db32f2b5c14f  | MSFT         | 2010-03-19 16:00  | 19        | 29.6       | 29.61      | 29.58     | 29.59       | 8826314      |
| 564359766336db32f2b5cd17  | MSFT         | 2010-03-31 14:08  | 31        | 29.45      | 29.46      | 29.4      | 29.46       | 5314205      |
| 564359766336db32f2b5dccc  | MSFT         | 2010-04-15 16:00  | 15        | 30.87      | 30.87      | 30.87     | 30.87       | 5228182      |
| 564359766336db32f2b5dccd  | MSFT         | 2010-04-16 09:30  | 16        | 30.79      | 30.88      | 30.75     | 30.86       | 6267858      |
| 564359766336db32f2b5de53  | MSFT         | 2010-04-16 16:00  | 16        | 30.68      | 30.7       | 30.67     | 30.67       | 5014677      |
| 564359766336db32f2b5e77d  | MSFT         | 2010-04-26 16:00  | 26        | 31.1       | 31.11      | 31.09     | 31.11       | 5338985      |
| 564359776336db32f2b5fcd0  | MSFT         | 2010-05-14 16:00  | 14        | 28.93      | 28.93      | 28.93     | 28.93       | 5318496      |
| 564359776336db32f2b613b9  | MSFT         | 2010-06-07 16:00  | 7         | 25.3       | 25.31      | 25.29     | 25.29       | 6956406      |
| 564359776336db32f2b616c7  | MSFT         | 2010-06-09 16:00  | 9         | 24.79      | 24.81      | 24.78     | 24.79       | 7953364      |

order by or any select into won’t work, check status of

SPARK

pyspark --jars drivers/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar

Paste the following in PySpark shell

# set up parameters for reading from MongoDB via Hadoop input format
config = {"mongo.input.uri": "mongodb://localhost:27017/marketdata.minibars"}
inputFormatClassName = "com.mongodb.hadoop.MongoInputFormat"
# these values worked but others might as well
keyClassName = "org.apache.hadoop.io.Text"
valueClassName = "org.apache.hadoop.io.MapWritable"

# read the 1-minute bars from MongoDB into Spark RDD format
minBarRawRDD = sc.newAPIHadoopRDD(inputFormatClassName, keyClassName, valueClassName, None, None, config)

# configuration for output to MongoDB
config["mongo.output.uri"] = "mongodb://localhost:27017/marketdata.fiveminutebars"
outputFormatClassName = "com.mongodb.hadoop.MongoOutputFormat"

# takes the verbose raw structure (with extra metadata) and strips down to just the pricing data
minBarRDD = minBarRawRDD.values()

minBarRDD.saveAsTextFile("hdfs://sandbox.hortonworks.com:8020/user/root/spark-mongo-output3")

cat the file in hdfs

hdfs dfs -cat spark-mongo-output3/part-00000 | head -n 5
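
The PySpark snippet above sets an output URI for a fiveminutebars collection but only writes the raw bars to HDFS. One way to roll the 1-minute bars up into 5-minute bars is sketched below. It is an assumption-laden illustration, not the code from the repository: it assumes every value in minBarRDD behaves like a dict with the fields shown in the findOne() output, that Timestamp is a zero-padded "YYYY-MM-DD HH:MM" string, and that timestamps are unique per symbol. The function names are hypothetical.

```python
from datetime import datetime


def five_minute_key(bar):
    """Key a 1-minute bar by symbol and the start of its 5-minute window."""
    ts = datetime.strptime(bar["Timestamp"], "%Y-%m-%d %H:%M")
    window_start = ts.replace(minute=ts.minute - ts.minute % 5)
    return (bar["Symbol"], window_start.strftime("%Y-%m-%d %H:%M"))


def to_partial(bar):
    """Wrap one bar as a partial aggregate that remembers its time span."""
    return {
        "first_ts": bar["Timestamp"], "last_ts": bar["Timestamp"],
        "Open": bar["Open"], "High": bar["High"],
        "Low": bar["Low"], "Close": bar["Close"],
        "Volume": bar["Volume"],
    }


def merge(a, b):
    """Combine two partial aggregates that belong to the same window."""
    # Zero-padded timestamps sort correctly as plain strings.
    first = a if a["first_ts"] <= b["first_ts"] else b
    last = a if a["last_ts"] >= b["last_ts"] else b
    return {
        "first_ts": first["first_ts"], "last_ts": last["last_ts"],
        "Open": first["Open"], "High": max(a["High"], b["High"]),
        "Low": min(a["Low"], b["Low"]), "Close": last["Close"],
        "Volume": a["Volume"] + b["Volume"],
    }


# In the PySpark shell this would plug in roughly as:
# fiveMinBarRDD = (minBarRDD
#                  .map(lambda bar: (five_minute_key(bar), to_partial(bar)))
#                  .reduceByKey(merge))
```

Carrying first_ts and last_ts inside the aggregate means merge does not depend on the order in which Spark combines partitions.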

PIG

download enron dataset

wget https://s3.amazonaws.com/mongodb-enron-email/enron_mongo.tar.bz2

bzip2 -d enron_mongo.tar.bz2
tar -xvf enron_mongo.tar

restore database

mongorestore dump/enron_mail/messages.bson

# add user
mongo

use enron_mail
db.createUser(
    {
      user: "reportsUser",
      pwd: "12345678",
      roles: [
         { role: "readWrite", db: "enron_mail" },
         { role: "readWrite", db: "enron_processed" }
      ]
    }
)


// query mongodb, select all rows
use enron_mail
db.messages.find()

// create new mongodb database
use enron_processed
db.createUser(
    {
      user: "writesUser",
      pwd: "12345678",
      roles: [
         { role: "readWrite", db: "enron_processed" }
      ]
    }
)
exit

Make sure you run the following Pig script in Tez mode; in MR mode it takes approximately 15 minutes

pig -x tez load_store_mongodb.pig

make sure you run the following Pig script in tez_local mode, since we're not working with HDFS here.

pig -x tez_local load_store_bson.pig

review the output

head -n 5 /tmp/enron_result.bson/part-v001-o000-r-00000.bson
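
Because a .bson file is binary, head mostly shows unreadable bytes. A quick sanity check is to count the documents instead. The sketch below relies on one fact from the BSON format, that every document starts with its total size as a little-endian 32-bit integer, and on the assumption that the Pig output is a plain concatenation of documents, as a mongodump file is. The function names are hypothetical.

```python
import io
import struct


def iter_bson_documents(stream):
    """Yield each raw BSON document (as bytes) from a binary stream."""
    while True:
        header = stream.read(4)
        if not header:
            return  # clean end of file
        if len(header) < 4:
            raise ValueError("truncated length prefix")
        # The size counts the whole document, including these 4 bytes.
        (size,) = struct.unpack("<i", header)
        if size < 5:
            raise ValueError("invalid BSON document size: %d" % size)
        body = stream.read(size - 4)
        if len(body) != size - 4:
            raise ValueError("truncated document")
        yield header + body


def count_documents(data):
    """Count the documents in an in-memory BSON dump (handy for small files)."""
    return sum(1 for _ in iter_bson_documents(io.BytesIO(data)))


# Example against the Pig output, streaming so large files are not loaded at once:
# with open("/tmp/enron_result.bson/part-v001-o000-r-00000.bson", "rb") as f:
#     print(sum(1 for _ in iter_bson_documents(f)))
```

The sketch only walks the length prefixes; decoding the fields themselves would need a BSON library.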

load messages in mongo format and store in Pig format

hdfs dfs -put dump/enron_mail/messages.bson /tmp/

pig -x tez load_store_bson_hdfs.pig

check output

hdfs dfs -cat /tmp/enronoutputpig/part-v001-o000-r-00000 | head -n 5

cleanup

hdfs dfs -rm -r spark-mongo-output*
hdfs dfs -rm -r /tmp/messages.bson
hdfs dfs -rm -r /tmp/enronoutputpig

Contributions welcome, and big thanks to nikunjness for the mongo-ambari service.

markdown editor: StackEdit.io