Using mongo-hadoop connector to interact with MongoDB using Pig, Hive and Spark (Update)

I published a set of Pig, Hive and Spark scripts to interact with MongoDB using mongo-hadoop connector. Some of the published tutorials on Mongo and Hadoop on Databricks and MongoDB sites are no longer working, I decided to update them for HDP 2.3. Some things are still wonky, like Hive queries failing if you try to run anything other than select. Either way, give it a try and provide feedback.

One more thing, I'm using Sandbox with HDP 2.3.2 and mongo is installed as an Ambari service using tutorial from github user nikunjness, made my work so much easier.

The code is published on my github page as well as on Hortonworks Community Site.

Thanks and enjoy.

Sample tutorial on HDP integration with MongoDB using Ambari, Spark, Hive and Pig

Prerequisites

HDP 2.3.2 Sandbox
Mongo 2.6.11
install MongoDB service as per https://github.com/nikunjness/mongo-ambari

IMPORTANT

make sure you change directory to home after completing the mongo-ambari service install
cd

install gradle

wget https://services.gradle.org/distributions/gradle-2.7-bin.zip
unzip gradle-2.7-bin.zip
mv gradle-2.7 /opt/
export GRADLE_HOME=/opt/gradle-2.7/bin/

download mongo-hadoop

wget https://github.com/mongodb/mongo-hadoop/archive/master.zip
unzip master.zip
cd mongo-hadoop-master/

compile the connectors, should take between 2-10min

./gradlew jar

copy drivers to one directory

mkdir ~/drivers
cd ~/drivers

download mongodb java drivers or build your own

wget https://oss.sonatype.org/content/repositories/releases/org/mongodb/mongodb-driver/3.0.4/mongodb-driver-3.0.4.jar

or build using the following pom

cp ~/mongo-hadoop-master/core/build/libs/mongo-hadoop-core-1.5.0-SNAPSHOT.jar ~/drivers
cp ~/mongo-hadoop-master/pig/build/libs/mongo-hadoop-pig-1.5.0-SNAPSHOT.jar ~/drivers
cp ~/mongo-hadoop-master/hive/build/libs/mongo-hadoop-hive-1.5.0-SNAPSHOT.jar ~/drivers
cp ~/mongo-hadoop-master/spark/build/libs/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar ~/drivers
cp ~/mongo-hadoop-master/flume/build/libs/flume-1.5.0-SNAPSHOT.jar ~/drivers

copy drivers to hdp libs, needs these on the classpath

cp -r ~/drivers/* /usr/hdp/current/hadoop-client/lib/

restart services in Ambari

create local user

cd
sudo -u hdfs hdfs dfs -mkdir /user/root
sudo -u hdfs hdfs dfs -chown -R root:hdfs /user/root

HIVE

wget http://www.barchartmarketdata.com/data-samples/mstf.csv

load data into mongo

mongoimport mstf.csv --type csv --headerline -d marketdata -c minibars

check data is in mongo

[root@sandbox mongo-tutorial]# mongo
MongoDB shell version: 2.6.11
connecting to: test
> use marketdata
switched to db marketdata
> db.minibars.findOne()
{
    "_id" : ObjectId("564359756336db32f2b4e8ce"),
    "Symbol" : "MSFT",
    "Timestamp" : "2009-08-24 09:30",
    "Day" : 24,
    "Open" : 24.41,
    "High" : 24.42,
    "Low" : 24.31,
    "Close" : 24.31,
    "Volume" : 683713
}
> exit

login to beeline

if you get error jdbc:hive2://localhost:10000 (closed)> Error: Failed to open new session: java.lang.RuntimeException: java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: hive is not allowed to impersonate root (state=,code=0)
go to core-site and replace “users” with “*” in proxyusers for hive group

make sure jars are copied to hdp libs otherwise will get the error in the jira below

hdfs dfs -put drivers/* /tmp/udfs

beeline
!connect jdbc:hive2://localhost:10000 “” ””

add jar hdfs://sandbox.hortonworks.com:8020/tmp/udfs/mongo-hadoop-hive-1.5.0-SNAPSHOT.jar;
add jar hdfs://sandbox.hortonworks.com:8020/tmp/udfs/mongo-hadoop-core-1.5.0-SNAPSHOT.jar;
add jar hdfs://sandbox.hortonworks.com:8020/tmp/udfs/mongodb-driver-3.0.4.jar;
DROP TABLE IF EXISTS bars;
CREATE EXTERNAL TABLE bars
(
objectid STRING,
    Symbol STRING,
    TS STRING,
    Day INT,
    Open DOUBLE,
    High DOUBLE,
    Low DOUBLE,
    Close DOUBLE,
    Volume INT
)
STORED BY 'com.mongodb.hadoop.hive.MongoStorageHandler'
WITH SERDEPROPERTIES('mongo.columns.mapping'='{"objectid":"_id",
 "Symbol":"Symbol", "TS":"Timestamp", "Day":"Day", "Open":"Open", "High":"High", "Low":"Low", "Close":"Close", "Volume":"Volume"}')
TBLPROPERTIES('mongo.uri'='mongodb://localhost:27017/marketdata.minibars');
if you encounter error: Error while processing statement: FAILED: Hive Internal Error: com.sun.jersey.api.client.ClientHandlerException(java.io.IOException: java.net.ConnectException: Connection refused) (state=08S01,code=12)

shut down all services and restart the Sandbox, hive metastore ports most likely conflicting

query the table

select * from bars where bars.volume > 5000000 and bars.volume < 10000000;

+---------------------------+--------------+-------------------+-----------+------------+------------+-----------+-------------+--------------+--+
|       bars.objectid       | bars.symbol  |      bars.ts      | bars.day  | bars.open  | bars.high  | bars.low  | bars.close  | bars.volume  |
+---------------------------+--------------+-------------------+-----------+------------+------------+-----------+-------------+--------------+--+
| 564359756336db32f2b4f1f7  | MSFT         | 2009-08-31 16:00  | 31        | 24.64      | 24.65      | 24.64     | 24.65       | 5209285      |
| 564359756336db32f2b4ff6f  | MSFT         | 2009-09-14 16:00  | 14        | 25.0       | 25.0       | 24.99     | 25.0        | 9574088      |
| 564359756336db32f2b5027d  | MSFT         | 2009-09-16 16:00  | 16        | 25.21      | 25.22      | 25.18     | 25.2        | 7920502      |
| 564359756336db32f2b50eb5  | MSFT         | 2009-09-28 16:00  | 28        | 25.85      | 25.89      | 25.83     | 25.83       | 5487064      |
| 564359756336db32f2b5210a  | MSFT         | 2009-10-16 09:30  | 16        | 26.45      | 26.6       | 26.45     | 26.48       | 5092072      |
| 564359756336db32f2b52902  | MSFT         | 2009-10-23 10:55  | 23        | 28.55      | 28.56      | 28.3      | 28.35       | 5941372      |
| 564359766336db32f2b54721  | MSFT         | 2009-11-20 09:30  | 20        | 29.66      | 29.72      | 29.62     | 29.63       | 6859911      |
| 564359766336db32f2b59cba  | MSFT         | 2010-02-12 16:00  | 12        | 27.94      | 27.94      | 27.93     | 27.93       | 5076037      |
| 564359766336db32f2b5c14f  | MSFT         | 2010-03-19 16:00  | 19        | 29.6       | 29.61      | 29.58     | 29.59       | 8826314      |
| 564359766336db32f2b5cd17  | MSFT         | 2010-03-31 14:08  | 31        | 29.45      | 29.46      | 29.4      | 29.46       | 5314205      |
| 564359766336db32f2b5dccc  | MSFT         | 2010-04-15 16:00  | 15        | 30.87      | 30.87      | 30.87     | 30.87       | 5228182      |
| 564359766336db32f2b5dccd  | MSFT         | 2010-04-16 09:30  | 16        | 30.79      | 30.88      | 30.75     | 30.86       | 6267858      |
| 564359766336db32f2b5de53  | MSFT         | 2010-04-16 16:00  | 16        | 30.68      | 30.7       | 30.67     | 30.67       | 5014677      |
| 564359766336db32f2b5e77d  | MSFT         | 2010-04-26 16:00  | 26        | 31.1       | 31.11      | 31.09     | 31.11       | 5338985      |
| 564359776336db32f2b5fcd0  | MSFT         | 2010-05-14 16:00  | 14        | 28.93      | 28.93      | 28.93     | 28.93       | 5318496      |
| 564359776336db32f2b613b9  | MSFT         | 2010-06-07 16:00  | 7         | 25.3       | 25.31      | 25.29     | 25.29       | 6956406      |
| 564359776336db32f2b616c7  | MSFT         | 2010-06-09 16:00  | 9         | 24.79      | 24.81      | 24.78     | 24.79       | 7953364      |

order by or any select into won’t work, check status of

SPARK

pyspark --jars drivers/mongo-hadoop-spark-1.5.0-SNAPSHOT.jar

Paste the following in PySpark shell

# set up parameters for reading from MongoDB via Hadoop input format
config = {"mongo.input.uri": "mongodb://localhost:27017/marketdata.minibars"}
inputFormatClassName = "com.mongodb.hadoop.MongoInputFormat"
# these values worked but others might as well
keyClassName = "org.apache.hadoop.io.Text"
valueClassName = "org.apache.hadoop.io.MapWritable"

# read the 1-minute bars from MongoDB into Spark RDD format
minBarRawRDD = sc.newAPIHadoopRDD(inputFormatClassName, keyClassName, valueClassName, None, None, config)

# configuration for output to MongoDB
config["mongo.output.uri"] = "mongodb://localhost:27017/marketdata.fiveminutebars"
outputFormatClassName = "com.mongodb.hadoop.MongoOutputFormat"

# takes the verbose raw structure (with extra metadata) and strips down to just the pricing data
minBarRDD = minBarRawRDD.values()

minBarRDD.saveAsTextFile("hdfs://sandbox.hortonworks.com:8020/user/root/spark-mongo-output3")

cat the file in hdfs

hdfs dfs -cat spark-mongo-output3/part-00000 | head -n 5

PIG

download enron dataset

wget https://s3.amazonaws.com/mongodb-enron-email/enron_mongo.tar.bz2

bzip2 -d enron_mongo.tar.bz2
tar -xvf enron_mongo.tar

restore database

mongorestore dump/enron_mail/messages.bson

# add user
mongo

use enron_mail
db.createUser(
    {
      user: "reportsUser",
      pwd: "12345678",
      roles: [
         { role: "readWrite", db: "enron_mail" },
         { role: "readWrite", db: "enron_processed" }
      ]
    }
)


# query mongodb, select all rows
use enron_mail
db.messages.find() 

# create new mongodb database
use enron_processed
db.createUser(
    {
      user: "writesUser",
      pwd: "12345678",
      roles: [
         { role: "readWrite", db: "enron_processed" }
      ]
    }
)
> exit

Make sure you run the following Pig script in Tez mode, in MR mode it will run approx 15minutes

pig -x tez load_store_mongodb.pig

make sure you run the following Pig script in tez_local mode, we’re not working with HDFS here.

pig -x tez_local load_store_bson.pig

review the output

head -n 5 /tmp/enron_result.bson/part-v001-o000-r-00000.bson

load messages in mongo format and store in Pig format

hdfs dfs -put dump/enron_mail/messages.bson /tmp/

pig -x tez load_store_bson_hdfs.pig

check output

hdfs dfs -cat /tmp/enronoutputpig/part-v001-o000-r-00000 | head -n 5

cleanup

hdfs dfs -rm -r spark-mongo-output*
hdfs dfs -rm -r /tmp/messages.bson
hdfs dfs -rm -r /tmp/enronoutputpig

Contributions welcome, big thanks to nikunjness for mongo-ambari service, 

markdown editor: StackEdit.io

Comments

Popular posts from this blog

Vista Vulnerability Report mentions Ubuntu 6.06 LTS

Running CockroachDB with Docker Compose and Minio, Part 2

Doing "print screen" on a Mac is a pain in the ass