MRJob using Amazon EMR (2 Small slave instances and 1 master node)

Today I will show you how to run a very basic MapReduce word count job on AWS EMR. I used EMR 4.8.2 and Hadoop 2.7.3. Most of the time I actually work with the Hortonworks distribution, but today I will show you Amazon EMR.

First of all you need to create security credentials in your Amazon account. If you don't have an Amazon account, don't hesitate: you can create one with a credit card. You don't have to pay for the first year (though there are some usage limitations on the free tier).

To run the MapReduce job on Amazon you need Python's pip installed. Then use pip to install awscli and mrjob as shown below.

pip install awscli
pip install mrjob

Now it's time to create the Amazon security credentials from my Amazon account and include them in my mrjob.conf file. All you need to do is open a text editor and paste in the lines below:

runners:
  emr:
    aws_access_key_id: xxxxxxxxxxxxxxxxxxxx
    aws_secret_access_key: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
    num_core_instances: 2
    core_instance_type: m1.small

Now your configuration is almost done. Let's write the word count MapReduce job in Python. My program is given below; I downloaded it from the Amazon website.


from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):
    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    MRWordFrequencyCount.run()

Now let's save the code in a file named mr_word_count.py and try to run the program from the terminal.
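
If you just want to check that the script works before paying for a cluster, you can run it locally first; mrjob falls back to its local inline runner when no -r option is given (README.rst here is just any text file you have on hand):

python mr_word_count.py README.rst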

The command I used for the EMR run is given below.

python mr_word_count.py -r emr README.rst --output-dir=s3://my-mr-jobs/wc_out2/ --conf-path=mrjob.conf 

Let's see what happens in your console. Mine is given below.


python mr_word_count.py -r emr README.rst --output-dir=s3://my-mr-jobs/wc_out2/ --conf-path=mrjob.conf
Using s3://mrjob-be5ff8dcbdecb973/tmp/ as our temp dir on S3
Creating temp directory /var/folders/mh/qvz8smz53njctvsp0r8_xt040000gn/T/mr_word_count.mohiulalamprince.20170813.193712.456524
Copying local files to s3://mrjob-be5ff8dcbdecb973/tmp/mr_word_count.mohiulalamprince.20170813.193712.456524/files/...
Created new cluster j-1CBMSHE0GMZHN
Waiting for step 1 of 1 (s-2E4DYAQOEV00C) to complete...
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING: Configuring cluster software)
  PENDING (cluster is STARTING: Configuring cluster software)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
Terminating cluster: j-1CBMSHE0GMZHN

Now it's time to look at the Amazon console and see what is happening over there. Here are my console snapshots, given below.

EMR Cluster list

EMR Cluster configuration

aws s3 ls s3://my-mr-jobs/wc_out2/
2017-08-14 01:49:20         12 part-00000

S3 output location

EMR Cluster list

aws emr list-clusters 
{
    "Clusters": [
        {
            "Status": {
                "Timeline": {
                    "ReadyDateTime": 1502653570.991, 
                    "CreationDateTime": 1502653079.879, 
                    "EndDateTime": 1502653805.849
                }, 
                "State": "TERMINATED", 
                "StateChangeReason": {
                    "Message": "Terminated by user request", 
                    "Code": "USER_REQUEST"
                }
            }, 
            "NormalizedInstanceHours": 4, 
            "Id": "j-1CBMSHE0GMZHN", 
            "Name": "mr_word_count.mohiulalamprince.20170813.193712.456524"
        }, 
        {
            "Status": {
                "Timeline": {
                    "ReadyDateTime": 1502650935.456, 
                    "CreationDateTime": 1502650470.064, 
                    "EndDateTime": 1502651081.962
                }, 
                "State": "TERMINATED", 
                "StateChangeReason": {
                    "Message": "Terminated by user request", 
                    "Code": "USER_REQUEST"
                }
            }, 
            "NormalizedInstanceHours": 2, 
            "Id": "j-1VLKU9ZZINT6L", 
            "Name": "mr_word_count.mohiulalamprince.20170813.185301.918036"
        }
    ]
}

The job has completed and the cluster has terminated

Output result

Now let's give this next code snippet a try; it finds the most used word with a two-step job:

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # optimization: sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        yield max(word_count_pairs)


if __name__ == '__main__':
    MRMostUsedWord.run()
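
Save it as, say, mr_most_used_word.py (the file name is just my choice) and launch it on EMR the same way as the first job:

python mr_most_used_word.py -r emr README.rst --conf-path=mrjob.conf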

Now it's time to run a MapReduce program on a Hortonworks cluster. It's a 3-node cluster: the master node has 20 GB of memory and the two slave nodes have 16 GB each. Let's see how to process the MovieLens dataset with a MapReduce program. I downloaded the largest MovieLens dataset from grouplens.org, which contains around 26 million ratings. Let's see how we can find the average rating per movie.

Grouplens 26 Million dataset analysis using map reduce

Download the data from grouplens.org

wget http://files.grouplens.org/datasets/movielens/ml-latest.zip

Unzip the file

unzip ml-latest.zip && cd ml-latest

Now it's time to put the ratings.csv and movies.csv files into HDFS:


hadoop dfs -mkdir /user/root/movielen
hadoop dfs -put ratings.csv /user/root/movielen/
hadoop dfs -put movies.csv /user/root/movielen/
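
A quick listing should confirm that both files landed in HDFS:

hadoop fs -ls /user/root/movielen/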

Now let's look at the average rating code.

from mrjob.job import MRJob


class MRAverageRatingPerMovie(MRJob):

    def mapper(self, _, line):
        # ratings.csv rows look like: userId,movieId,rating,timestamp
        try:
            data = line.split(",")
            userid = int(data[0])
            movieid = int(data[1])
            rating = float(data[2])
            timestamp = int(data[3])
        except (ValueError, IndexError):
            # the CSV header (and any malformed row) ends up under key 0
            yield 0, 0
            return
        yield movieid, rating

    def reducer(self, movieid, ratings):
        # average of all the ratings seen for this movie
        counter = 0
        total_ratings = 0.0
        for rating in ratings:
            total_ratings += rating
            counter += 1
        yield movieid, total_ratings / counter


if __name__ == '__main__':
    MRAverageRatingPerMovie.run()
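
Before throwing all 26 million ratings at the cluster, it is worth a quick sanity check on a slice of the file with mrjob's default local inline runner (the sample file name is just an example):

head -n 100000 ratings.csv > ratings_sample.csv
python AverageRatingCountPerMovie.py ratings_sample.csv | head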

How to run your MapReduce program using mrjob on top of the Hortonworks distribution

Don't be afraid, it's a very basic approach. I have pasted the command below.


[root@namenode ml-latest]# python AverageRatingCountPerMovie.py \
>     -r hadoop \
>     --hadoop-streaming-jar /usr/hdp/current/hadoop-client/hadoop-streaming.jar \
>     --output hdfs://datanode1.selise.ch/user/root/movielen/output5 \
>     hdfs:///user/root/movielen/ratings.csv > log

and here is my output log once I pressed the enter key:

No configs found; falling back on auto-configuration
Looking for hadoop binary in $PATH...
Found hadoop binary: /usr/bin/hadoop
Using Hadoop version 2.7.1.2.4.2.0
Creating temp directory /tmp/word_count.root.20170814.122244.762965
Copying local files to hdfs:///user/root/tmp/mrjob/word_count.root.20170814.122244.762965/files/...
Running step 1 of 1...
  WARNING: Use "yarn jar" to launch YARN applications.
  packageJobJar: [] [/usr/hdp/2.4.2.0-258/hadoop/hadoop-streaming.jar] /tmp/streamjob211933095411666257.jar tmpDir=null
  Timeline service address: http://datanode2.selise.ch:8188/ws/v1/timeline/
  Timeline service address: http://datanode2.selise.ch:8188/ws/v1/timeline/
  Total input paths to process : 1
  number of splits:6
  Submitting tokens for job: job_1502454305190_0007
  Submitted application application_1502454305190_0007
  The url to track the job: http://datanode2.selise.ch:8088/proxy/application_1502454305190_0007/
  Running job: job_1502454305190_0007
  Job job_1502454305190_0007 running in uber mode : false
   map 0% reduce 0%
   map 5% reduce 0%
   map 6% reduce 0%
   map 9% reduce 0%
   map 13% reduce 0%
   map 17% reduce 0%
   map 18% reduce 0%
   map 20% reduce 0%
   map 26% reduce 0%
   map 27% reduce 0%
   map 28% reduce 0%
   map 30% reduce 0%
   map 31% reduce 6%
   map 32% reduce 6%
   map 34% reduce 6%
   map 35% reduce 6%
   map 36% reduce 6%
   map 38% reduce 6%
   map 39% reduce 6%
   map 41% reduce 6%
   map 42% reduce 6%
   map 43% reduce 6%
   map 44% reduce 6%
   map 45% reduce 6%
   map 46% reduce 6%
   map 47% reduce 6%
   map 49% reduce 6%
   map 51% reduce 6%
   map 53% reduce 6%
   map 54% reduce 6%
   map 56% reduce 6%
   map 57% reduce 6%
   map 62% reduce 6%
   map 63% reduce 6%
   map 64% reduce 6%
   map 65% reduce 11%
   map 66% reduce 11%
   map 67% reduce 11%
   map 68% reduce 11%
   map 69% reduce 11%
   map 70% reduce 11%
   map 71% reduce 11%
   map 72% reduce 11%
   map 73% reduce 11%
   map 74% reduce 11%
   map 75% reduce 11%
   map 76% reduce 11%
   map 77% reduce 11%
   map 89% reduce 11%
   map 94% reduce 11%
   map 100% reduce 11%
   map 100% reduce 17%
   map 100% reduce 22%
   map 100% reduce 28%
   map 100% reduce 45%
   map 100% reduce 59%
   map 100% reduce 67%
   map 100% reduce 68%
   map 100% reduce 69%
   map 100% reduce 70%
   map 100% reduce 71%
   map 100% reduce 72%
   map 100% reduce 73%
   map 100% reduce 74%
   map 100% reduce 75%
   map 100% reduce 76%
   map 100% reduce 77%
   map 100% reduce 78%
   map 100% reduce 79%
   map 100% reduce 80%
   map 100% reduce 81%
   map 100% reduce 82%
   map 100% reduce 83%
   map 100% reduce 84%
   map 100% reduce 85%
   map 100% reduce 86%
   map 100% reduce 87%
   map 100% reduce 88%
   map 100% reduce 89%
   map 100% reduce 90%
   map 100% reduce 91%
   map 100% reduce 92%
   map 100% reduce 93%
   map 100% reduce 94%
   map 100% reduce 95%
   map 100% reduce 96%
   map 100% reduce 97%
   map 100% reduce 98%
   map 100% reduce 99%
   map 100% reduce 100%
  Job job_1502454305190_0007 completed successfully
  Output directory: hdfs://datanode1.selise.ch/user/root/movielen/output5
Counters: 49
        File Input Format Counters
                Bytes Read=710205687
        File Output Format Counters
                Bytes Written=808301
        File System Counters
                FILE: Number of bytes read=284899443
                FILE: Number of bytes written=570779561
                FILE: Number of large read operations=0
                FILE: Number of read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=710206371
                HDFS: Number of bytes written=808301
                HDFS: Number of large read operations=0
                HDFS: Number of read operations=21
                HDFS: Number of write operations=2
        Job Counters
                Data-local map tasks=6
                Launched map tasks=6
                Launched reduce tasks=1
                Total megabyte-seconds taken by all map tasks=972241920
                Total megabyte-seconds taken by all reduce tasks=533227520
                Total time spent by all map tasks (ms)=632970
                Total time spent by all maps in occupied slots (ms)=1265940
                Total time spent by all reduce tasks (ms)=260365
                Total time spent by all reduces in occupied slots (ms)=520730
                Total vcore-seconds taken by all map tasks=632970
                Total vcore-seconds taken by all reduce tasks=260365
        Map-Reduce Framework
                CPU time spent (ms)=727900
                Combine input records=0
                Combine output records=0
                Failed Shuffles=0
                GC time elapsed (ms)=5188
                Input split bytes=684
                Map input records=26024290
                Map output bytes=232850855
                Map output materialized bytes=284899473
                Map output records=26024291
                Merged Map outputs=6
                Physical memory (bytes) snapshot=7586971648
                Reduce input groups=45116
                Reduce input records=26024291
                Reduce output records=45116
                Reduce shuffle bytes=284899473
                Shuffled Maps =6
                Spilled Records=52048582
                Total committed heap usage (bytes)=7692353536
                Virtual memory (bytes) snapshot=23136010240
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
Streaming final output from hdfs://datanode1.selise.ch/user/root/movielen/output5...
Removing HDFS temp directory hdfs:///user/root/tmp/mrjob/word_count.root.20170814.122244.762965...
Removing temp directory /tmp/word_count.root.20170814.122244.762965...

Here is the standard output:

[root@namenode ml-latest]# head -n 50 log
0       0.0
1       3.8881574960610834
10      3.431840536054589
100     3.232303877366997
1000    3.0990783410138247
100001  4.0
100003  3.6666666666666665
100006  3.3333333333333335
100008  3.7142857142857144
100010  2.522727272727273
100013  3.4444444444444446
100015  2.5
100017  3.1
100032  3.6
100034  3.28125
100036  3.388888888888889
100038  3.4545454545454546
100040  3.0
100042  3.3333333333333335
100044  4.271573604060913
100046  3.769230769230769
100048  3.1666666666666665
100050  3.5
100052  2.0454545454545454
100054  3.75
100056  2.55
100058  3.2045454545454546
100060  3.576923076923077
100062  3.716666666666667
100068  3.6265060240963853
100070  3.5416666666666665
100072  2.875
100075  2.8157894736842106
100077  2.3333333333333335
100079  1.5
100081  3.7
100083  2.3055555555555554
100085  3.5
100087  2.8823529411764706
100089  3.6707317073170733
100091  3.0
100093  2.889830508474576
100096  3.125
100099  4.1
1001    2.8255813953488373
100101  4.0
100103  0.5
100106  3.7413793103448274
100108  3.16815144766147
100131  1.5

Hive & HDFS to HBase/Phoenix using Pig scripts

A few days ago I had to import data from Hive into an HBase/Phoenix database for my project, and the scripts I used at that time are given below.

Hive table schema

 	col_name	data_type	comment
0	roadid	int	
1	year	int	
2	month	int	
3	day	int	
4	hour	int	
5	total_trips	bigint

Here is a snapshot of the Hive table data that has been stored in the Phoenix table:

0	1612342	2016	4	1	3	3
1	1612342	2016	4	1	4	1
2	1612342	2016	4	1	5	8
3	1612342	2016	4	2	4	1
4	1612342	2016	4	2	7	4
5	1612342	2016	4	2	8	2
6	1612342	2016	4	2	9	2
7	1612342	2016	4	2	10	2
8	1612342	2016	4	2	11	2
9	1612342	2016	4	2	12	4
10	1612342	2016	4	3	11	1
11	1612342	2016	4	3	12	2
12	1612342	2016	4	3	13	4
13	1612342	2016	4	3	14	2
14	1612342	2016	4	3	15	3
15	1612342	2016	4	3	16	2
16	1612342	2016	4	3	17	4
17	1612342	2016	4	4	18	4
18	1612342	2016	4	6	2	1
19	1612342	2016	4	7	2	1

Pig Script

REGISTER /usr/hdp/current/phoenix-client/phoenix-client.jar

waypoints = LOAD 'zurich.zurich_waypoints_orc_cum' USING org.apache.hive.hcatalog.pig.HCatLoader();

STORE waypoints into 'hbase://waypoint_cum' using
org.apache.phoenix.pig.PhoenixHBaseStorage('namenode.selise.ch', '-batchSize 5000');

Here you can see that I used the phoenix-client.jar that ships with my Hortonworks distribution; I simply registered that jar for the Pig script.

Sample HDFS data that I stored in my Phoenix DB using the Pig script:

1887574,2016-04-05T10:26:43.000Z,8.6782,47.4279,c5a3d5c06036818187068cbf82d81f87,2016,04,05,10
1887574,2016-04-05T10:27:29.000Z,8.6829,47.4375,c5a3d5c06036818187068cbf82d81f87,2016,04,05,10
1887574,2016-04-05T10:28:54.000Z,8.7012,47.451,c5a3d5c06036818187068cbf82d81f87,2016,04,05,10
1822482,2016-04-07T16:26:50.000Z,8.5743,47.417,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:27:32.000Z,8.5743,47.4221,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:27:37.000Z,8.5742,47.4222,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:27:42.000Z,8.5742,47.4223,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:27:47.000Z,8.5742,47.4224,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:27:52.000Z,8.5741,47.4225,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:28:17.000Z,8.5741,47.4225,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:28:18.000Z,8.574,47.4226,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:28:21.000Z,8.5739,47.4227,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:28:22.000Z,8.5738,47.4227,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:28:27.000Z,8.5735,47.423,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16
1974545,2016-04-07T16:28:28.000Z,8.5734,47.4231,36b454bb2f66270d192d55fb35f68a59,2016,04,07,16

The script I used to import the HDFS data into Phoenix/HBase:

REGISTER /usr/hdp/current/phoenix-client/phoenix-client.jar

waypoints = LOAD 'hdfs://datanode1.selise.ch/home/admin/pyspark/zurich-road-waypoints-trimmed/part-m-000[0-9]*'
USING PigStorage(',')
AS (roadid:int, capture_date:chararray, longitude:double, latitude:double, tripid:chararray, year:int, month:int, day:int, hour:int);

STORE waypoints into 'hbase://waypoint' using
org.apache.phoenix.pig.PhoenixHBaseStorage('namenode.selise.ch', '-batchSize 5000');
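
Note that both scripts assume the target tables (waypoint_cum and waypoint) already exist in Phoenix, because PhoenixHBaseStorage writes into an existing table rather than creating one. Purely as an illustration, a Phoenix table matching the HDFS columns above could be created from sqlline roughly like this (the primary key choice is my own assumption):

CREATE TABLE IF NOT EXISTS waypoint (
    roadid INTEGER NOT NULL,
    capture_date VARCHAR NOT NULL,
    longitude DOUBLE,
    latitude DOUBLE,
    tripid VARCHAR NOT NULL,
    year INTEGER,
    month INTEGER,
    day INTEGER,
    hour INTEGER
    CONSTRAINT pk PRIMARY KEY (tripid, capture_date, roadid)
);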

Now let's run the scripts to test them:


pig  -x mapreduce hive2phoenix.pig

pig -x mapreduce hdfs2phoenix.pig

Geographical nearest neighbor using bigdata technologies

This is one of the more complex problems on my blog. Yes, it gets a bit complicated when you are dealing with big data for a problem like this. The task is basically to find the nearest object for a given point, where both the number of objects and the number of query points can be large. So you have to combine big data tooling, GIS, SQL, and some algorithmic knowledge.

First of all I need to explain what an R-tree is, what we use it for, and what kind of problem it solves.

The R-tree was invented by Antonin Guttman in 1984, which also happens to be my birth year :-D. So let me explain this tree right now.

R-trees are tree data structures used for spatial access methods, i.e., for indexing multi-dimensional information such as geographical coordinates, rectangles or polygons. A common real-world usage for an R-tree might be to store spatial objects such as restaurant locations or the polygons that typical maps are made of: streets, buildings, outlines of lakes, coastlines, etc. and then find answers quickly to queries such as “Find all museums within 2 km of my current location”, “retrieve all road segments within 2 km of my location” (to display them in a navigation system) or “find the nearest gas station” (although not taking roads into account). The R-tree can also accelerate nearest neighbor search for various distance metrics, including great-circle distance – wikipedia

Here is a sample code snippet for my big data geospatial nearest neighbor search. I used the Python libraries shapely and rtree, the Anaconda package manager, and a few big data tools: a Spark cluster and HDFS running over a 3-node cluster, with 1 driver node and 3 worker nodes for Spark and Hadoop. Running the script on the Hadoop cluster with Spark was not that easy; I had to handle a lot of things to keep the cluster running properly. I used the Hortonworks Hadoop distribution as my big data platform.

from shutil import rmtree
from tempfile import mkdtemp

from pyspark import SparkFiles
from pyspark.sql.types import StringType
from rtree import index
from shapely.geometry import asPoint
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

from  shapely import speedups
import os

sc = SparkContext()

def build_nearest_road(linestrings):
        def nearest_road(x, y):
                from shapely.geometry import asPoint
                from rtree import index
                from  shapely import speedups
                idx = index.Index(SparkFiles.get('index.idx')[:-4])
                try:
                        min_name = "NOT FOUND"
                        min_distance = 2000000000.0
                        min_linestring = "NOT FOUND"
                        for fid in idx.nearest((float(x), float(y), float(x), float(y)), 20):
                                speedups.enable()
                                point = asPoint([float(x), float(y)])
                                (name, linestring) = road_broadcast.value[fid]


                                from shapely.geometry import LineString
                                linestring = LineString(linestring)
                                distance = point.distance(linestring)
                                if min_distance > distance:
                                        min_distance = distance
                                        min_name = name
                                        #min_linestring = linestring
                        return min_name
                except:
                        import sys
                        print str(sys.exc_info()[1])
                        return "NOT FOUND"
                        #if point.intersects(polygon):
                        #       return name
                return "NOT FOUND"

        from shapely.geometry import Point
        from shapely.geometry import Polygon
        from shapely.geometry import LineString
        from shapely.geometry import LineString, mapping, shape
        from shapely.wkt import loads
        linestrings_list = []
        for (name, linestring_wkt) in linestrings:
                linestring = loads(linestring_wkt)
                #linestring = shape(mapping(linestring))
                linestring = LineString(linestring.coords)
                linestrings_list.append((name, linestring))

        tempdir = mkdtemp()
        basename = os.path.join(tempdir, 'index')

        #try:
        idx = index.Index(basename, ((fid, linestring.bounds, None)
                for (fid, (name, linestring)) in enumerate(linestrings_list)))
        idx.close()
        sc.addFile(basename + '.idx')
        sc.addFile(basename + '.dat')
        #finally:
        #       rmtree(tempdir)

        road_broadcast = sc.broadcast(linestrings_list)
        return nearest_road

roads = sc.textFile("hdfs://datanode1.selise.ch/user/admin/geodjango/roads/part-m-00[0-9]*")
parts_list = roads.map(lambda l: l.split("LINESTRING"))
roads = parts_list.map(lambda parts: (parts[0], "LINESTRING" + parts[1]))
roads = roads.toLocalIterator()

sqlContext = SQLContext(sc)

zurich_waypoints = sc.textFile("hdfs://datanode1.selise.ch/user/admin/waypoints_in_zurich_2w/part-m-00[0-9]*")
zurich_waypoints = zurich_waypoints.map(lambda l: l.split(","))
zurich_waypoints = zurich_waypoints.map(lambda parts: Row(longitude=parts[3], latitude=parts[4], tripid=parts[0], CaptureDateHour=parts[1]))
zurich_waypoints = sqlContext.createDataFrame(zurich_waypoints)
zurich_waypoints.registerTempTable("zurich_waypoints")

sqlContext.registerFunction('getNearestRoad', build_nearest_road(roads))

df = sqlContext.sql("select getNearestRoad(zw.latitude, zw.longitude), zw.* from zurich_waypoints zw")
df.write.format('com.databricks.spark.csv').mode('overwrite').save('/home/admin/pyspark/zurich-road-waypoints-yarn-cluster.csv')

Now let's discuss the code. You can see that I used a nested Python function for the Spark SQL context: the road network is loaded into an R-tree and exposed as a Spark UDF, so it effectively runs as a distributed geo index over the 3 Hadoop nodes. That means you can query road information through the Spark SQL context and the search runs across multiple machines. I also used a threshold, a fixed number of candidate roads pulled from the R-tree, to deal with overlapping bounding boxes in the road network, because the entry whose bounding box is nearest is not always the truly nearest road for a given point. The picture below should help make the idea clear.

You can see that the red dot is the query point: an R-tree built from the road bounding boxes would return road R2 first, but the actual nearest road is R1. That is why I check several candidates instead of only the first one.
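
To make the idea concrete, here is a tiny standalone sketch (separate from the cluster code above, with invented coordinates) of the candidate-then-refine trick: the R-tree only ranks roads by bounding-box distance, so we pull several candidates and re-measure the exact distance with shapely.

from rtree import index
from shapely.geometry import LineString, Point

# Two toy roads: R2's bounding box covers the query point, but R1 is truly closer.
roads = {
    0: ("R1", LineString([(0, 3), (10, 3)])),   # straight road just above the point
    1: ("R2", LineString([(0, 0), (10, 10)])),  # diagonal road with a large bounding box
}

idx = index.Index()
for fid, (_, line) in roads.items():
    idx.insert(fid, line.bounds)

point = Point(8, 1)
bbox = (point.x, point.y, point.x, point.y)

# Pure bounding-box ranking picks R2 first ...
print(roads[next(idx.nearest(bbox, 1))][0])   # -> R2

# ... but re-checking the top candidates with exact distances picks R1.
candidates = [roads[fid] for fid in idx.nearest(bbox, 2)]
best = min(candidates, key=lambda road: point.distance(road[1]))
print(best[0])                                # -> R1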

Here is a small sample data for testing the script

Nearly 10 million records for testing

All data nodes are running.

CPU usage at 100%

You can see that datanode1.selise.ch, datanode2.selise.ch, namenode.selise.ch are running

All nodes are running

This article is not finished yet. I will still explain:

Why Hive on top of the Hadoop spatial framework is not a good fit for this kind of analysis

Why PostGIS is not suitable for this kind of analysis

Continued …

1.7 billion weather records analysis, around 28 years of data for two different places

Hello guys, today I am going to give some idea of how I handled 1.7 billion weather records, covering around 28 years of data. Our main goal was to generate some analytics from this weather data. We handled only temperature and humidity, which together come to around 22 GB.

Now, one of the main challenges was how to download the data from the client's server. The client-provided URL was good enough to download from, so I used some basic shell commands. At first I considered downloading the data in parallel with xargs, but I changed my mind and did not use it, because I was in no hurry and my local PC's bandwidth was not high enough to benefit from it.

You could use xargs to download the data from a specific location like this:

downloader.sh

d=1990-01-01
while [ "$d" != 2017-04-05 ]; do 
  echo  https://xxxxxxxx.com/"$d"T00:00:00ZXXXXXX/xxxxxxxxxx/xxxxxx.xxxxxxxxx/csv
  d=$(date -I -d "$d + 1 day")
done
[root@namenode.selise.ch ~]# chmod +x downloader.sh
[root@namenode.selise.ch ~]# ./downloader.sh > urls.txt
[root@namenode.selise.ch ~]# cat urls.txt | xargs -n 1 -P 6 wget -P data/

but what I actually used for the download is given below

downloader.sh

d=1990-01-01
while [ "$d" != 2017-04-05 ]; do 
  echo  wget https://xxxxxxxx.com/"$d"T00:00:00ZXXXXXX/xxxxxxxxxx/xxxxxx.xxxxxxxxx/csv
  d=$(date -I -d "$d + 1 day")
done

chmod +x downloader.sh
./downloader.sh > urls.txt
chmod +x urls.txt
./urls.txt

It took almost 48 hours on my low-bandwidth connection.

Now I am ready to store the data in HDFS for analysis:

hadoop fs -put *.csv /user/root/weather

Storing it also takes a while.
Let's see how many records there are:


hive> select count(*) from WeatherTemperatureHumidity_orc_update;
OK
1719639510
Time taken: 1.792 seconds, Fetched: 1 row(s)

Now let's create the external Hive table over the HDFS data, along with the ORC tables derived from it:

CREATE EXTERNAL TABLE IF NOT EXISTS WeatherTemperatureHumidity(
        validdate string,
        temperature float,
        humidity float
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ';'
    STORED AS TEXTFILE
    location '/user/root/weather';
CREATE TABLE IF NOT EXISTS WeatherTemperatureHumidity_orc(
        validdate string,
        temperature double,
        humidity double,
        suburb string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ';'
    STORED AS ORC;
    
INSERT OVERWRITE TABLE WeatherTemperatureHumidity_orc SELECT w.*, "xxxx" FROM WeatherTemperatureHumidity w;
CREATE TABLE IF NOT EXISTS WeatherTemperatureHumidity_orc_update (
        `year` string,
        `month` string,
        `date` string,
        validdate string,
        temperature double,
        humidity double,
        suburb string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ';'
    STORED AS ORC;
    
drop table WeatherTemperatureHumidity_orc_update;

INSERT OVERWRITE TABLE WeatherTemperatureHumidity_orc_update select year(to_date(w.validdate)), month(to_date(w.validdate)), to_date(w.validdate), w.validdate, w.temperature, w.humidity, w.suburb from WeatherTemperatureHumidity_orc w where w.validdate is not NULL

Now you are ready to start your analytics on the Hive table. Here I have tried to find the min and max temperature (and likewise humidity) per year from the weather table.

hive> select `year`, min(temperature), max(temperature) from WeatherTemperatureHumidity_orc_update where suburb='XXXX' group by `year` order by `year` desc;
Query ID = root_20170406182432_92f15cf5-5a23-4250-8739-288676e0589d
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1491480905379_0001)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     38         38        0        0       0       0
Reducer 2 ......   SUCCEEDED    253        253        0        0       0       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 48.97 s
--------------------------------------------------------------------------------
OK
2017    -12.399999618530273     20.100000381469727
2016    -6.900000095367432      31.200000762939453
2015    -9.699999809265137      33.099998474121094
2014    -15.800000190734863     30.700000762939453
2013    -13.199999809265137     30.5
2012    -19.299999237060547     30.700000762939453
2011    -12.199999809265137     30.0
2010    -15.199999809265137     29.399999618530273
2009    -16.5   28.5
2008    -12.800000190734863     27.700000762939453
2007    -14.199999809265137     29.600000381469727
2006    -11.0   29.200000762939453
2005    -15.600000381469727     28.5
2004    -10.699999809265137     26.5
2003    -15.0   31.700000762939453
2002    -13.300000190734863     29.799999237060547
2001    -16.899999618530273     30.100000381469727
2000    -16.899999618530273     28.700000762939453
1999    -15.0   28.0
1998    -15.0   29.600000381469727
1997    -11.399999618530273     26.799999237060547
1996    -16.799999237060547     27.100000381469727
1995    -19.399999618530273     28.799999237060547
1994    -13.699999809265137     29.700000762939453
1993    -18.600000381469727     28.700000762939453
1992    -18.100000381469727     29.799999237060547
1991    -18.799999237060547     30.600000381469727
1990    -11.699999809265137     29.100000381469727
NULL    NULL    NULL
Time taken: 53.314 seconds, Fetched: 29 row(s)

Now let's look at another suburb:

hive> select `year`, min(temperature), max(temperature) from WeatherTemperatureHumidity_orc_update where suburb='XXXXXXX' group by `year` order by `year` desc;
Query ID = root_20170406182754_271dd6cb-b69e-4fd3-b4c0-3e70d284fa1d
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1491480905379_0001)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     38         38        0        0       0       0
Reducer 2 ......   SUCCEEDED    253        253        0        0       0       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 47.21 s
--------------------------------------------------------------------------------
OK
2017    -12.399999618530273     19.700000762939453
2016    -7.300000190734863      30.600000381469727
2015    -9.100000381469727      32.79999923706055
2014    -15.300000190734863     30.600000381469727
2013    -13.199999809265137     30.299999237060547
2012    -19.5   30.600000381469727
2011    -12.399999618530273     29.899999618530273
2010    -15.399999618530273     29.200000762939453
2009    -16.600000381469727     28.299999237060547
2008    -13.0   27.5
2007    -14.399999618530273     29.399999618530273
2006    -11.199999809265137     29.0
2005    -15.699999809265137     28.399999618530273
2004    -10.800000190734863     26.399999618530273
2003    -15.199999809265137     31.5
2002    -13.399999618530273     29.700000762939453
2001    -16.899999618530273     29.899999618530273
2000    -16.799999237060547     28.600000381469727
1999    -15.199999809265137     27.899999618530273
1998    -15.100000381469727     29.399999618530273
1997    -11.300000190734863     26.600000381469727
1996    -17.0   27.0
1995    -19.5   28.700000762939453
1994    -13.899999618530273     29.600000381469727
1993    -18.799999237060547     28.5
1992    -18.200000762939453     29.600000381469727
1991    -18.899999618530273     30.5
1990    -11.800000190734863     29.0
NULL    NULL    NULL
Time taken: 50.031 seconds, Fetched: 29 row(s)
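
The humidity side of the analysis is the same query with the humidity column swapped in, for example:

hive> select `year`, min(humidity), max(humidity) from WeatherTemperatureHumidity_orc_update where suburb='XXXX' group by `year` order by `year` desc;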

Each of these queries takes around 50 to 53 seconds to complete over 1.7 billion rows on the 3-node Hadoop cluster, where all nodes use Intel i5 CPUs with 16 GB of RAM and the name node has 20 GB.

Parking spot analysis and migration techniques

My Hive cluster runs on 3 machines. The name node has a Core i5 and 20 GB of RAM, and the two other data nodes have Core i5 processors with 16 GB of RAM each. All of the machines use SATA HDDs and run CentOS 7. The Hadoop cluster is managed with Ambari on Hortonworks 2.4. If you don't know Hortonworks, try downloading the sandbox from their website to start your journey with Hive: https://hortonworks.com/products/sandbox/.

Today I am not going to cover the Hortonworks installation. I will write another post with the procedure for installing Hortonworks on multiple nodes using Ambari.


hive:zurich> select count(*) from parking_events_text_partition;
   _col0
-----------
 229966246

Here you can see that there are almost 230 million records in my parking table.

select count(distinct(parking_status_event_id)) as total from zurich.parking_events_text_orc where parking_type = "Spot" and parking_city = "XXXXX" and parking_neighborhood = "XXXXXX" and parking_area = "XXXXXXXX" and parking_free =  0 and parking_event_time between from_utc_timestamp('2014-01-01 00:00:00.0','CET') and from_utc_timestamp('2015-08-01 00:00:00.0','CET')

Now I am counting the distinct parking status events from 2014-01-01 to 2015-08-01 in parking_events_text_orc where parking_free = 0, for a specific parking city, parking neighborhood and parking area.

Let's start the Hive shell and run the query against the ORC-format table:

[root@namenode pyspark]# hive
WARNING: Use "yarn jar" to launch YARN applications.

Logging initialized using configuration in file:/etc/hive/2.4.2.0-258/0/hive-log4j.properties
hive> use zurich;
OK
Time taken: 2.04 seconds
hive> select count(distinct(parking_status_event_id)) as total from zurich.parking_events_text_orc where parking_type = "Spot" and parking_city = "XXXXXX" and parking_neighborhood = "XXXXXX" and parking_area = "XXXXXXX" and parking_free =  0 and parking_event_time between from_utc_timestamp('2014-01-01 00:00:00.0','CET') and from_utc_timestamp('2015-08-01 00:00:00.0','CET');

Here is the query running over the ORC-format Hive table:

hive> select count(distinct(parking_status_event_id)) as total from zurich.parking_events_text_orc where parking_type = "Spot" and parking_city = "XXXXX" and parking_neighborhood = "XXXXXX" and parking_area = "XXXXXXX" and parking_free =  0 and parking_event_time between from_utc_timestamp('2014-01-01 00:00:00.0','CET') and from_utc_timestamp('2015-08-01 00:00:00.0','CET');
Query ID = root_20170329193604_0956af6f-3720-4ef8-804a-18cf2ef36c10
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1490785713790_0001)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ......         RUNNING     35         24       11        0       3       0
Reducer 2             INITED    118          0        0      118       0       0
Reducer 3             INITED      1          0        0        1       0       0
--------------------------------------------------------------------------------

VERTICES: 00/03 [====>>----------------------] 15% ELAPSED TIME: 25.89 s

--------------------------------------------------------------------------------

Query is running right at this moment.

hive> select count(distinct(parking_status_event_id)) as total from zurich.parking_events_text_orc where parking_type = "Spot" and parking_city = "XXXXX" and parking_neighborhood = "XXXXXXX" and parking_area = "XXXXXXX" and parking_free =  0 and parking_event_time between from_utc_timestamp('2014-01-01 00:00:00.0','CET') and from_utc_timestamp('2015-08-01 00:00:00.0','CET');
Query ID = root_20170329193604_0956af6f-3720-4ef8-804a-18cf2ef36c10
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1490785713790_0001)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     35         35        0        0       3       0
Reducer 2 ......   SUCCEEDED     15         15        0        0       0       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------

VERTICES: 03/03 [==========================>>] 100% ELAPSED TIME: 48.14 s

--------------------------------------------------------------------------------
OK
29943
Time taken: 53.44 seconds, Fetched: 1 row(s)

And it takes almost 53 seconds to produce the output

Now let's see the duration for the parking_events_text_parquet table, where the Hive table is stored in Parquet format.

hive> select count(distinct(parking_status_event_id)) as total from zurich.parking_events_text_parquet where parking_type = "Spot" and parking_city = "XXXXXX" and parking_neighborhood = "XXXXXXX" and parking_area = "XXXXXXX" and parking_free =  0 and parking_event_time between from_utc_timestamp('2014-01-01 00:00:00.0','CET') and from_utc_timestamp('2015-08-01 00:00:00.0','CET');
Query ID = root_20170329193806_c755690f-8839-4d12-aa4c-cc6122187cd6
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1490785713790_0001)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     50         50        0        0       5       0
Reducer 2 ......   SUCCEEDED      1          1        0        0       0       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------

VERTICES: 03/03 [==========================>>] 100% ELAPSED TIME: 68.80 s

--------------------------------------------------------------------------------
OK
29943
Time taken: 69.697 seconds, Fetched: 1 row(s)

And it took almost 69 seconds to complete

Now let's analyze the same data with the Hive table stored as plain text.

hive> select count(distinct(parking_status_event_id)) as total from zurich.parking_events_text_partition where parking_type = "Spot" and parking_city = "XXXXXX" and parking_neighborhood = "XXXXXX" and parking_area = "XXXXXX" and parking_free =  0 and parking_event_time between from_utc_timestamp('2014-01-01 00:00:00.0','CET') and from_utc_timestamp('2015-08-01 00:00:00.0','CET');
Query ID = root_20170329193950_b3f851d2-f49f-4977-931b-6815f953d31f
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1490785713790_0001)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1                RUNNING     36          0       20       16       0       0
Reducer 2             INITED     18          0        0       18       0       0
Reducer 3             INITED      1          0        0        1       0       0
--------------------------------------------------------------------------------

VERTICES: 00/03 [>>--------------------------] 0% ELAPSED TIME: 69.74 s

Now you can see that 69 seconds have already elapsed with 0% of the data processed when the Hive table is stored as plain text.

Hive data migration

Now it's time to migrate the Hive data from one server to another. The data is not that big, less than 35 GB and about 229 million rows, and I had to move it from one server to another. What I actually did at the time is given below.

First I ran a command to dump the table in CSV format.

You can see that almost 37 GB of data was generated once my shell command completed.

Then I created a tar file to compress the data, which came down to around 3.9 GB.
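
For reference, the export and compression steps can look roughly like this (database, table and file names are placeholders; the hive CLI prints tab-separated rows, hence the sed):

hive -e 'select * from zurich.parking_events_text_partition' | sed 's/\t/,/g' > parking_events.csv
tar -czf parking_events.tar.gz parking_events.csv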

Now we are almost done moving the data from one server to the other. The next challenge is importing the data on the new server: you need to recreate the table, which was partitioned in the original Hive warehouse. The schema looks like this:

create table parking_events_text_partition
(
	parking_id string,
	parking_type string,
	parking_size int,
	parking_free int,
	parking_longitude double,
        parking_latitude double,
	parking_permit_types string,
	parking_country string,
	parking_city string,
	parking_neighborhood string,
	parking_area string,
	parking_address string,
	parking_event_type string,
	parking_event_time timestamp,
	parking_status_event_id string
)
partitioned by (`date` string)  
ROW FORMAT DELIMITED  
FIELDS TERMINATED BY ',';

Parquet

create table parking_events_text_parquet
(
	parking_id string,
	parking_type string,
	parking_size int,
	parking_free int,
	parking_longitude double,
        parking_latitude double,
	parking_permit_types string,
	parking_country string,
	parking_city string,
	parking_neighborhood string,
	parking_area string,
	parking_address string,
	parking_event_type string,
	parking_event_time timestamp,
	parking_status_event_id string,
    `date` string
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS parquet;
insert into parking_events_text_parquet select * from parking_events_text_partition;

ORC

create table parking_events_text_orc
(
	parking_id string,
	parking_type string,
	parking_size int,
	parking_free int,
	parking_longitude double,
        parking_latitude double,
	parking_permit_types string,
	parking_country string,
	parking_city string,
	parking_neighborhood string,
	parking_area string,
	parking_address string,
	parking_event_type string,
	parking_event_time timestamp,
	parking_status_event_id string,
        `date` string
) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' STORED AS orc;
insert into parking_events_text_orc select * from parking_events_text_partition;

Now you need to restore the data into your Hive tables.
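
One common way to do the restore (just a sketch, with assumed paths) is to expose the uploaded CSV through a plain staging table and then let Hive's dynamic partitioning fill the partitioned table; note that when the old table is exported with select *, the `date` partition column comes out as the last field:

CREATE EXTERNAL TABLE parking_events_text_stage (
    parking_id string, parking_type string, parking_size int, parking_free int,
    parking_longitude double, parking_latitude double, parking_permit_types string,
    parking_country string, parking_city string, parking_neighborhood string,
    parking_area string, parking_address string, parking_event_type string,
    parking_event_time timestamp, parking_status_event_id string, `date` string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
LOCATION '/user/root/parking_import';

SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

INSERT OVERWRITE TABLE parking_events_text_partition PARTITION (`date`)
SELECT * FROM parking_events_text_stage;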

How to set up a standalone Spark cluster?

First of all you need to download the Spark binary (pre-built for Hadoop) from the Spark website. I have already downloaded it and extracted it with the tar command:

tar -xvf spark-1.6.1-bin-hadoop2.6.tgz
[root@namenode ~]# ls -la spark-*
-rw-r--r--  1 root root 289405702 Feb 16 18:53 spark-1.6.1-bin-hadoop2.6.tgz
-rw-r--r--  1 root root 195636829 Dec 29 06:49 spark-2.1.0-bin-hadoop2.7.tgz

spark-1.6.1-bin-hadoop2.6:
total 1408
drwxr-xr-x  14  500  500    4096 Feb 16 18:56 .
dr-xr-x---. 38 root root    4096 Mar 28 13:25 ..
drwxr-xr-x   2  500  500    4096 Feb 27  2016 bin
-rw-r--r--   1  500  500 1343562 Feb 27  2016 CHANGES.txt
drwxr-xr-x   2  500  500    4096 Feb 27  2016 conf
drwxr-xr-x   3  500  500      18 Feb 27  2016 data
drwxr-xr-x   3  500  500      75 Feb 27  2016 ec2
drwxr-xr-x   3  500  500      16 Feb 27  2016 examples
drwxr-xr-x   2  500  500    4096 Feb 27  2016 lib
-rw-r--r--   1  500  500   17352 Feb 27  2016 LICENSE
drwxr-xr-x   2  500  500    4096 Feb 27  2016 licenses
drwxr-xr-x   2 root root    4096 Mar 12 17:17 logs
-rw-r--r--   1  500  500   23529 Feb 27  2016 NOTICE
drwxr-xr-x   6  500  500     112 Feb 27  2016 python
drwxr-xr-x   3  500  500      16 Feb 27  2016 R
-rw-r--r--   1  500  500    3359 Feb 27  2016 README.md
-rw-r--r--   1  500  500     120 Feb 27  2016 RELEASE
drwxr-xr-x   2  500  500    4096 Feb 27  2016 sbin
drwxr-xr-x  22 root root    4096 Mar 12 17:18 work

spark-2.1.0-bin-hadoop2.7:
total 100
drwxr-xr-x  12  500  500  4096 Dec 16 08:18 .
dr-xr-x---. 38 root root  4096 Mar 28 13:25 ..
drwxr-xr-x   2  500  500  4096 Dec 16 08:18 bin
drwxr-xr-x   2  500  500  4096 Dec 16 08:18 conf
drwxr-xr-x   5  500  500    47 Dec 16 08:18 data
drwxr-xr-x   4  500  500    27 Dec 16 08:18 examples
drwxr-xr-x   2  500  500  8192 Dec 16 08:18 jars
-rw-r--r--   1  500  500 17811 Dec 16 08:18 LICENSE
drwxr-xr-x   2  500  500  4096 Dec 16 08:18 licenses
-rw-r--r--   1  500  500 24645 Dec 16 08:18 NOTICE
drwxr-xr-x   9  500  500  4096 Dec 16 08:18 python
drwxr-xr-x   3  500  500    16 Dec 16 08:18 R
-rw-r--r--   1  500  500  3818 Dec 16 08:18 README.md
-rw-r--r--   1  500  500   128 Dec 16 08:18 RELEASE
drwxr-xr-x   2  500  500  4096 Dec 16 08:18 sbin
drwxr-xr-x   2  500  500    41 Dec 16 08:18 yarn

I will use version 1.6.1 for my Spark cluster. All you really need to do is set the SPARK_HOME path in your ~/.bashrc file. Mine looks like this:

[root@namenode ~]# cat ~/.bashrc | grep SPARK
#export SPARK_HOME="/usr/hdp/current/spark-client"
#export PYSPARK_SUBMIT_ARGS="--master local[2]"
export SPARK_HOME=/root/spark-1.6.1-bin-hadoop2.6
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_PYTHON=/root/anaconda2/bin/python
export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0

Here is my full ~/.bashrc, most of which is not related to this post; I use the Hortonworks distribution for Hadoop.

[root@namenode ~]# cat ~/.bashrc
# .bashrc

# User specific aliases and functions

alias rm='rm -i'
alias cp='cp -i'
alias mv='mv -i'

# Source global definitions
if [ -f /etc/bashrc ]; then
        . /etc/bashrc
fi

#export SPARK_HOME="/usr/hdp/current/spark-client"
#export PYSPARK_SUBMIT_ARGS="--master local[2]"

export SPARK_HOME=/root/spark-1.6.1-bin-hadoop2.6
export PATH=$SPARK_HOME/bin:$PATH

export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH

export JAVA_HOME=/usr/jdk64/jdk1.8.0_60/
export PATH=$JAVA_HOME/bin:$PATH

export GEOS_DIR=/usr/local/geos

# added by Anaconda2 4.2.0 installer
export PATH="/root/anaconda2/bin:$PATH"

export PATH=$PATH:/opt/activemq/bin
export HADOOP_CONF_DIR=/etc/hadoop/conf

export PHOENIX_HOME=/home/admin/apache-phoenix-4.9.0-HBase-1.2-bin
export PATH=$PHOENIX_HOME/bin:$PATH

export HBASE_HOME=/usr/hdp/current/hbase-client
export PATH=$HBASE_HOME/bin:$PATH

export ZOOKEEPER_HOME=/usr/hdp/current/zookeeper-client
export PATH=$ZOOKEEPER_HOME/bin:$PATH

export PYSPARK_PYTHON=/root/anaconda2/bin/python
export PYTHON_HASHSEED=0
export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0

export CQLSH_NO_BUNDLED=true

Now you are almost done. Just reload the shell configuration once the environment variables are set:


[root@namenode ~]# source ~/.bashrc
[root@namenode ~]# echo $SPARK_HOME
/root/spark-1.6.1-bin-hadoop2.6

Now you are ready to start your single-node Spark shell:

[root@namenode ~]# pyspark
Python 2.7.12 |Anaconda custom (64-bit)| (default, Jul  2 2016, 17:42:40)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://anaconda.org
17/03/30 11:51:43 INFO spark.SparkContext: Running Spark version 1.6.1
17/03/30 11:51:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/30 11:51:46 INFO spark.SecurityManager: Changing view acls to: root
17/03/30 11:51:46 INFO spark.SecurityManager: Changing modify acls to: root
17/03/30 11:51:46 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/03/30 11:51:50 INFO util.Utils: Successfully started service 'sparkDriver' on port 33652.
17/03/30 11:51:58 INFO storage.BlockManagerMaster: Registered BlockManager
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 2.7.12 (default, Jul  2 2016 17:42:40)
SparkContext available as sc, HiveContext available as sqlContext.
>>>

Now try playing with your single-node Spark setup in shell mode. Here is an example:

>>> lines = sc.textFile("file:///home/admin/biketrips.csv")
17/03/30 11:56:16 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 147.2 KB, free 147.2 KB)
17/03/30 11:56:17 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 16.4 KB, free 163.6 KB)
17/03/30 11:56:18 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:43822 (size: 16.4 KB, free: 511.1 MB)
17/03/30 11:56:18 INFO spark.SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2

>>> lines.count()
.
.
.
17/03/30 12:07:47 INFO scheduler.TaskSetManager: Finished task 46.0 in stage 0.0 (TID 46) in 44661 ms on localhost (48/48)
17/03/30 12:07:47 INFO scheduler.DAGScheduler: ResultStage 0 (count at :1) finished in 592.768 s
17/03/30 12:07:47 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/03/30 12:07:47 INFO scheduler.DAGScheduler: Job 0 finished: count at :1, took 593.280470 s
15243514

top - 12:05:27 up 16:03,  2 users,  load average: 51.00, 37.08, 17.84
Tasks: 542 total,  21 running, 521 sleeping,   0 stopped,   0 zombie
%Cpu(s): 47.1 us, 47.1 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  5.9 si,  0.0 st
KiB Mem : 21528888 total, 13107932 free,  4306892 used,  4114064 buff/cache
KiB Swap: 12518396 total, 12518396 free,        0 used. 16856396 avail Mem

   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
   266 root      rt   0       0      0      0 S  62.5  0.0   1:38.34 watchdog/0
   272 root      rt   0       0      0      0 S  62.1  0.0   2:12.52 watchdog/2
   267 root      rt   0       0      0      0 S  61.9  0.0   2:23.35 watchdog/1
   277 root      rt   0       0      0      0 S  45.8  0.0   1:45.95 watchdog/3
101271 root      20   0 4754184 596064  25836 S  34.8  2.8   6:28.83 java
 98143 hdfs      20   0 2942032 381484  25220 S  15.4  1.8   2:27.38 java
  2735 mongod    20   0  374404  68264   5684 R   7.3  0.3   8:48.23 mongod
 99025 yarn      20   0 2968840 565144  26924 S   7.1  2.6   3:31.46 java

Multi node standalone spark cluster

Now you need to copy the Spark directory to every node where you want to run a Spark worker. I used datanode1, datanode2 and datanode3 as my Spark cluster workers.

[root@namenode ~]# scp -r /root/spark-1.6.1-bin-hadoop2.6 datanode1:/root/
[root@namenode admin]# ssh datanode1
Last login: Wed Mar 29 15:31:54 2017 from namenode.selise.ch
[root@datanode1 ~]# cd /root/spark-1.6.1-bin-hadoop2.6/
[root@datanode1 spark-1.6.1-bin-hadoop2.6]# ls -la
total 1424
drwxr-xr-x. 14  500  500    4096 Feb 16 18:34 .
dr-xr-x---. 20 root root    4096 Mar 27 13:33 ..
drwxr-xr-x.  2  500  500    4096 Feb 27  2016 bin
-rw-r--r--.  1  500  500 1343562 Feb 27  2016 CHANGES.txt
drwxr-xr-x.  2  500  500    4096 Feb 27  2016 conf
drwxr-xr-x.  3  500  500    4096 Feb 27  2016 data
drwxr-xr-x.  3  500  500    4096 Feb 27  2016 ec2
drwxr-xr-x.  3  500  500    4096 Feb 27  2016 examples
drwxr-xr-x.  2  500  500    4096 Feb 27  2016 lib
-rw-r--r--.  1  500  500   17352 Feb 27  2016 LICENSE
drwxr-xr-x.  2  500  500    4096 Feb 27  2016 licenses
drwxr-xr-x.  2 root root    4096 Mar 27 13:41 logs
-rw-r--r--.  1  500  500   23529 Feb 27  2016 NOTICE
drwxr-xr-x.  6  500  500    4096 Feb 27  2016 python
drwxr-xr-x.  3  500  500    4096 Feb 27  2016 R
-rw-r--r--.  1  500  500    3359 Feb 27  2016 README.md
-rw-r--r--.  1  500  500     120 Feb 27  2016 RELEASE
drwxr-xr-x.  2  500  500    4096 Feb 27  2016 sbin
drwxr-xr-x. 85 root root    4096 Mar 22 12:43 work

Now do the same for datanode2 and datanode3 as well. Once that is done you are ready to start the Spark master, so go to your master node; in my environment the Spark master machine is named namenode. Note that I also run a worker on the master machine.

[root@namenode spark-1.6.1-bin-hadoop2.6]# ./sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /root/spark-1.6.1-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.master.Master-1-namenode.selise.ch.out

Spark worker on the master machine:
[root@namenode spark-1.6.1-bin-hadoop2.6]# ./sbin/start-slave.sh spark://namenode.selise.ch:7077
starting org.apache.spark.deploy.worker.Worker, logging to /root/spark-1.6.1-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-namenode.selise.ch.out

Now you are ready to start the Spark worker on datanode1:

[root@datanode1 spark-1.6.1-bin-hadoop2.6]# ./sbin/start-slave.sh spark://namenode.selise.ch:7077
starting org.apache.spark.deploy.worker.Worker, logging to /root/spark-1.6.1-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-datanode1.selise.ch.out

Here is the datanode2

[root@namenode admin]# ssh datanode2
Last login: Wed Mar 29 15:46:09 2017 from namenode.selise.ch
[root@datanode2 ~]# cd spark-1.6.1-bin-hadoop2.6/
[root@datanode2 spark-1.6.1-bin-hadoop2.6]# ./sbin/start-slave.sh spark://namenode.selise.ch:7077
starting org.apache.spark.deploy.worker.Worker, logging to /root/spark-1.6.1-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-datanode2.selise.ch.out
[root@datanode2 spark-1.6.1-bin-hadoop2.6]#

Do the same things for datanode3 as well

Now your Spark standalone cluster is almost done.
Here is a picture of the 4-node Spark standalone cluster

Here is a picture of the 3-node Spark standalone cluster

You can also use the cluster from the Spark shell. Here is how to do it:

[root@namenode spark-1.6.1-bin-hadoop2.6]# pyspark --master spark://namenode.selise.ch:7077
Python 2.7.12 |Anaconda custom (64-bit)| (default, Jul  2 2016, 17:42:40)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://anaconda.org
17/03/30 12:30:44 INFO spark.SparkContext: Running Spark version 1.6.1
17/03/30 12:30:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/30 12:30:44 INFO spark.SecurityManager: Changing view acls to: root
17/03/30 12:30:44 INFO spark.SecurityManager: Changing modify acls to: root
17/03/30 12:30:44 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/03/30 12:30:45 INFO util.Utils: Successfully started service 'sparkDriver' on port 34680.
17/03/30 12:30:45 INFO slf4j.Slf4jLogger: Slf4jLogger started
.
.
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 2.7.12 (default, Jul  2 2016 17:42:40)
SparkContext available as sc, HiveContext available as sqlContext.
>>> 17/03/30 12:31:03 INFO cluster.SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (datanode1.selise.ch:59560) with ID 0
17/03/30 12:31:03 INFO cluster.SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (datanode2.selise.ch:41570) with ID 1
17/03/30 12:31:03 INFO storage.BlockManagerMasterEndpoint: Registering block manager datanode1.selise.ch:41001 with 511.1 MB RAM, BlockManagerId(0, datanode1.selise.ch, 41001)
17/03/30 12:31:03 INFO storage.BlockManagerMasterEndpoint: Registering block manager datanode2.selise.ch:37377 with 511.1 MB RAM, BlockManagerId(1, datanode2.selise.ch, 37377)
>>>
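
Before moving on, you can type a tiny job at the >>> prompt to make sure the registered workers are actually doing work. This is just a quick sanity check of my own; 999000 is simply the expected arithmetic result (the sum of 2*x for x from 0 to 999), not output copied from my cluster.

>>> data = sc.parallelize(range(1000), 8)   # 8 partitions spread across the workers
>>> data.map(lambda x: x * 2).reduce(lambda a, b: a + b)
999000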

will be continued …

How to set up a multi node Presto cluster?

First of all you need to download the presto-server binary from its website.

[root@namenode admin]# wget https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.164/presto-server-0.164.tar.gz
--2017-03-29 12:22:26--  https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.164/presto-server-0.164.tar.gz
Resolving repo1.maven.org (repo1.maven.org)... 151.101.24.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.24.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 406230611 (387M) [application/x-gzip]
Saving to: ‘presto-server-0.164.tar.gz’

 0% [                                                                                                                                                     ] 981,144     68.6KB/s  eta 69m 7s
[root@namenode admin]# tar -xvf presto-server-0.164.tar.gz
[root@namenode admin]# cd presto-server-0.164/
[root@namenode presto-server-0.164]# ls
bin  etc  lib  NOTICE  plugin  README.txt
[root@namenode presto-server-0.164]# wget https://repo1.maven.org/maven2/com/facebook/presto/presto-cli/0.164/presto-cli-0.164-executable.jar
--2017-03-29 12:28:41--  https://repo1.maven.org/maven2/com/facebook/presto/presto-cli/0.164/presto-cli-0.164-executable.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.24.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.24.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 15093095 (14M) [application/java-archive]
Saving to: ‘presto-cli-0.164-executable.jar’

 5% [======>                                                                                                                                              ] 794,624      202KB/s  eta 72s    

[root@namenode presto-server-0.164]# mv presto-cli-0.164-executable.jar presto
[root@namenode presto-server-0.164]# sudo chmod +x presto

Now you need to create a folder named "etc" in your Presto binary directory, and create another folder named catalog inside the etc folder. Then, in the catalog folder, you have to create two properties files: hive.properties and jmx.properties. Once you are done, you need to create another 4 config and properties files inside the etc folder; your folder structure will look like the one given below.

[root@namenode presto-server-0.164]# tree etc/
etc/
├── catalog
│   ├── hive.properties
│   └── jmx.properties
├── config.properties
├── jvm.config
├── log.properties
└── node.properties

1 directory, 6 files

Now you are ready to configure your Presto etc folder. Here are the configuration file contents; simply paste each one into the corresponding file.
hive.properties

[root@namenode presto-server-0.164]# cat etc/catalog/hive.properties
hive.metastore.uri=thrift://datanode2.selise.ch:9083
connector.name=hive-hadoop2

jmx.properties

[root@namenode presto-server-0.164]# cat etc/catalog/jmx.properties
connector.name=jmx

config.properties

[root@namenode presto-server-0.164]# cat etc/config.properties
coordinator=true
node-scheduler.include-coordinator=true
http-server.http.port=8081
query.max-memory=800MB
query.max-memory-per-node=200MB
discovery-server.enabled=true
discovery.uri=http://namenode.selise.ch:8081

jvm.config

[root@namenode presto-server-0.164]# cat etc/jvm.config
-server
-Xmx800M
-XX:+UseG1GC
-XX:G1HeapRegionSize=32M
-XX:+UseGCOverheadLimit
-XX:+ExplicitGCInvokesConcurrent
-XX:+HeapDumpOnOutOfMemoryError
-XX:OnOutOfMemoryError=kill -9 %p

node.properties

[root@namenode presto-server-0.164]# cat etc/node.properties
node.environment=dev
node.id=presto-node-cordinator
node.data-dir=/root/datap

log.properties

[root@namenode presto-server-0.164]# cat etc/log.properties
com.facebook.presto=INFO

Now you need to create the data directory where Presto will store its data. I have set node.data-dir=/root/datap inside node.properties.

Now you are ready to start your presto server:

[root@namenode presto-server-0.164]# ls
bin  etc  lib  NOTICE  plugin  presto  presto-cli-0.170-executable.jar  README.txt

[root@namenode presto-server-0.164]# ./bin/launcher run
2017-03-29T13:00:43.264+0600    INFO    main    Bootstrap       hive.metastore                                     thrift      thrift
2017-03-29T13:00:43.264+0600    INFO    main    Bootstrap       hive.allow-add-column                              false       false                                Allow Hive connector to add column
2017-03-29T13:00:43.264+0600    INFO    main    Bootstrap       hive.allow-drop-table                              false       false                                Allow Hive connector to drop table
2017-03-29T13:00:43.264+0600    INFO    main    Bootstrap       hive.allow-rename-column                           false       false                                Allow Hive connector to rename column
2017-03-29T13:00:43.264+0600    INFO    main    Bootstrap       hive.allow-rename-table                            false       false                                Allow Hive connector to rename table
2017-03-29T13:00:43.264+0600    INFO    main    Bootstrap       hive.security                                      legacy      legacy
2017-03-29T13:00:43.264+0600    INFO    main    Bootstrap
2017-03-29T13:00:44.572+0600    INFO    main    com.facebook.presto.metadata.StaticCatalogStore -- Added catalog hive using connector hive-hadoop2 --
2017-03-29T13:00:44.574+0600    INFO    main    com.facebook.presto.execution.resourceGroups.InternalResourceGroupManager       -- Loading resource group configuration manager --
2017-03-29T13:00:44.575+0600    INFO    main    com.facebook.presto.execution.resourceGroups.InternalResourceGroupManager       -- Loaded resource group configuration manager legacy --
2017-03-29T13:00:44.575+0600    INFO    main    com.facebook.presto.security.AccessControlManager       -- Loading system access control --
2017-03-29T13:00:44.575+0600    INFO    main    com.facebook.presto.security.AccessControlManager       -- Loaded system access control allow-all --
2017-03-29T13:00:44.926+0600    INFO    main    com.facebook.presto.server.PrestoServer ======== SERVER STARTED ========

Now you are ready with your single node cluster, where the coordinator and a worker run on a single PC.

Let's start the multi worker Presto cluster

I have three worker PCs, which are:

namenode.selise.ch
datanode1.selise.ch
datanode2.selise.ch

Then I copied presto-server-0.164.tar.gz to my datanode1.selise.ch and datanode2.selise.ch machines and configured them just like the configuration below. Please have a look at my Presto worker node configuration.

[root@namenode presto-server-0.164]# scp -r ../presto-server-0.164.tar.gz datanode1.selise.ch:/home/admin

[root@namenode presto-server-0.164]# ssh datanode1.selise.ch
Last login: Mon Mar 27 14:48:46 2017 from namenode.selise.ch

[root@datanode1 ~]# cd /home/admin
[root@datanode1 admin]# tar -xvf presto-server-0.164.tar.gz
[root@datanode1 admin]# cd presto-server-0.164/

[root@datanode1 presto-server-0.164]# tree etc/
etc/
├── catalog
│   ├── hive.properties
│   └── jmx.properties
├── config.properties
├── jvm.config
├── log.properties
└── node.properties

1 directory, 6 files

config.properties

[root@datanode1 presto-server-0.164]# cat etc/config.properties
coordinator=false
http-server.http.port=8081
query.max-memory=800MB
query.max-memory-per-node=200MB
discovery.uri=http://namenode.selise.ch:8081

Here I have set coordinator=false, and discovery.uri points to my Presto coordinator machine, which is namenode.selise.ch.

node.properties

[root@datanode1 presto-server-0.164]# cat etc/node.properties
node.environment=dev
node.id=presto-node-1
node.data-dir=/root/datap

Now do the same for datanode2.selise.ch, restart the coordinator machine, and use the Presto shell to check whether everything is working.

[root@namenode.selise.ch presto-server-0.164]# ./bin/launcher run
2017-03-27T13:55:06.696+0600    INFO    main    Bootstrap       hive.allow-rename-table                            false       false                                Allow Hive connector to rename table
2017-03-27T13:55:06.696+0600    INFO    main    Bootstrap       hive.security                                      legacy      legacy
2017-03-27T13:55:06.696+0600    INFO    main    Bootstrap
2017-03-27T13:55:07.456+0600    INFO    main    com.facebook.presto.metadata.StaticCatalogStore -- Added catalog hive using connector hive-hadoop2 --
2017-03-27T13:55:07.457+0600    INFO    main    com.facebook.presto.execution.resourceGroups.InternalResourceGroupManager       -- Loading resource group configuration manager --
2017-03-27T13:55:07.459+0600    INFO    main    com.facebook.presto.execution.resourceGroups.InternalResourceGroupManager       -- Loaded resource group configuration manager legacy --
2017-03-27T13:55:07.459+0600    INFO    main    com.facebook.presto.security.AccessControlManager       -- Loading system access control --
2017-03-27T13:55:07.459+0600    INFO    main    com.facebook.presto.security.AccessControlManager       -- Loaded system access control allow-all --
2017-03-27T13:55:07.482+0600    INFO    main    com.facebook.presto.server.PrestoServer ======== SERVER STARTED ========
[root@namenode presto-server-0.164]# ./presto --server namenode.selise.ch:8081 --catalog hive
presto> use zurich;
presto:zurich> select count(*) from parking_events_text_partition;
   _col0
-----------
 229966246
(1 row)

Query 20170327_075512_00001_hgdm2, FINISHED, 3 nodes
Splits: 1,187 total, 1,187 done (100.00%)
6:50 [230M rows, 32.3GB] [561K rows/s, 80.7MB/s]

Now you can easily see that the 3 node Presto cluster is ready for work. Let's enjoy your Presto cluster.
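
If you would rather query the cluster from Python than from the presto CLI, here is a minimal sketch of my own. It assumes the presto-python-client package (imported as prestodb) has been installed with pip, and it reuses the coordinator host, port, catalog, schema and table from above.

import prestodb

# Connect to the coordinator configured above (http-server.http.port=8081).
conn = prestodb.dbapi.connect(
    host='namenode.selise.ch',
    port=8081,
    user='root',
    catalog='hive',
    schema='zurich',
)
cur = conn.cursor()

# List the nodes that registered with the coordinator; with all three
# machines running you should see three rows here.
cur.execute('SELECT node_id, http_uri, coordinator FROM system.runtime.nodes')
for row in cur.fetchall():
    print(row)

# The same count the presto CLI returned above.
cur.execute('SELECT count(*) FROM parking_events_text_partition')
print(cur.fetchone())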
will be continued ….

Biketrips and parking data benchmark using hive and presto

Today I will give you some idea regarding Hive and Presto. Hive is a distributed data warehouse system where you can read, write and manage your data through SQL. Presto is another SQL engine, built for interactive analytic queries against data, and it can handle data from the terabyte to the petabyte range. Currently Presto manages around 300 petabytes of data at Facebook, where it is used by about 1000 engineers running more than 30,000 queries on their Presto cluster.

Recently I set up a Presto cluster in my local environment, with 3 worker nodes and a coordinator, running Presto over Hive. I experimented with the Presto cluster on a data set of 229 million records. It is basically parking spot statistical data from one of my small scale big data projects, where I had to handle and migrate the data from one server to another. But that was not the main issue; I actually had to test the performance after being told that Apache Presto is a good choice for interactive analytic queries.

Here are some of the Hive and Presto benchmark results I found during the migration from one server to another.


Figure: My parking events text/parquet schema, which was used during the migration


Figure: Year wise number of parking events with Presto over Hive in ORC format; it takes 2:01 with 1 worker node


Figure: Year wise parking events with Hive in ORC format; it takes only 53 seconds, which is quite large compared to Presto given that Hive was running on a 3 node cluster while Presto used a single node


Figure: Total number of entries with Presto over Hive in ORC format; it takes 0:03, which is very fast for 229M records


Figure: Total number of entries with Hive in ORC format; it takes around 0.29 seconds, which is larger than Presto's

Now it's time to have a look at Hive with the Parquet format.

Figure: Presto takes almost 6 to 9 seconds on a single node


Figure: Hive takes only 0.51 seconds with 3 worker nodes.

Biketrips data analysis

Here is another data set, biketrips, which I have loaded for comparison. You can easily download the data from the internet.

ORC format for both presto and hive

PARQUET format for both presto and hive

Uber data analysis

select * from trips_orc where trip_id is not NULL limit 10
 
trips_orc.trip_id	trips_orc.vendor_id	trips_orc.pickup_datetime	trips_orc.dropoff_datetime	trips_orc.store_and_fwd_flag	trips_orc.rate_code_id	trips_orc.pickup_longitude	trips_orc.pickup_latitude	trips_orc.dropoff_longitude	trips_orc.dropoff_latitude	trips_orc.passenger_count	trips_orc.trip_distance	trips_orc.fare_amount	trips_orc.extra	trips_orc.mta_tax	trips_orc.tip_amount	trips_orc.tolls_amount	trips_orc.ehail_fee	trips_orc.improvement_surcharge	trips_orc.total_amount	trips_orc.payment_type	trips_orc.trip_type	trips_orc.pickup	trips_orc.dropoff	trips_orc.cab_type	trips_orc.precipitation	trips_orc.snow_depth	trips_orc.snowfall	trips_orc.max_temperature	trips_orc.min_temperature	trips_orc.average_wind_speed	trips_orc.pickup_nyct2010_gid	trips_orc.pickup_ctlabel	trips_orc.pickup_borocode	trips_orc.pickup_boroname	trips_orc.pickup_ct2010	trips_orc.pickup_boroct2010	trips_orc.pickup_cdeligibil	trips_orc.pickup_ntacode	trips_orc.pickup_ntaname	trips_orc.pickup_puma	trips_orc.dropoff_nyct2010_gid	trips_orc.dropoff_ctlabel	trips_orc.dropoff_borocode	trips_orc.dropoff_boroname	trips_orc.dropoff_ct2010	trips_orc.dropoff_boroct2010	trips_orc.dropoff_cdeligibil	trips_orc.dropoff_ntacode	trips_orc.dropoff_ntaname	trips_orc.dropoff_puma
0	2	201	2016-01-01 00:39:36.0	NULL	1	-73	40.68061065673828	-73.92427825927734	40.69804382324219	1	1	8	0.5	0.5	1.86	0	NULL	0.3	11.16	1	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
1	2	201	2016-01-01 00:39:18.0	NULL	1	-73	40.72317504882813	-73.92391967773438	40.76137924194336	1	3	15.5	0.5	0.5	0	0	NULL	0.3	16.8	2	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
2	2	201	2016-01-01 00:39:48.0	NULL	1	-73	40.67610549926758	-74.0131607055664	40.64607238769531	1	3	16.5	0.5	0.5	4.45	0	NULL	0.3	22.25	1	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
3	2	201	2016-01-01 00:38:32.0	NULL	1	-73	40.66957855224609	-74.00064849853516	40.68903350830078	1	3	13.5	0.5	0.5	0	0	NULL	0.3	14.8	2	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
4	2	201	2016-01-01 00:39:22.0	NULL	1	-73	40.68285369873047	-73.94071960449219	40.66301345825195	1	2	12	0.5	0.5	0	0	NULL	0.3	13.3	2	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
5	2	201	2016-01-01 00:39:35.0	NULL	1	-73	40.74645614624023	-73.86774444580078	40.74211120605469	1	1	7	0.5	0.5	0	0	NULL	0.3	8.3	2	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
6	2	201	2016-01-01 00:39:21.0	NULL	1	-73	40.74619674682617	-73.88619232177734	40.74568939208984	1	0	5	0.5	0.5	0	0	NULL	0.3	6.3	2	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
7	2	201	2016-01-01 00:39:36.0	NULL	1	-73	40.80355834960938	-73.94915008544922	40.79412078857422	1	1	7	0.5	0.5	0	0	NULL	0.3	8.3	2	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
8	2	201	2016-01-01 00:39:52.0	NULL	1	-73	40.70281600952148	-73.97157287597656	40.67972564697266	1	2	12	0.5	0.5	2	0	NULL	0.3	15.3	1	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
9	2	201	2016-01-01 00:39:23.0	NULL	1	-73	40.75664138793945	-73.91754913330078	40.73965835571289	1	1	9	0.5	0.5	1.6	0	NULL	0.3	11.9	1	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL

Here is another data set: 2016 Uber trips for New York City. I have used only 2 to 3 months of data for the analysis.

Next time I will give you a more detailed look at Presto and Hive. Today I focused on the single node Presto cluster, but next time I will present the multi node Presto cluster experiment for your convenience.

How to automate your PySpark word count program using Luigi?

Suppose you have a word count job wrapper, PySparkWordCount.py, and a Python driver file, wordcount.py, both of which are given below.

And here is my client.cfg file for the Luigi configuration:

[spark]
spark-submit: /root/spark-1.6.1-bin-hadoop2.6/bin/spark-submit
master: spark://namenode.selise.ch:7077

PySparkWordCount.py

import luigi
from luigi.s3 import S3Target
from luigi.contrib.spark import SparkSubmitTask, PySparkTask
import luigi.contrib.hadoop
import luigi.contrib.hdfs

class PySparkWordCount(SparkSubmitTask):
    """
    This task uses an external Python driver file specified in :py:meth:`app`.
    It runs a :py:class:`luigi.contrib.spark.SparkSubmitTask` task
    over the target data in :py:meth:`wordcount.input` (a file in HDFS) and
    writes the result into its :py:meth:`wordcount.output` target (a file in HDFS).
    This class uses :py:meth:`luigi.contrib.spark.SparkSubmitTask.run`.
    Example luigi configuration::
        [spark]
        spark-submit: /usr/local/spark/bin/spark-submit
        master: spark://spark.example.org:7077
        deploy-mode: client
    """
    driver_memory = '2g'
    executor_memory = '3g'
    total_executor_cores = luigi.IntParameter(default=100, significant=False)

    name = "PySpark Word Count"
    app = 'wordcount.py'

    def app_options(self):
        # These are passed to the Spark main args in the defined order.
        return [self.input().path, self.output().path]

    def input(self):
        return luigi.contrib.hdfs.HdfsTarget("hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.input")

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.output')

if __name__ == '__main__':
    luigi.run()

wordcount.py

import sys
from pyspark import SparkContext

if __name__ == "__main__":
    # argv[1] is the input path and argv[2] is the output path
    # (HDFS paths when launched through the Luigi task above).
    sc = SparkContext()
    sc.textFile(sys.argv[1]) \
      .flatMap(lambda line: line.split()) \
      .map(lambda word: (word, 1)) \
      .reduceByKey(lambda a, b: a + b) \
      .saveAsTextFile(sys.argv[2])
    sc.stop()
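
If you want to sanity check wordcount.py on its own before wiring it into Luigi, a minimal local run (no cluster involved) could look like the sketch below; sample.txt and the "local[2]" master are just hypothetical choices of mine.

# local_wordcount_check.py - standalone check of the same word count logic.
# Assumes pyspark is importable and a small text file named sample.txt exists.
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext("local[2]", "wordcount-local-check")
    counts = (sc.textFile("sample.txt")
                .flatMap(lambda line: line.split())
                .map(lambda word: (word, 1))
                .reduceByKey(lambda a, b: a + b))
    print(counts.take(5))   # peek at a few (word, count) pairs
    sc.stop()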

Now open your terminal and run the following command to kick off the automation:

[root@namenode wordcount]# time python PySparkWordCount.py PySparkWordCount --local-scheduler
No handlers could be found for logger "luigi-interface"
DEBUG: Checking if PySparkWordCount() is complete
/root/anaconda2/lib/python2.7/site-packages/luigi/parameter.py:259: UserWarning: Parameter None is not of type string.
  warnings.warn("Parameter {0} is not of type string.".format(str(x)))
DEBUG: Running file existence check: hadoop fs -stat hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.output
INFO: Informed scheduler that task   PySparkWordCount__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 51836] Worker Worker(salt=818063944, workers=1, host=namenode.selise.ch, username=root, pid=51836) running   PySparkWordCount()
INFO: Running command: /root/spark-1.6.1-bin-hadoop2.6/bin/spark-submit --master spark://namenode.selise.ch:7077 --name PySpark Word Count --driver-memory 2g --executor-memory 3g --total-executor-cores 100 wordcount.py hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.input hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.output
INFO: [pid 51836] Worker Worker(salt=818063944, workers=1, host=namenode.selise.ch, username=root, pid=51836) done      PySparkWordCount()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   PySparkWordCount__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=818063944, workers=1, host=namenode.selise.ch, username=root, pid=51836) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 PySparkWordCount(total_executor_cores=100)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====


real    0m17.074s
user    0m14.077s
sys     0m5.391s