Hive & HDFS to HBase Phoenix using a Pig script

A few days ago I had to import data from Hive into an HBase Phoenix database for my project. The scripts I used at that time are given below:

Hive table schema

col_name      data_type
roadid        int
year          int
month         int
day           int
hour          int
total_trips   bigint

Here is a snapshot of the Hive table data that has been stored into the Phoenix table:

roadid   year  month  day  hour  total_trips
1612342  2016  4      1    3     3
1612342  2016  4      1    4     1
1612342  2016  4      1    5     8
1612342  2016  4      2    4     1
1612342  2016  4      2    7     4
1612342  2016  4      2    8     2
1612342  2016  4      2    9     2
1612342  2016  4      2    10    2
1612342  2016  4      2    11    2
1612342  2016  4      2    12    4
1612342  2016  4      3    11    1
1612342  2016  4      3    12    2
1612342  2016  4      3    13    4
1612342  2016  4      3    14    2
1612342  2016  4      3    15    3
1612342  2016  4      3    16    2
1612342  2016  4      3    17    4
1612342  2016  4      4    18    4
1612342  2016  4      6    2     1
1612342  2016  4      7    2     1

Pig Script

REGISTER /usr/hdp/current/phoenix-client/phoenix-client.jar

waypoints = LOAD 'zurich.zurich_waypoints_orc_cum' USING org.apache.hive.hcatalog.pig.HCatLoader();

STORE waypoints INTO 'hbase://waypoint_cum' USING
org.apache.phoenix.pig.PhoenixHBaseStorage('namenode.selise.ch', '-batchSize 5000');

Here you can see that I have used the phoenix-client.jar that ships with my Hortonworks distribution; I simply registered that jar so the Pig script can use the Phoenix storage handler.
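
One thing to note: PhoenixHBaseStorage writes into an existing Phoenix table, so waypoint_cum has to be created on the Phoenix side before the Pig job runs. Below is a minimal, hedged sketch using the phoenixdb Python driver; the Phoenix Query Server endpoint (port 8765) and the composite primary key are my assumptions, not something taken from the original setup.

# Hedged sketch: create the target Phoenix table before running the Pig job.
# The Query Server URL (port 8765) and the primary key choice are assumptions.
import phoenixdb

conn = phoenixdb.connect('http://namenode.selise.ch:8765/', autocommit=True)
cursor = conn.cursor()
cursor.execute("""
    CREATE TABLE IF NOT EXISTS waypoint_cum (
        roadid      INTEGER NOT NULL,
        year        INTEGER NOT NULL,
        month       INTEGER NOT NULL,
        day         INTEGER NOT NULL,
        hour        INTEGER NOT NULL,
        total_trips BIGINT
        CONSTRAINT pk PRIMARY KEY (roadid, year, month, day, hour)
    )
""")
conn.close()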

Sample HDFS data that I stored into my Phoenix database using the Pig script:

1887574,2016-04-05T10:26:43.000Z,8.6782,47.4279,c5a3d5c06036818187068cbf82d81f87,2016,04,05,10
1887574,2016-04-05T10:27:29.000Z,8.6829,47.4375,c5a3d5c06036818187068cbf82d81f87,2016,04,05,10
1887574,2016-04-05T10:28:54.000Z,8.7012,47.451,c5a3d5c06036818187068cbf82d81f87,2016,04,05,10
1822482,2016-04-07T16:26:50.000Z,8.5743,47.417,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:27:32.000Z,8.5743,47.4221,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:27:37.000Z,8.5742,47.4222,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:27:42.000Z,8.5742,47.4223,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:27:47.000Z,8.5742,47.4224,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:27:52.000Z,8.5741,47.4225,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:28:17.000Z,8.5741,47.4225,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:28:18.000Z,8.574,47.4226,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:28:21.000Z,8.5739,47.4227,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:28:22.000Z,8.5738,47.4227,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:28:27.000Z,8.5735,47.423,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:28:28.000Z,8.5734,47.4231,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16

The script I used to import the HDFS data into Phoenix/HBase:

REGISTER /usr/hdp/current/phoenix-client/phoenix-client.jar

waypoints = LOAD 'hdfs://datanode1.selise.ch/home/admin/pyspark/zurich-road-waypoints-trimmed/part-m-000[0-9]*'
USING PigStorage(',')
AS (roadid:int, capture_date:chararray, longitude:double, latitude:double, tripid:chararray, year:int, month:int, day:int, hour:int);

STORE waypoints INTO 'hbase://waypoint' USING
org.apache.phoenix.pig.PhoenixHBaseStorage('namenode.selise.ch', '-batchSize 5000');

Now let's run the scripts to test:


pig -x mapreduce hive2phoenix.pig

pig -x mapreduce hdfs2phoenix.pig
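
Once both jobs finish, a quick sanity check is to count what landed in Phoenix. This check is my addition, not part of the original workflow; a minimal sketch with the phoenixdb Python driver, assuming the Phoenix Query Server is running on namenode.selise.ch at its default port 8765:

# Hedged sketch: count the rows loaded into the two Phoenix tables.
# Assumes the Phoenix Query Server is reachable at namenode.selise.ch:8765.
import phoenixdb

conn = phoenixdb.connect('http://namenode.selise.ch:8765/', autocommit=True)
cursor = conn.cursor()
for table in ('waypoint_cum', 'waypoint'):
    cursor.execute('SELECT COUNT(*) FROM %s' % table)
    print(table, cursor.fetchone()[0])
conn.close()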

Geographical nearest neighbor using big data technologies

This is one of the more complex problems on my blog. Yes, it gets a bit complex when you are dealing with big data for this kind of task: you basically need to find the nearest particle for a given set of points, and both the number of particles and the number of points can be large. So you have to handle it with big data tools, GIS, SQL, and some algorithmic knowledge.

First of all, I need to explain what an R-tree is, what we use it for, and what kinds of problems it can solve.

The R-tree was invented by Antonin Guttman in 1984, which also happens to be my birth year :-D. So let me explain this tree right now.

R-trees are tree data structures used for spatial access methods, i.e., for indexing multi-dimensional information such as geographical coordinates, rectangles or polygons. A common real-world usage for an R-tree might be to store spatial objects such as restaurant locations or the polygons that typical maps are made of: streets, buildings, outlines of lakes, coastlines, etc. and then find answers quickly to queries such as “Find all museums within 2 km of my current location”, “retrieve all road segments within 2 km of my location” (to display them in a navigation system) or “find the nearest gas station” (although not taking roads into account). The R-tree can also accelerate nearest neighbor search for various distance metrics, including great-circle distance – wikipedia
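
Before diving into the full script, here is a tiny standalone illustration of the rtree Python library that the script relies on (my addition, with made-up bounding boxes): you insert axis-aligned bounding boxes under integer ids and then ask for the ids whose boxes are nearest to a query point.

# Minimal rtree illustration with made-up bounding boxes (not the real roads).
from rtree import index

idx = index.Index()
# insert(id, (minx, miny, maxx, maxy))
idx.insert(0, (8.67, 47.42, 8.70, 47.45))
idx.insert(1, (8.57, 47.41, 8.58, 47.43))

# For a point query the box is degenerate: (x, y, x, y).
# Returns the ids of the 2 entries whose boxes are nearest to the point.
print(list(idx.nearest((8.574, 47.417, 8.574, 47.417), 2)))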

Here is a sample code snippet for my big data geospatial nearest neighbor search. I used the Python libraries Shapely and rtree, the Anaconda package manager, and a few big data tools: a Spark cluster and the Hadoop HDFS file system running over a 3-node cluster, with 1 driver node and 3 worker nodes for Spark and Hadoop. Running the script on the Hadoop cluster with Spark was not that easy; I had to handle a lot of things to keep the cluster running properly. I used the Hortonworks Hadoop distribution as my big data platform.

from tempfile import mkdtemp
import os

from pyspark import SparkContext, SparkFiles
from pyspark.sql import SQLContext, Row
from rtree import index

sc = SparkContext()

def build_nearest_road(linestrings):
        # Returns a closure that is later registered as a Spark SQL UDF.
        # The shapely/rtree imports are repeated inside the closure so that
        # they are resolved on the executors, not just on the driver.
        def nearest_road(x, y):
                from shapely.geometry import asPoint
                from rtree import index
                from shapely import speedups
                # SparkFiles.get returns the executor-local path of 'index.idx';
                # strip the '.idx' suffix to get the rtree index basename.
                idx = index.Index(SparkFiles.get('index.idx')[:-4])
                try:
                        min_name = "NOT FOUND"
                        min_distance = 2000000000.0
                        # Ask the rtree for the 20 candidates whose bounding boxes
                        # are closest, then refine by true point-to-line distance.
                        for fid in idx.nearest((float(x), float(y), float(x), float(y)), 20):
                                speedups.enable()
                                point = asPoint([float(x), float(y)])
                                (name, linestring) = road_broadcast.value[fid]

                                from shapely.geometry import LineString
                                linestring = LineString(linestring)
                                distance = point.distance(linestring)
                                if min_distance > distance:
                                        min_distance = distance
                                        min_name = name
                        return min_name
                except:
                        import sys
                        print str(sys.exc_info()[1])
                        return "NOT FOUND"

        from shapely.geometry import LineString
        from shapely.wkt import loads

        # Parse each WKT linestring into a shapely geometry.
        linestrings_list = []
        for (name, linestring_wkt) in linestrings:
                linestring = loads(linestring_wkt)
                linestring = LineString(linestring.coords)
                linestrings_list.append((name, linestring))

        # Build a file-based rtree over the linestring bounding boxes and ship
        # the two index files to every executor with sc.addFile.
        tempdir = mkdtemp()
        basename = os.path.join(tempdir, 'index')

        idx = index.Index(basename, ((fid, linestring.bounds, None)
                for (fid, (name, linestring)) in enumerate(linestrings_list)))
        idx.close()
        sc.addFile(basename + '.idx')
        sc.addFile(basename + '.dat')

        # Broadcast the geometries so the executors can map an rtree hit (fid)
        # back to the road name and its full linestring.
        road_broadcast = sc.broadcast(linestrings_list)
        return nearest_road

# Each road line is "<name> LINESTRING (...)"; split on the keyword to separate
# the road name from its WKT geometry, then pull the rows back to the driver.
roads = sc.textFile("hdfs://datanode1.selise.ch/user/admin/geodjango/roads/part-m-00[0-9]*")
parts_list = roads.map(lambda l: l.split("LINESTRING"))
roads = parts_list.map(lambda parts: (parts[0], "LINESTRING" + parts[1]))
roads = roads.toLocalIterator()

sqlContext = SQLContext(sc)

# Load the waypoints, turn them into Rows and expose them to Spark SQL.
zurich_waypoints = sc.textFile("hdfs://datanode1.selise.ch/user/admin/waypoints_in_zurich_2w/part-m-00[0-9]*")
zurich_waypoints = zurich_waypoints.map(lambda l: l.split(","))
zurich_waypoints = zurich_waypoints.map(lambda parts: Row(longitude=parts[3], latitude=parts[4], tripid=parts[0], CaptureDateHour=parts[1]))
zurich_waypoints = sqlContext.createDataFrame(zurich_waypoints)
zurich_waypoints.registerTempTable("zurich_waypoints")

# Register the closure as a Spark SQL UDF and annotate every waypoint
# with the name of its nearest road, then write the result out as CSV.
sqlContext.registerFunction('getNearestRoad', build_nearest_road(roads))

df = sqlContext.sql("select getNearestRoad(zw.latitude, zw.longitude), zw.* from zurich_waypoints zw")
df.write.format('com.databricks.spark.csv').mode('overwrite').save('/home/admin/pyspark/zurich-road-waypoints-yarn-cluster.csv')

Now let's discuss the code. You can see that I have used a nested Python function for the Spark SQL context: the road network is loaded into an rtree and wrapped as a Spark UDF, so it effectively runs as a distributed geo rtree over the 3 Hadoop nodes. That means you can query any road info through the rtree from the Spark SQL context, and the search works across multiple machines. I have also used a threshold in the rtree lookup to handle overlap/collision in the road network: when the rtree is built from road segments, its first hit is not always the truly nearest road for a given point. The picture below should help you understand the idea.

In the picture, the rtree built from the road network returns road R2 for the red point, but the actual nearest neighbor is R1. That is why I use a threshold, checking several candidates instead of just the first one, to overcome this problem.
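
To make that refinement concrete, here is a small standalone sketch (my addition, with made-up geometries): for each candidate road returned by the rtree, compute the exact point-to-linestring distance with Shapely and keep the minimum, instead of trusting the first bounding-box hit.

# Refine rtree candidates by exact distance; geometries here are made up.
from shapely.geometry import Point, LineString

point = Point(0.0, 0.0)
candidates = {
    "R1": LineString([(-5.0, 1.0), (5.0, 1.0)]),   # true distance 1.0
    "R2": LineString([(2.0, -5.0), (2.0, 5.0)]),   # true distance 2.0
}

best_name, best_distance = None, float("inf")
for name, line in candidates.items():
    d = point.distance(line)
    if d < best_distance:
        best_name, best_distance = name, d

print(best_name, best_distance)   # R1 1.0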

Here is a small sample of data for testing the script, roughly 10 million records. While the job runs, all data nodes are busy and CPU usage reaches 100%; you can see that datanode1.selise.ch, datanode2.selise.ch and namenode.selise.ch are all up and running.

The article is not complete yet. I will also explain:

Why Hive on top of the Hadoop spatial framework is not a good fit for this kind of analysis

Why PostGIS is not suitable for this kind of analysis

Continued …

Parking spot analysis and migration techniques

My Hive cluster is running on 3 machines. The name node has a Core i5 and 20GB RAM, and the 2 data nodes each have a Core i5 and 16GB RAM. All of the machines use SATA HDDs and run CentOS 7. The Hadoop cluster is managed with Ambari on Hortonworks 2.4. If you don't know Hortonworks, download the sandbox from the Hortonworks website to start your journey with Hive: https://hortonworks.com/products/sandbox/.

Today I am not going to cover the Hortonworks installation; I will write another post with the procedure for installing Hortonworks on multiple nodes using Ambari.


hive:zurich> select count(*) from parking_events_text_partition;
   _col0
-----------
 229966246

Here you can see that there are almost 230 million records in my parking table.

select count(distinct(parking_status_event_id)) as total
from zurich.parking_events_text_orc
where parking_type = "Spot"
  and parking_city = "XXXXX"
  and parking_neighborhood = "XXXXXX"
  and parking_area = "XXXXXXXX"
  and parking_free = 0
  and parking_event_time between from_utc_timestamp('2014-01-01 00:00:00.0','CET')
                              and from_utc_timestamp('2015-08-01 00:00:00.0','CET')

Now I am trying to count the records from 2014-01-01 to 2015-08-01 in parking_events_text_orc where parking spots were free, for a specific parking city, parking neighborhood and parking area.

Let's start the Hive shell and run the query against the ORC-format table.

[root@namenode pyspark]# hive
WARNING: Use "yarn jar" to launch YARN applications.

Logging initialized using configuration in file:/etc/hive/2.4.2.0-258/0/hive-log4j.properties
hive> use zurich;
OK
Time taken: 2.04 seconds
hive> select count(distinct(parking_status_event_id)) as total from zurich.parking_events_text_orc where parking_type = "Spot" and parking_city = "XXXXXX" and parking_neighborhood = "XXXXXX" and parking_area = "XXXXXXX" and parking_free =  0 and parking_event_time between from_utc_timestamp('2014-01-01 00:00:00.0','CET') and from_utc_timestamp('2015-08-01 00:00:00.0','CET');

Here is the query running over the ORC-format Hive table:

hive> select count(distinct(parking_status_event_id)) as total from zurich.parking_events_text_orc where parking_type = "Spot" and parking_city = "XXXXX" and parking_neighborhood = "XXXXXX" and parking_area = "XXXXXXX" and parking_free =  0 and parking_event_time between from_utc_timestamp('2014-01-01 00:00:00.0','CET') and from_utc_timestamp('2015-08-01 00:00:00.0','CET');
Query ID = root_20170329193604_0956af6f-3720-4ef8-804a-18cf2ef36c10
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1490785713790_0001)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ......         RUNNING     35         24       11        0       3       0
Reducer 2             INITED    118          0        0      118       0       0
Reducer 3             INITED      1          0        0        1       0       0
--------------------------------------------------------------------------------

VERTICES: 00/03 [====>>----------------------] 15% ELAPSED TIME: 25.89 s

--------------------------------------------------------------------------------

The query is running at this moment. Below is the same query once it completes:

hive> select count(distinct(parking_status_event_id)) as total from zurich.parking_events_text_orc where parking_type = "Spot" and parking_city = "XXXXX" and parking_neighborhood = "XXXXXXX" and parking_area = "XXXXXXX" and parking_free =  0 and parking_event_time between from_utc_timestamp('2014-01-01 00:00:00.0','CET') and from_utc_timestamp('2015-08-01 00:00:00.0','CET');
Query ID = root_20170329193604_0956af6f-3720-4ef8-804a-18cf2ef36c10
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1490785713790_0001)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     35         35        0        0       3       0
Reducer 2 ......   SUCCEEDED     15         15        0        0       0       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------

VERTICES: 03/03 [==========================>>] 100% ELAPSED TIME: 48.14 s

--------------------------------------------------------------------------------
OK
29943
Time taken: 53.44 seconds, Fetched: 1 row(s)

And it takes almost 53 seconds to generate the output.

Now let's see how long it takes for the parking_events_text_parquet table, which is stored in Parquet format.

hive> select count(distinct(parking_status_event_id)) as total from zurich.parking_events_text_parquet where parking_type = "Spot" and parking_city = "XXXXXX" and parking_neighborhood = "XXXXXXX" and parking_area = "XXXXXXX" and parking_free =  0 and parking_event_time between from_utc_timestamp('2014-01-01 00:00:00.0','CET') and from_utc_timestamp('2015-08-01 00:00:00.0','CET');
Query ID = root_20170329193806_c755690f-8839-4d12-aa4c-cc6122187cd6
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1490785713790_0001)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     50         50        0        0       5       0
Reducer 2 ......   SUCCEEDED      1          1        0        0       0       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------

VERTICES: 03/03 [==========================>>] 100% ELAPSED TIME: 68.80 s

--------------------------------------------------------------------------------
OK
29943
Time taken: 69.697 seconds, Fetched: 1 row(s)

And it took almost 69 seconds to complete.

Now let's run the same query against the Hive table stored as plain text.

hive> select count(distinct(parking_status_event_id)) as total from zurich.parking_events_text_partition where parking_type = "Spot" and parking_city = "XXXXXX" and parking_neighborhood = "XXXXXX" and parking_area = "XXXXXX" and parking_free =  0 and parking_event_time between from_utc_timestamp('2014-01-01 00:00:00.0','CET') and from_utc_timestamp('2015-08-01 00:00:00.0','CET');
Query ID = root_20170329193950_b3f851d2-f49f-4977-931b-6815f953d31f
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1490785713790_0001)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1                RUNNING     36          0       20       16       0       0
Reducer 2             INITED     18          0        0       18       0       0
Reducer 3             INITED      1          0        0        1       0       0
--------------------------------------------------------------------------------

VERTICES: 00/03 [>>--------------------------] 0% ELAPSED TIME: 69.74 s

Now you can see that 69 seconds have already elapsed and the job is still at 0% when the Hive table is stored as plain text. For this query, ORC (about 53 seconds) and Parquet (about 70 seconds) clearly outperform the plain text format.

Hive data migration

Now it's time to migrate the Hive data from one server to another. The data is not that big, less than 35GB and about 229 million records. Here is what I actually did at the time.

I ran a command to export the table in CSV format.
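
The exact export command is not shown here; as a rough idea, something like the following Python sketch would dump the table through the hive CLI and turn the tab-separated output into CSV (the output path is hypothetical):

# Hedged sketch: dump a Hive table to a CSV file via the `hive` CLI.
# `hive -e` prints tab-separated rows; convert tabs to commas on the way out.
import subprocess

query = "SELECT * FROM zurich.parking_events_text_partition"
hive = subprocess.Popen(["hive", "-e", query], stdout=subprocess.PIPE)
with open("/home/admin/parking_events_export.csv", "w") as out:   # hypothetical path
    for line in hive.stdout:
        out.write(line.decode("utf-8").replace("\t", ","))
hive.wait()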

You can see that almost 37GB of data was generated after my shell command completed.

Then I created a tar archive to compress the data, which came down to around 3.9GB.

Now we are almost done moving the data from one server to the other. The next challenge is importing that data on the new server: you need to recreate the table, which was partitioned in the original Hive database. The schema looked like this:

create table parking_events_text_partition
(
	parking_id string,
	parking_type string,
	parking_size int,
	parking_free int,
	parking_longitude double,
	parking_latitude double,
	parking_permit_types string,
	parking_country string,
	parking_city string,
	parking_neighborhood string,
	parking_area string,
	parking_address string,
	parking_event_type string,
	parking_event_time timestamp,
	parking_status_event_id string
)
partitioned by (`date` string)  
ROW FORMAT DELIMITED  
FIELDS TERMINATED BY ',';

Parquet

create table parking_events_text_parquet
(
	parking_id string,
	parking_type string,
	parking_size int,
	parking_free int,
	parking_longitude double,
	parking_latitude double,
	parking_permit_types string,
	parking_country string,
	parking_city string,
	parking_neighborhood string,
	parking_area string,
	parking_address string,
	parking_event_type string,
	parking_event_time timestamp,
	parking_status_event_id string,
	`date` string
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS parquet;
insert into parking_events_text_parquet select * from parking_events_text_partition;

ORC

create table parking_events_text_orc
(
	parking_id string,
	parking_type string,
	parking_size int,
	parking_free int,
	parking_longitude double,
	parking_latitude double,
	parking_permit_types string,
	parking_country string,
	parking_city string,
	parking_neighborhood string,
	parking_area string,
	parking_address string,
	parking_event_type string,
	parking_event_time timestamp,
	parking_status_event_id string,
	`date` string
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS orc;
insert into parking_events_text_orc select * from parking_events_text_partition;

Now you need to restore the data into your Hive table.
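
The original post does not show the restore command. Since LOAD DATA into a partitioned table needs an explicit partition spec, one possible approach (my assumption, with hypothetical paths) is to load the CSV dump into a plain-text staging table and then repopulate the partitioned table with a dynamic-partition insert, driven from Python through the hive CLI:

# Hedged sketch, not the original author's procedure. Assumes the `hive` CLI is
# available and the exported CSV has the same column order as the table,
# with `date` as the last column. The local path is hypothetical.
import subprocess

restore_sql = r"""
USE zurich;

-- Empty staging table with the same columns as the partitioned table
-- (the partition column `date` becomes an ordinary last column).
CREATE TABLE parking_events_text_staging
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
AS SELECT * FROM parking_events_text_partition WHERE 1 = 0;

LOAD DATA LOCAL INPATH '/home/admin/parking_events_export.csv'
INTO TABLE parking_events_text_staging;

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT INTO TABLE parking_events_text_partition PARTITION (`date`)
SELECT * FROM parking_events_text_staging;
"""

subprocess.check_call(["hive", "-e", restore_sql])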

How to export a remote PC's CSV file to the HDFS file system?

Last night I had to import a CSV file from a remote PC into the HDFS file system. Here are the steps I followed.

STEP 1
First, I set up passwordless SSH login:

mohiulalamprince@ubuntu:/home/postgres$ ssh-keygen -t rsa
Generating public/private rsa key pair.
Enter file in which to save the key (/home/mohiulalamprince/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /home/mohiulalamprince/.ssh/id_rsa.
Your public key has been saved in /home/mohiulalamprince/.ssh/id_rsa.pub.
The key fingerprint is:
d1:17:73:19:74:15:81:15:6a:d4:f8:18:9b:02:f5:6b mohiulalamprince@ubuntu
The key's randomart image is:
+--[ RSA 2048]----+
|          ..ooOBB|
|         o  oBoo |
|        . o .+*  |
|         . o.+.. |
|        S   .E   |
|            .    |
|                 |
|                 |
|                 |
+-----------------+

STEP 2
Then I copied my public key to the Hadoop node:

mohiulalamprince@ubuntu:/home/postgres$ ssh-copy-id admin@172.16.0.215
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
admin@172.16.0.215's password:

Number of key(s) added: 1

Now try logging into the machine, with:   "ssh 'admin@172.16.0.215'"
and check to make sure that only the key(s) you wanted were added.

STEP 3
Finally, I streamed the CSV from the remote PC straight into HDFS over SSH:

mohiulalamprince@ubuntu:/home/postgres$ cat waypoints_road_area.csv | ssh admin@172.16.0.215 "hadoop dfs -put - /user/admin/waypoints_road_area.csv"
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.
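
As an aside (my addition, not part of the original steps), if WebHDFS is reachable from the remote machine you could also push the file with the Python hdfs client instead of piping through SSH. A minimal sketch, assuming the NameNode's WebHDFS endpoint is on the default port 50070:

# Hedged sketch: upload a local CSV to HDFS via WebHDFS, assuming the
# `hdfs` Python package is installed and WebHDFS is enabled on port 50070.
from hdfs import InsecureClient

client = InsecureClient('http://namenode.selise.ch:50070', user='admin')
client.upload('/user/admin/waypoints_road_area.csv',   # HDFS destination
              'waypoints_road_area.csv',               # local file
              overwrite=True)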