MRJob using Amazon EMR (2 Small slave instances and 1 master node)

Today I will show you how to run a very basic map reduce word count job using AWS EMR. I have used EMR 4.8.2 and Hadoop 2.7.3. Most of the time I actually use the Hortonworks distribution for my work, but today I will show you Amazon EMR.

First of all you need to create security credentials from your Amazon account. If you don't have an Amazon account, don't hesitate: you can create one with a credit card, and you don't need to pay for the first year (though there are some usage limits).

To run the map reduce job on Amazon you need to install Python's pip, and then use pip to install awscli and mrjob as shown below.

pip install awscli
pip install mrjob

Now it's time to create the Amazon security credentials from my Amazon account and include them in my mrjob.conf file. Just open a text editor and paste in the lines below:

runners:
  emr:
    aws_access_key_id: xxxxxxxxxxxxxxxxxxxx
    aws_secret_access_key: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
    num_core_instances: 2
    core_instance_type: m1.small
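
Before launching anything on EMR it is worth checking that the keys actually work. Here is a quick sanity check of my own (not part of the original setup; it assumes you have boto3 installed, e.g. via pip install boto3, and that you paste your real keys in place of the x's):

import boto3

# should print your AWS account id and ARN if the keys are valid
sts = boto3.client(
    'sts',
    aws_access_key_id='xxxxxxxxxxxxxxxxxxxx',
    aws_secret_access_key='XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX')
print(sts.get_caller_identity())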

Now your configuration is almost done. Let's start writing the word count map reduce job in Python. My program, which I downloaded from the Amazon website, is given below.


from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):
    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    MRWordFrequencyCount.run()
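
If you want a quick local smoke test before touching EMR, you can temporarily swap the __main__ block above for a programmatic run with mrjob's inline runner. This is my own addition, not from the original post, and it assumes mrjob 0.6+ (for parse_output and cat_output):

# alternative __main__ block: run the job locally with the inline runner
if __name__ == '__main__':
    job = MRWordFrequencyCount(args=['-r', 'inline', 'README.rst'])
    with job.make_runner() as runner:
        runner.run()
        for key, value in job.parse_output(runner.cat_output()):
            print("%s\t%s" % (key, value))   # chars, words and lines counts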

Now let's save the code in a file named mr_word_count.py and try to run the program from your terminal.

The command I used is given below.

python mr_word_count.py -r emr README.rst --output-dir=s3://my-mr-jobs/wc_out2/ --conf-path=mrjob.conf 

Let's see what happens in your console. My console output is given below.


python mr_word_count.py -r emr README.rst --output-dir=s3://my-mr-jobs/wc_out2/ --conf-path=mrjob.conf
Using s3://mrjob-be5ff8dcbdecb973/tmp/ as our temp dir on S3
Creating temp directory /var/folders/mh/qvz8smz53njctvsp0r8_xt040000gn/T/mr_word_count.mohiulalamprince.20170813.193712.456524
Copying local files to s3://mrjob-be5ff8dcbdecb973/tmp/mr_word_count.mohiulalamprince.20170813.193712.456524/files/...
Created new cluster j-1CBMSHE0GMZHN
Waiting for step 1 of 1 (s-2E4DYAQOEV00C) to complete...
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING: Configuring cluster software)
  PENDING (cluster is STARTING: Configuring cluster software)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
Terminating cluster: j-1CBMSHE0GMZHN

Now it's time to look at the Amazon console and see what is happening over there. Here is my console snapshot, given below.

EMR Cluster list

EMR Cluster configuration

aws s3 ls s3://my-mr-jobs/wc_out2/
2017-08-14 01:49:20         12 part-00000

S3 output location

EMR Cluster list

aws emr list-clusters 
{
    "Clusters": [
        {
            "Status": {
                "Timeline": {
                    "ReadyDateTime": 1502653570.991, 
                    "CreationDateTime": 1502653079.879, 
                    "EndDateTime": 1502653805.849
                }, 
                "State": "TERMINATED", 
                "StateChangeReason": {
                    "Message": "Terminated by user request", 
                    "Code": "USER_REQUEST"
                }
            }, 
            "NormalizedInstanceHours": 4, 
            "Id": "j-1CBMSHE0GMZHN", 
            "Name": "mr_word_count.mohiulalamprince.20170813.193712.456524"
        }, 
        {
            "Status": {
                "Timeline": {
                    "ReadyDateTime": 1502650935.456, 
                    "CreationDateTime": 1502650470.064, 
                    "EndDateTime": 1502651081.962
                }, 
                "State": "TERMINATED", 
                "StateChangeReason": {
                    "Message": "Terminated by user request", 
                    "Code": "USER_REQUEST"
                }
            }, 
            "NormalizedInstanceHours": 2, 
            "Id": "j-1VLKU9ZZINT6L", 
            "Name": "mr_word_count.mohiulalamprince.20170813.185301.918036"
        }
    ]
}

The job is complete and the cluster has been terminated

Output result

Now let's give this code snippet a try:

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # optimization: sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        yield max(word_count_pairs)


if __name__ == '__main__':
    MRMostUsedWord.run()
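
To sanity check the two-step pipeline without any cluster, you can feed it a line of text through mrjob's sandbox helper. Again this is my own snippet, not from the original post; it assumes mrjob 0.6+ and a small separate test script (the import name below is hypothetical):

from io import BytesIO

from mr_most_used_word import MRMostUsedWord  # hypothetical file name

job = MRMostUsedWord(args=['-r', 'inline', '--no-conf', '-'])
job.sandbox(stdin=BytesIO(b"the quick brown fox jumps over the lazy dog the end\n"))
with job.make_runner() as runner:
    runner.run()
    for count, word in job.parse_output(runner.cat_output()):
        print("%s\t%s" % (count, word))   # expect: 3  "the"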

Now it's time to run a map reduce program on a Hortonworks cluster. It's a 3 node cluster: my master node has 20 GB of memory and the 2 slave nodes have 16 GB each. Let's see how to process the MovieLens data set with a map reduce program. I downloaded the MovieLens dataset from grouplens.org; I took the largest one, which has around 26 million ratings. Let's see how we can find the average rating per movie.

Grouplens 26 Million dataset analysis using map reduce

Download the data from grouplens.org

wget http://files.grouplens.org/datasets/movielens/ml-latest.zip

Unzip the file

unzip ml-latest.zip && cd ml-latest

Now it's time to put your ratings.csv and movies.csv files into HDFS.


hadoop dfs -mkdir /user/root/movielen
hadoop dfs -put ratings.csv /user/root/movielen/
hadoop dfs -put movies.csv /user/root/movielen/

Now let's look at the average rating code.

from mrjob.job import MRJob


class MRAverageRatingPerMovie(MRJob):

    def mapper(self, _, line):
        # ratings.csv rows look like: userId,movieId,rating,timestamp
        try:
            data = line.split(",")
            movieid = int(data[1])
            rating = float(data[2])
        except (ValueError, IndexError):
            # the CSV header (and any malformed row) ends up under key 0
            yield 0, 0.0
            return
        yield movieid, rating

    def reducer(self, movieid, ratings):
        counter = 0
        total_ratings = 0.0
        for rating in ratings:
            total_ratings += rating
            counter += 1
        yield movieid, total_ratings / counter


if __name__ == '__main__':
    MRAverageRatingPerMovie.run()
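
One thing worth calling out: you cannot simply reuse this reducer as a combiner, because an average of averages is not the overall average. If you want the combiner optimization, emit partial (sum, count) pairs instead. A sketch of that variant (my own addition, not from the original post):

from mrjob.job import MRJob


class MRAverageRatingWithCombiner(MRJob):

    def mapper(self, _, line):
        try:
            data = line.split(",")
            yield int(data[1]), (float(data[2]), 1)
        except (ValueError, IndexError):
            pass  # skip the header and malformed rows

    def combiner(self, movieid, pairs):
        # pre-aggregate (sum, count) locally so less data crosses the network
        total, count = 0.0, 0
        for rating_sum, n in pairs:
            total += rating_sum
            count += n
        yield movieid, (total, count)

    def reducer(self, movieid, pairs):
        total, count = 0.0, 0
        for rating_sum, n in pairs:
            total += rating_sum
            count += n
        yield movieid, total / count


if __name__ == '__main__':
    MRAverageRatingWithCombiner.run()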

How to run your map reduce program using mrjob on top of the Hortonworks distribution

Don't be afraid, it's a very basic approach. I have pasted the command below.


[root@namenode ml-latest]# python AverageRatingCountPerMovie.py \
>     -r hadoop \
>     --hadoop-streaming-jar /usr/hdp/current/hadoop-client/hadoop-streaming.jar \
>     --output hdfs://datanode1.selise.ch/user/root/movielen/output5 \
>     hdfs:///user/root/movielen/ratings.csv > log

and here is my output log once I press Enter

No configs found; falling back on auto-configuration
Looking for hadoop binary in $PATH...
Found hadoop binary: /usr/bin/hadoop
Using Hadoop version 2.7.1.2.4.2.0
Creating temp directory /tmp/word_count.root.20170814.122244.762965
Copying local files to hdfs:///user/root/tmp/mrjob/word_count.root.20170814.122244.762965/files/...
Running step 1 of 1...
  WARNING: Use "yarn jar" to launch YARN applications.
  packageJobJar: [] [/usr/hdp/2.4.2.0-258/hadoop/hadoop-streaming.jar] /tmp/streamjob211933095411666257.jar tmpDir=null
  Timeline service address: http://datanode2.selise.ch:8188/ws/v1/timeline/
  Timeline service address: http://datanode2.selise.ch:8188/ws/v1/timeline/
  Total input paths to process : 1
  number of splits:6
  Submitting tokens for job: job_1502454305190_0007
  Submitted application application_1502454305190_0007
  The url to track the job: http://datanode2.selise.ch:8088/proxy/application_1502454305190_0007/
  Running job: job_1502454305190_0007
  Job job_1502454305190_0007 running in uber mode : false
   map 0% reduce 0%
   map 5% reduce 0%
   map 6% reduce 0%
   map 9% reduce 0%
   map 13% reduce 0%
   map 17% reduce 0%
   map 18% reduce 0%
   map 20% reduce 0%
   map 26% reduce 0%
   map 27% reduce 0%
   map 28% reduce 0%
   map 30% reduce 0%
   map 31% reduce 6%
   map 32% reduce 6%
   map 34% reduce 6%
   map 35% reduce 6%
   map 36% reduce 6%
   map 38% reduce 6%
   map 39% reduce 6%
   map 41% reduce 6%
   map 42% reduce 6%
   map 43% reduce 6%
   map 44% reduce 6%
   map 45% reduce 6%
   map 46% reduce 6%
   map 47% reduce 6%
   map 49% reduce 6%
   map 51% reduce 6%
   map 53% reduce 6%
   map 54% reduce 6%
   map 56% reduce 6%
   map 57% reduce 6%
   map 62% reduce 6%
   map 63% reduce 6%
   map 64% reduce 6%
   map 65% reduce 11%
   map 66% reduce 11%
   map 67% reduce 11%
   map 68% reduce 11%
   map 69% reduce 11%
   map 70% reduce 11%
   map 71% reduce 11%
   map 72% reduce 11%
   map 73% reduce 11%
   map 74% reduce 11%
   map 75% reduce 11%
   map 76% reduce 11%
   map 77% reduce 11%
   map 89% reduce 11%
   map 94% reduce 11%
   map 100% reduce 11%
   map 100% reduce 17%
   map 100% reduce 22%
   map 100% reduce 28%
   map 100% reduce 45%
   map 100% reduce 59%
   map 100% reduce 67%
   map 100% reduce 68%
   map 100% reduce 69%
   map 100% reduce 70%
   map 100% reduce 71%
   map 100% reduce 72%
   map 100% reduce 73%
   map 100% reduce 74%
   map 100% reduce 75%
   map 100% reduce 76%
   map 100% reduce 77%
   map 100% reduce 78%
   map 100% reduce 79%
   map 100% reduce 80%
   map 100% reduce 81%
   map 100% reduce 82%
   map 100% reduce 83%
   map 100% reduce 84%
   map 100% reduce 85%
   map 100% reduce 86%
   map 100% reduce 87%
   map 100% reduce 88%
   map 100% reduce 89%
   map 100% reduce 90%
   map 100% reduce 91%
   map 100% reduce 92%
   map 100% reduce 93%
   map 100% reduce 94%
   map 100% reduce 95%
   map 100% reduce 96%
   map 100% reduce 97%
   map 100% reduce 98%
   map 100% reduce 99%
   map 100% reduce 100%
  Job job_1502454305190_0007 completed successfully
  Output directory: hdfs://datanode1.selise.ch/user/root/movielen/output5
Counters: 49
        File Input Format Counters
                Bytes Read=710205687
        File Output Format Counters
                Bytes Written=808301
        File System Counters
                FILE: Number of bytes read=284899443
                FILE: Number of bytes written=570779561
                FILE: Number of large read operations=0
                FILE: Number of read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=710206371
                HDFS: Number of bytes written=808301
                HDFS: Number of large read operations=0
                HDFS: Number of read operations=21
                HDFS: Number of write operations=2
        Job Counters
                Data-local map tasks=6
                Launched map tasks=6
                Launched reduce tasks=1
                Total megabyte-seconds taken by all map tasks=972241920
                Total megabyte-seconds taken by all reduce tasks=533227520
                Total time spent by all map tasks (ms)=632970
                Total time spent by all maps in occupied slots (ms)=1265940
                Total time spent by all reduce tasks (ms)=260365
                Total time spent by all reduces in occupied slots (ms)=520730
                Total vcore-seconds taken by all map tasks=632970
                Total vcore-seconds taken by all reduce tasks=260365
        Map-Reduce Framework
                CPU time spent (ms)=727900
                Combine input records=0
                Combine output records=0
                Failed Shuffles=0
                GC time elapsed (ms)=5188
                Input split bytes=684
                Map input records=26024290
                Map output bytes=232850855
                Map output materialized bytes=284899473
                Map output records=26024291
                Merged Map outputs=6
                Physical memory (bytes) snapshot=7586971648
                Reduce input groups=45116
                Reduce input records=26024291
                Reduce output records=45116
                Reduce shuffle bytes=284899473
                Shuffled Maps =6
                Spilled Records=52048582
                Total committed heap usage (bytes)=7692353536
                Virtual memory (bytes) snapshot=23136010240
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
Streaming final output from hdfs://datanode1.selise.ch/user/root/movielen/output5...
Removing HDFS temp directory hdfs:///user/root/tmp/mrjob/word_count.root.20170814.122244.762965...
Removing temp directory /tmp/word_count.root.20170814.122244.762965...

Here is the standard output

[root@namenode ml-latest]# head -n 50 log
0       0.0
1       3.8881574960610834
10      3.431840536054589
100     3.232303877366997
1000    3.0990783410138247
100001  4.0
100003  3.6666666666666665
100006  3.3333333333333335
100008  3.7142857142857144
100010  2.522727272727273
100013  3.4444444444444446
100015  2.5
100017  3.1
100032  3.6
100034  3.28125
100036  3.388888888888889
100038  3.4545454545454546
100040  3.0
100042  3.3333333333333335
100044  4.271573604060913
100046  3.769230769230769
100048  3.1666666666666665
100050  3.5
100052  2.0454545454545454
100054  3.75
100056  2.55
100058  3.2045454545454546
100060  3.576923076923077
100062  3.716666666666667
100068  3.6265060240963853
100070  3.5416666666666665
100072  2.875
100075  2.8157894736842106
100077  2.3333333333333335
100079  1.5
100081  3.7
100083  2.3055555555555554
100085  3.5
100087  2.8823529411764706
100089  3.6707317073170733
100091  3.0
100093  2.889830508474576
100096  3.125
100099  4.1
1001    2.8255813953488373
100101  4.0
100103  0.5
100106  3.7413793103448274
100108  3.16815144766147
100131  1.5

Geographical nearest neighbor using bigdata technologies

This is one of the more complex problems on my blog. Yes, it's a little bit complex when you are dealing with big data for this kind of problem. The task is basically to find the nearest object for a specific point, where the number of objects and the number of points can both be large. So you have to handle this with big data tools, GIS, SQL, and some algorithmic knowledge.

First of all I need to explain what an R-tree is, what purposes we use it for, and what kind of problems it can solve.

The R-tree was invented by Antonin Guttman in 1984, which is also my birth year :-D. So I am going to explain this tree right now.

R-trees are tree data structures used for spatial access methods, i.e., for indexing multi-dimensional information such as geographical coordinates, rectangles or polygons. A common real-world usage for an R-tree might be to store spatial objects such as restaurant locations or the polygons that typical maps are made of: streets, buildings, outlines of lakes, coastlines, etc. and then find answers quickly to queries such as “Find all museums within 2 km of my current location”, “retrieve all road segments within 2 km of my location” (to display them in a navigation system) or “find the nearest gas station” (although not taking roads into account). The R-tree can also accelerate nearest neighbor search for various distance metrics, including great-circle distance – wikipedia
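
Before wiring rtree into Spark, it helps to see the library on its own. A tiny standalone sketch (my addition; the coordinates are made up, and it only assumes pip install rtree):

from rtree import index

idx = index.Index()
# insert a few bounding boxes: insert(id, (minx, miny, maxx, maxy))
idx.insert(0, (8.51, 47.36, 8.53, 47.38))   # road A
idx.insert(1, (8.54, 47.36, 8.56, 47.37))   # road B

# nearest() takes a query box; a point is the degenerate box (x, y, x, y)
print(list(idx.nearest((8.52, 47.37, 8.52, 47.37), 1)))   # -> [0], i.e. road A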

Here is a sample code snippet for my big data geospatial nearest neighbor search. What have I used here? First of all the Python libraries shapely and rtree, the Anaconda package manager, and a few big data tools: a Spark cluster and the Hadoop HDFS file system running over a 3 node cluster. I used 1 driver node and 3 slave nodes for my Spark and Hadoop cluster. Running the script on the Hadoop cluster with Spark was not an easy task; I had to handle a lot of things to make the cluster run properly. I used the Hortonworks Hadoop distribution for my big data platform.

from shutil import rmtree
from tempfile import mkdtemp

from pyspark import SparkFiles
from pyspark.sql.types import StringType
from rtree import index
from shapely.geometry import asPoint
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

from  shapely import speedups
import os

sc = SparkContext()

def build_nearest_road(linestrings):
        def nearest_road(x, y):
                from shapely.geometry import asPoint
                from rtree import index
                from  shapely import speedups
                idx = index.Index(SparkFiles.get('index.idx')[:-4])
                try:
                        min_name = "NOT FOUND"
                        min_distance = 2000000000.0
                        min_linestring = "NOT FOUND"
                        for fid in idx.nearest((float(x), float(y), float(x), float(y)), 20):
                                speedups.enable()
                                point = asPoint([float(x), float(y)])
                                (name, linestring) = road_broadcast.value[fid]


                                from shapely.geometry import LineString
                                linestring = LineString(linestring)
                                distance = point.distance(linestring)
                                if min_distance > distance:
                                        min_distance = distance
                                        min_name = name
                                        #min_linestring = linestring
                        return min_name
                except:
                        import sys
                        print str(sys.exc_info()[1])
                        return "NOT FOUND"
                        #if point.intersects(polygon):
                        #       return name
                return "NOT FOUND"

        from shapely.geometry import Point, Polygon, LineString, mapping, shape
        from shapely.wkt import loads
        linestrings_list = []
        for (name, linestring_wkt) in linestrings:
                linestring = loads(linestring_wkt)
                #linestring = shape(mapping(linestring))
                linestring = LineString(linestring.coords)
                linestrings_list.append((name, linestring))

        tempdir = mkdtemp()
        basename = os.path.join(tempdir, 'index')

        #try:
        idx = index.Index(basename, ((fid, linestring.bounds, None)
                for (fid, (name, linestring)) in enumerate(linestrings_list)))
        idx.close()
        sc.addFile(basename + '.idx')
        sc.addFile(basename + '.dat')
        #finally:
        #       rmtree(tempdir)

        road_broadcast = sc.broadcast(linestrings_list)
        return nearest_road

roads = sc.textFile("hdfs://datanode1.selise.ch/user/admin/geodjango/roads/part-m-00[0-9]*")
parts_list = roads.map(lambda l: l.split("LINESTRING"))
roads = parts_list.map(lambda parts: (parts[0], "LINESTRING" + parts[1]))
roads = roads.toLocalIterator()

sqlContext = SQLContext(sc)

zurich_waypoints = sc.textFile("hdfs://datanode1.selise.ch/user/admin/waypoints_in_zurich_2w/part-m-00[0-9]*")
zurich_waypoints = zurich_waypoints.map(lambda l: l.split(","))
zurich_waypoints = zurich_waypoints.map(lambda parts: Row(longitude=parts[3], latitude=parts[4], tripid=parts[0], CaptureDateHour=parts[1]))
zurich_waypoints = sqlContext.createDataFrame(zurich_waypoints)
zurich_waypoints.registerTempTable("zurich_waypoints")

sqlContext.registerFunction('getNearestRoad', build_nearest_road(roads))

df = sqlContext.sql("select getNearestRoad(zw.latitude, zw.longitude), zw.* from zurich_waypoints zw")
df.write.format('com.databricks.spark.csv').mode('overwrite').save('/home/admin/pyspark/zurich-road-waypoints-yarn-cluster.csv')

Now let's discuss the code. Here you can see that I have used a Python nested function for the Spark SQL context. I build my road network into an rtree and expose it as a Spark UDF, which runs as a distributed geo rtree over the 3 Hadoop nodes. That means you can query road info from the rtree through the Spark SQL context, and the search functionality works across multiple machines. I have used a threshold value in the rtree to handle overlapping/collision in my road network, because I found that the rtree built from the road network does not always return the actual nearest road for a specific point. Here is a picture that might help you understand the idea.

You can see that for the red point, the rtree built from the road network returns road R2, although the actual nearest neighbor is R1. So I considered a threshold value to overcome this problem.

Here is a small sample of data for testing the script

Nearly 10 million records for testing

All data nodes are running.

CPU usage at 100%

You can see that datanode1.selise.ch, datanode2.selise.ch, namenode.selise.ch are running

All nodes are running

This article is not complete yet. I will still explain:

Why Hive on top of Hadoop spatial framework is not good for this kind of analysis

Why postgis is not suitable for this kind of analysis

Continued …

1.7 billion weather data analysis, covering around 28 years of data for two different places

Hello guys, today I am going to give you some idea of how I handled 1.7 billion weather records, which cover around 28 years of data. Our main target was to generate some analytics from this weather data. We handled only temperature and humidity data, which comes to around 22 GB.

Now, one of the main challenges was how to download the data from the client's server. The client's URL scheme was good enough to download from, and I had to do it with some basic shell commands. At first I considered downloading the data with xargs, but I changed my mind, because I was in no hurry to download and my local PC's bandwidth was not high enough to benefit from xargs.

You could use xargs to download the data from a specific location

downloader.sh

d=1990-01-01
while [ "$d" != 2017-04-05 ]; do 
  echo  https://xxxxxxxx.com/"$d"T00:00:00ZXXXXXX/xxxxxxxxxx/xxxxxx.xxxxxxxxx/csv
  d=$(date -I -d "$d + 1 day")
done
[root@namenode.selise.ch ~]# chmod +x downloader.sh
[root@namenode.selise.ch ~]# ./downloader.sh > urls.txt
[root@namenode.selise.ch ~]# cat urls.txt | xargs -n 1 -P 6 wget -P data/

but what I actually used for downloading is given below

downloader.sh

d=1990-01-01
while [ "$d" != 2017-04-05 ]; do 
  echo  wget https://xxxxxxxx.com/"$d"T00:00:00ZXXXXXX/xxxxxxxxxx/xxxxxx.xxxxxxxxx/csv
  d=$(date -I -d "$d + 1 day")
done

chmod +x downloader.sh
./downloader.sh > urls.txt
chmod +x urls.txt
./urls.txt

It took almost 48 hours on my low bandwidth connection.
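
If you prefer to stay in Python instead of shell, the same date loop is easy to write. This is just a sketch of an alternative, not what I actually used, and the URL is masked exactly as above:

from datetime import date, timedelta

try:                                   # Python 2
    from urllib import urlretrieve
except ImportError:                    # Python 3
    from urllib.request import urlretrieve

d, end = date(1990, 1, 1), date(2017, 4, 5)
while d < end:
    url = "https://xxxxxxxx.com/%sT00:00:00ZXXXXXX/xxxxxxxxxx/xxxxxx.xxxxxxxxx/csv" % d.isoformat()
    urlretrieve(url, "data/%s.csv" % d.isoformat())   # assumes the data/ directory exists
    d += timedelta(days=1)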

Now I am ready to store the data in hdfs for analysis

hadoop fs -put *.csv /user/root/weather

Storing it is also time consuming.
Let's see how many records there are:


hive> select count(*) from WeatherTemperatureHumidity_orc_update;
OK
1719639510
Time taken: 1.792 seconds, Fetched: 1 row(s)

Now let's create the external Hive table on top of the HDFS data, plus the ORC tables we will actually query.

CREATE EXTERNAL TABLE IF NOT EXISTS WeatherTemperatureHumidity(
        validdate string,
        temperature float,
        humidity float
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ';'
    STORED AS TEXTFILE
    location '/user/root/weather';
CREATE TABLE IF NOT EXISTS WeatherTemperatureHumidity_orc(
        validdate string,
        temperature double,
        humidity double,
        suburb string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ';'
    STORED AS ORC;
    
INSERT OVERWRITE TABLE WeatherTemperatureHumidity_orc SELECT w.*, "xxxx" FROM WeatherTemperatureHumidity w;
CREATE TABLE IF NOT EXISTS WeatherTemperatureHumidity_orc_update (
        `year` string,
        `month` string,
        `date` string,
        validdate string,
        temperature double,
        humidity double,
        suburb string)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY ';'
    STORED AS ORC;
    
drop table WeatherTemperatureHumidity_orc_update;

INSERT OVERWRITE TABLE WeatherTemperatureHumidity_orc_update select year(to_date(w.validdate)), month(to_date(w.validdate)), to_date(w.validdate), w.validdate, w.temperature, w.humidity, w.suburb from WeatherTemperatureHumidity_orc w where w.validdate is not NULL;

Now you are ready to start your analytics from the Hive table. Here I have tried to find the minimum and maximum temperature per year from the weather Hive table.

hive> select `year`, min(temperature), max(temperature) from WeatherTemperatureHumidity_orc_update where suburb='XXXX' group by `year` order by `year` desc;
Query ID = root_20170406182432_92f15cf5-5a23-4250-8739-288676e0589d
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1491480905379_0001)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     38         38        0        0       0       0
Reducer 2 ......   SUCCEEDED    253        253        0        0       0       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 48.97 s
--------------------------------------------------------------------------------
OK
2017    -12.399999618530273     20.100000381469727
2016    -6.900000095367432      31.200000762939453
2015    -9.699999809265137      33.099998474121094
2014    -15.800000190734863     30.700000762939453
2013    -13.199999809265137     30.5
2012    -19.299999237060547     30.700000762939453
2011    -12.199999809265137     30.0
2010    -15.199999809265137     29.399999618530273
2009    -16.5   28.5
2008    -12.800000190734863     27.700000762939453
2007    -14.199999809265137     29.600000381469727
2006    -11.0   29.200000762939453
2005    -15.600000381469727     28.5
2004    -10.699999809265137     26.5
2003    -15.0   31.700000762939453
2002    -13.300000190734863     29.799999237060547
2001    -16.899999618530273     30.100000381469727
2000    -16.899999618530273     28.700000762939453
1999    -15.0   28.0
1998    -15.0   29.600000381469727
1997    -11.399999618530273     26.799999237060547
1996    -16.799999237060547     27.100000381469727
1995    -19.399999618530273     28.799999237060547
1994    -13.699999809265137     29.700000762939453
1993    -18.600000381469727     28.700000762939453
1992    -18.100000381469727     29.799999237060547
1991    -18.799999237060547     30.600000381469727
1990    -11.699999809265137     29.100000381469727
NULL    NULL    NULL
Time taken: 53.314 seconds, Fetched: 29 row(s)

Now let's look at another suburb.

hive> select `year`, min(temperature), max(temperature) from WeatherTemperatureHumidity_orc_update where suburb='XXXXXXX' group by `year` order by `year` desc;
Query ID = root_20170406182754_271dd6cb-b69e-4fd3-b4c0-3e70d284fa1d
Total jobs = 1
Launching Job 1 out of 1


Status: Running (Executing on YARN cluster with App id application_1491480905379_0001)

--------------------------------------------------------------------------------
        VERTICES      STATUS  TOTAL  COMPLETED  RUNNING  PENDING  FAILED  KILLED
--------------------------------------------------------------------------------
Map 1 ..........   SUCCEEDED     38         38        0        0       0       0
Reducer 2 ......   SUCCEEDED    253        253        0        0       0       0
Reducer 3 ......   SUCCEEDED      1          1        0        0       0       0
--------------------------------------------------------------------------------
VERTICES: 03/03  [==========================>>] 100%  ELAPSED TIME: 47.21 s
--------------------------------------------------------------------------------
OK
2017    -12.399999618530273     19.700000762939453
2016    -7.300000190734863      30.600000381469727
2015    -9.100000381469727      32.79999923706055
2014    -15.300000190734863     30.600000381469727
2013    -13.199999809265137     30.299999237060547
2012    -19.5   30.600000381469727
2011    -12.399999618530273     29.899999618530273
2010    -15.399999618530273     29.200000762939453
2009    -16.600000381469727     28.299999237060547
2008    -13.0   27.5
2007    -14.399999618530273     29.399999618530273
2006    -11.199999809265137     29.0
2005    -15.699999809265137     28.399999618530273
2004    -10.800000190734863     26.399999618530273
2003    -15.199999809265137     31.5
2002    -13.399999618530273     29.700000762939453
2001    -16.899999618530273     29.899999618530273
2000    -16.799999237060547     28.600000381469727
1999    -15.199999809265137     27.899999618530273
1998    -15.100000381469727     29.399999618530273
1997    -11.300000190734863     26.600000381469727
1996    -17.0   27.0
1995    -19.5   28.700000762939453
1994    -13.899999618530273     29.600000381469727
1993    -18.799999237060547     28.5
1992    -18.200000762939453     29.600000381469727
1991    -18.899999618530273     30.5
1990    -11.800000190734863     29.0
NULL    NULL    NULL
Time taken: 50.031 seconds, Fetched: 29 row(s)

It takes around 61 seconds to complete over 1.7 billion rows using the 3 node Hadoop cluster, where all nodes use Intel i5 CPUs with 16 GB of RAM and the name node has 20 GB.

Biketrips and parking data benchmark using hive and presto

Today I will give you some idea regarding Hive and Presto. Hive is a distributed data warehouse system where you can read, write and manage your data through SQL, and Presto is another SQL engine for interactive analytic queries against data. Presto can handle terabytes to petabytes of data. Currently Presto manages 300 petabytes of data at Facebook, where 1000 engineers run more than 30,000 queries over the Presto cluster.

Recently I set up a Presto cluster in my local environment, with 3 worker nodes and a coordinator, running Presto over Hive. I experimented with the Presto cluster over a 229 million record data set. It is basically parking spot statistical data from one of my small scale big data projects, where I had to handle and migrate the data from one server to another. But that was not the main issue; I really had to test the server performance once I learned that Apache Presto is a good choice for interactive analytic queries.

Here are some Hive and Presto benchmarks that I found during the migration from one server to another.


Figure: Here is my parking events table schema (text/Parquet) which was used during my migration


Figure: Year wise number of parking events with Presto over Hive in ORC format; it takes 2:01 for 1 worker node


Figure: Year wise parking events over Hive in ORC format; it takes only 53 seconds, which is a big difference compared to Presto because Hive runs on the 3 node cluster while Presto runs on a single node


Figure: Total number of entries with Presto over Hive in ORC format; it takes 0:03, which is very fast for 229M records


Figure: Total number of entries with Hive in ORC format; it takes around 0.29 seconds, which is a bit higher than Presto

Now it's time to have a look at Hive in Parquet format.

Figure: Presto takes almost 6 to 9 seconds on a single node


Figure: Now you can see that Hive takes only 0.51 seconds with 3 worker nodes.

Biketrips data analysis

Here is another data set: bike trips. I have loaded the data for comparison. You can easily download the data from the internet.

ORC format for both presto and hive

PARQUET format for both presto and hive

Uber data analysis

select * from trips_orc where trip_id is not NULL limit 10
 
trips_orc.trip_id	trips_orc.vendor_id	trips_orc.pickup_datetime	trips_orc.dropoff_datetime	trips_orc.store_and_fwd_flag	trips_orc.rate_code_id	trips_orc.pickup_longitude	trips_orc.pickup_latitude	trips_orc.dropoff_longitude	trips_orc.dropoff_latitude	trips_orc.passenger_count	trips_orc.trip_distance	trips_orc.fare_amount	trips_orc.extra	trips_orc.mta_tax	trips_orc.tip_amount	trips_orc.tolls_amount	trips_orc.ehail_fee	trips_orc.improvement_surcharge	trips_orc.total_amount	trips_orc.payment_type	trips_orc.trip_type	trips_orc.pickup	trips_orc.dropoff	trips_orc.cab_type	trips_orc.precipitation	trips_orc.snow_depth	trips_orc.snowfall	trips_orc.max_temperature	trips_orc.min_temperature	trips_orc.average_wind_speed	trips_orc.pickup_nyct2010_gid	trips_orc.pickup_ctlabel	trips_orc.pickup_borocode	trips_orc.pickup_boroname	trips_orc.pickup_ct2010	trips_orc.pickup_boroct2010	trips_orc.pickup_cdeligibil	trips_orc.pickup_ntacode	trips_orc.pickup_ntaname	trips_orc.pickup_puma	trips_orc.dropoff_nyct2010_gid	trips_orc.dropoff_ctlabel	trips_orc.dropoff_borocode	trips_orc.dropoff_boroname	trips_orc.dropoff_ct2010	trips_orc.dropoff_boroct2010	trips_orc.dropoff_cdeligibil	trips_orc.dropoff_ntacode	trips_orc.dropoff_ntaname	trips_orc.dropoff_puma
0	2	201	2016-01-01 00:39:36.0	NULL	1	-73	40.68061065673828	-73.92427825927734	40.69804382324219	1	1	8	0.5	0.5	1.86	0	NULL	0.3	11.16	1	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
1	2	201	2016-01-01 00:39:18.0	NULL	1	-73	40.72317504882813	-73.92391967773438	40.76137924194336	1	3	15.5	0.5	0.5	0	0	NULL	0.3	16.8	2	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
2	2	201	2016-01-01 00:39:48.0	NULL	1	-73	40.67610549926758	-74.0131607055664	40.64607238769531	1	3	16.5	0.5	0.5	4.45	0	NULL	0.3	22.25	1	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
3	2	201	2016-01-01 00:38:32.0	NULL	1	-73	40.66957855224609	-74.00064849853516	40.68903350830078	1	3	13.5	0.5	0.5	0	0	NULL	0.3	14.8	2	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
4	2	201	2016-01-01 00:39:22.0	NULL	1	-73	40.68285369873047	-73.94071960449219	40.66301345825195	1	2	12	0.5	0.5	0	0	NULL	0.3	13.3	2	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
5	2	201	2016-01-01 00:39:35.0	NULL	1	-73	40.74645614624023	-73.86774444580078	40.74211120605469	1	1	7	0.5	0.5	0	0	NULL	0.3	8.3	2	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
6	2	201	2016-01-01 00:39:21.0	NULL	1	-73	40.74619674682617	-73.88619232177734	40.74568939208984	1	0	5	0.5	0.5	0	0	NULL	0.3	6.3	2	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
7	2	201	2016-01-01 00:39:36.0	NULL	1	-73	40.80355834960938	-73.94915008544922	40.79412078857422	1	1	7	0.5	0.5	0	0	NULL	0.3	8.3	2	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
8	2	201	2016-01-01 00:39:52.0	NULL	1	-73	40.70281600952148	-73.97157287597656	40.67972564697266	1	2	12	0.5	0.5	2	0	NULL	0.3	15.3	1	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL
9	2	201	2016-01-01 00:39:23.0	NULL	1	-73	40.75664138793945	-73.91754913330078	40.73965835571289	1	1	9	0.5	0.5	1.6	0	NULL	0.3	11.9	1	1	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL	NULL

Here is another data set: 2016 Uber trips for New York City. I have used only about 2 to 3 months of data for the analysis.

Next time I will give you a more detailed idea of Presto and Hive. Today I focused on a single node Presto cluster, but next time I will introduce a multi node Presto cluster experiment for your convenience.
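
If you want to reproduce this kind of timing comparison from Python rather than the two CLIs, something along these lines works. This is only a sketch of my own: it assumes the pyhive package is installed, that the Presto coordinator and HiveServer2 listen on their default ports, and the hostname and table name are placeholders:

import time

from pyhive import hive, presto

QUERY = "SELECT count(*) FROM parking_events_orc"   # illustrative table name


def timed(cursor, query):
    start = time.time()
    cursor.execute(query)
    return cursor.fetchall(), time.time() - start


# placeholders: Presto coordinator on port 8080, HiveServer2 on port 10000
presto_cursor = presto.connect(host="namenode.selise.ch", port=8080).cursor()
hive_cursor = hive.connect(host="namenode.selise.ch", port=10000).cursor()

for name, cursor in [("presto", presto_cursor), ("hive", hive_cursor)]:
    rows, seconds = timed(cursor, QUERY)
    print("%s: %s rows counted in %.2f seconds" % (name, rows[0][0], seconds))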

How to automate your pyspark word count program using luigi?

Suppose you have a word count management program, PySparkWordCount.py, and a Python driver file, wordcount.py, both given below.

And here is my client.cfg file for the Luigi configuration

[spark]
spark-submit: /root/spark-1.6.1-bin-hadoop2.6/bin/spark-submit
master: spark://namenode.selise.ch:7077

PySparkWordCount.py

import luigi
from luigi.s3 import S3Target
from luigi.contrib.spark import SparkSubmitTask, PySparkTask
import luigi.contrib.hadoop
import luigi.contrib.hdfs

class PySparkWordCount(SparkSubmitTask):
    """
    This task runs a :py:class:`luigi.contrib.spark.SparkSubmitTask` task
    over the target data in :py:meth:`wordcount.input` (a file in HDFS) and
    writes the result into its :py:meth:`wordcount.output` target (a file in HDFS),
    using an external Python driver file specified in :py:meth:`app`.
    This class uses :py:meth:`luigi.contrib.spark.SparkSubmitTask.run`.
    Example luigi configuration::
        [spark]
        spark-submit: /usr/local/spark/bin/spark-submit
        master: spark://spark.example.org:7077
        deploy-mode: client
    """
    driver_memory = '2g'
    executor_memory = '3g'
    total_executor_cores = luigi.IntParameter(default=100, significant=False)

    name = "PySpark Word Count"
    app = 'wordcount.py'

    def app_options(self):
        # These are passed to the Spark main args in the defined order.
        return [self.input().path, self.output().path]

    def input(self):
        return luigi.contrib.hdfs.HdfsTarget("hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.input")

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.output')

if __name__ == '__main__':
        luigi.run()

wordcount.py

import sys
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext()
    sc.textFile(sys.argv[1]) \
      .flatMap(lambda line: line.split()) \
      .map(lambda word: (word, 1)) \
      .reduceByKey(lambda a, b: a + b) \
      .saveAsTextFile(sys.argv[2])
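
For reference, Luigi also ships a PySparkTask base class that lets the Spark code live inside the task itself instead of in a separate wordcount.py. Here is a sketch of that variant, adapted by me from the same word count and reusing the same HDFS targets as above:

import luigi
import luigi.contrib.hdfs
from luigi.contrib.spark import PySparkTask


class InlinePySparkWordCount(PySparkTask):
    driver_memory = '2g'
    executor_memory = '3g'

    def input(self):
        return luigi.contrib.hdfs.HdfsTarget("hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.input")

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget("hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.output")

    def main(self, sc, *args):
        # the driver code from wordcount.py, inlined into the Luigi task
        (sc.textFile(self.input().path)
           .flatMap(lambda line: line.split())
           .map(lambda word: (word, 1))
           .reduceByKey(lambda a, b: a + b)
           .saveAsTextFile(self.output().path))


if __name__ == '__main__':
    luigi.run()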

Now open your terminal and run this command to kick off the automation.

[root@namenode wordcount]# time python PySparkWordCount.py PySparkWordCount --local-scheduler
No handlers could be found for logger "luigi-interface"
DEBUG: Checking if PySparkWordCount() is complete
/root/anaconda2/lib/python2.7/site-packages/luigi/parameter.py:259: UserWarning: Parameter None is not of type string.
  warnings.warn("Parameter {0} is not of type string.".format(str(x)))
DEBUG: Running file existence check: hadoop fs -stat hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.output
INFO: Informed scheduler that task   PySparkWordCount__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 51836] Worker Worker(salt=818063944, workers=1, host=namenode.selise.ch, username=root, pid=51836) running   PySparkWordCount()
INFO: Running command: /root/spark-1.6.1-bin-hadoop2.6/bin/spark-submit --master spark://namenode.selise.ch:7077 --name PySpark Word Count --driver-memory 2g --executor-memory 3g --total-executor-cores 100 wordcount.py hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.input hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.output
INFO: [pid 51836] Worker Worker(salt=818063944, workers=1, host=namenode.selise.ch, username=root, pid=51836) done      PySparkWordCount()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   PySparkWordCount__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=818063944, workers=1, host=namenode.selise.ch, username=root, pid=51836) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 PySparkWordCount(total_executor_cores=100)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====


real    0m17.074s
user    0m14.077s
sys     0m5.391s

Real time streaming using Spark streaming, Kafka and Pyspark

Nowadays real time streaming is one of the most challenging tasks in the data industry. It might be sensor data, fleet tracking, traffic, cellular data, website real time streams or web crawling. You might need to analyze the data in real time for your application or your end users' needs.

Spark Streaming and Storm are the kinds of technologies that will help you a lot with real time streaming. My choice for real time streaming is Spark Streaming.

Here I have implemented a basic streaming application which covers Spark Streaming, Kafka and a basic crawler written in Python.
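
As a teaser, here is a minimal sketch of the kind of consumer I mean. It is my own illustration, not the full application: the broker address and topic name are placeholders, and it assumes Spark 1.6/2.x with the spark-streaming-kafka package on the classpath:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="BasicKafkaWordCount")
ssc = StreamingContext(sc, 10)   # 10 second micro-batches

# direct (receiver-less) stream from Kafka; topic and broker are placeholders
stream = KafkaUtils.createDirectStream(
    ssc, ["crawled-pages"], {"metadata.broker.list": "namenode.selise.ch:9092"})

# each Kafka record arrives as a (key, value) pair; count words in the values
counts = (stream.map(lambda kv: kv[1])
                .flatMap(lambda text: text.split())
                .map(lambda word: (word, 1))
                .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()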

will be continued …

How to export a remote PC's CSV file to the HDFS file system?

Last night I had to work on HDFS, where I needed to import a remote PC's CSV file into the HDFS file system. Here are the steps I followed.

STEP1
First I configured passwordless login

mohiulalamprince@ubuntu:/home/postgres$ ssh-keygen -t rsa
Generating public/private rsa key pair.
Enter file in which to save the key (/home/mohiulalamprince/.ssh/id_rsa):
Enter passphrase (empty for no passphrase):
Enter same passphrase again:
Your identification has been saved in /home/mohiulalamprince/.ssh/id_rsa.
Your public key has been saved in /home/mohiulalamprince/.ssh/id_rsa.pub.
The key fingerprint is:
d1:17:73:19:74:15:81:15:6a:d4:f8:18:9b:02:f5:6b mohiulalamprince@ubuntu
The key's randomart image is:
+--[ RSA 2048]----+
|          ..ooOBB|
|         o  oBoo |
|        . o .+*  |
|         . o.+.. |
|        S   .E   |
|            .    |
|                 |
|                 |
|                 |
+-----------------+

STEP2

mohiulalamprince@ubuntu:/home/postgres$ ssh-copy-id admin@172.16.0.215
/usr/bin/ssh-copy-id: INFO: attempting to log in with the new key(s), to filter out any that are already installed
/usr/bin/ssh-copy-id: INFO: 1 key(s) remain to be installed -- if you are prompted now it is to install the new keys
admin@172.16.0.215's password:

Number of key(s) added: 1

Now try logging into the machine, with:   "ssh 'admin@172.16.0.215'"
and check to make sure that only the key(s) you wanted were added.

STEP3

mohiulalamprince@ubuntu:/home/postgres$ cat waypoints_road_area.csv | ssh admin@172.16.0.215 "hadoop dfs -put - /user/admin/waypoints_road_area.csv"
DEPRECATED: Use of this script to execute hdfs command is deprecated.
Instead use the hdfs command for it.

How to configure Jupyter notebook using pyspark?

Step 1: First you need to install Jupyter notebook using the commands below:

pip install jupyter
pip install "ipython[notebook]"

Step 2: Then you need to set up these environment variables in ~/.bashrc

export SPARK_HOME="/usr/hdp/current/spark-client"
export PYSPARK_SUBMIT_ARGS="--master local[2]"

Step 3: Then type the command below in your terminal

jupyter notebook --generate-config

Step 4: Now open the file /root/.jupyter/jupyter_notebook_config.py using vim or a text editor and paste in the lines below:

c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False
c.NotebookApp.port = 8889
c.NotebookApp.notebook_dir = u'/usr/hdp/current/spark-client/'

Step 5: Then create a file named "00-pyspark-setup.py" in "/root/.ipython/profile_default/" and paste in the text given below

import os
import sys

spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, spark_home + "/python")

sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))

pyspark_submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "")
if not "pyspark-shell" in pyspark_submit_args: pyspark_submit_args += " pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

Step 6: Now type the "jupyter notebook" command in your terminal

And enjoy Jupyter notebook with pyspark.
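
Once a notebook is open, the 00-pyspark-setup.py startup script should have left a live SparkContext called sc in the session. A tiny smoke test of my own to confirm it works:

# run this in the first notebook cell; sc is created by 00-pyspark-setup.py
print(sc.version)
print(sc.parallelize(range(100)).sum())   # should print 4950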