Geographical nearest neighbor using big data technologies

This is one of the more complex problems I have covered on my blog. It gets tricky when you are dealing with big data: the task is to find the nearest geometry (a "particle") for each query point, and both the number of geometries and the number of points can be very large. So you have to combine big data tooling, GIS, SQL, and some algorithmic knowledge to handle it.

First of all, I need to explain what an R-tree is, what we use it for, and what kind of problem it can solve.

The R-tree was invented by Antonin Guttman in 1984, which also happens to be my birth year :-D. So let me explain this tree right away.

R-trees are tree data structures used for spatial access methods, i.e., for indexing multi-dimensional information such as geographical coordinates, rectangles or polygons. A common real-world usage for an R-tree might be to store spatial objects such as restaurant locations or the polygons that typical maps are made of: streets, buildings, outlines of lakes, coastlines, etc. and then find answers quickly to queries such as “Find all museums within 2 km of my current location”, “retrieve all road segments within 2 km of my location” (to display them in a navigation system) or “find the nearest gas station” (although not taking roads into account). The R-tree can also accelerate nearest neighbor search for various distance metrics, including great-circle distance – wikipedia
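
To make the R-tree idea concrete, here is a minimal, self-contained sketch using the rtree Python library (which I use later in this post); the ids and coordinates are made-up sample values:

from rtree import index

# Build a small in-memory R-tree; each entry is (id, (minx, miny, maxx, maxy))
idx = index.Index()
idx.insert(0, (8.53, 47.36, 8.55, 47.38))   # bounding box of some geometry
idx.insert(1, (8.40, 47.30, 8.42, 47.32))
idx.insert(2, (8.60, 47.40, 8.62, 47.42))

# Ask for the 2 entries whose rectangles are closest to a query point
# (a point is passed as a degenerate box: minx == maxx, miny == maxy)
print(list(idx.nearest((8.54, 47.37, 8.54, 47.37), 2)))   # -> [0, 2]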

Here is a sample code snippet for my big data geospatial nearest neighbor search. I have used the Python libraries shapely and rtree, the Anaconda package manager, and a few big data tools: a Spark cluster and the Hadoop HDFS file system running over a multi-node cluster, with 1 driver node and 3 worker nodes for Spark and Hadoop. Running the script on the Hadoop cluster with Spark was not that easy; I had to handle a lot of details to keep the cluster running properly. I have used the Hortonworks Hadoop distribution for my big data platform.

from shutil import rmtree
from tempfile import mkdtemp

from pyspark import SparkFiles
from pyspark.sql.types import StringType
from rtree import index
from shapely.geometry import asPoint
from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

from shapely import speedups
import os

sc = SparkContext()

def build_nearest_road(linestrings):
        def nearest_road(x, y):
                # Imports are repeated inside the UDF so they resolve on the executors
                from shapely.geometry import asPoint
                from rtree import index
                from shapely import speedups
                # SparkFiles.get() returns the local path of the shipped index file;
                # strip the '.idx' extension to get the index basename
                idx = index.Index(SparkFiles.get('index.idx')[:-4])
                try:
                        min_name = "NOT FOUND"
                        min_distance = 2000000000.0  # large sentinel distance
                        min_linestring = "NOT FOUND"
                        # Take the 20 nearest bounding boxes from the R-tree and
                        # refine them with exact point-to-linestring distances
                        for fid in idx.nearest((float(x), float(y), float(x), float(y)), 20):
                                speedups.enable()
                                point = asPoint([float(x), float(y)])
                                (name, linestring) = road_broadcast.value[fid]
                                from shapely.geometry import LineString
                                linestring = LineString(linestring)
                                distance = point.distance(linestring)
                                if min_distance > distance:
                                        min_distance = distance
                                        min_name = name
                                        #min_linestring = linestring
                        return min_name
                except:
                        # Log the error on the executor and fall back to a sentinel value
                        import sys
                        print str(sys.exc_info()[1])
                        return "NOT FOUND"

        # Parse the WKT road geometries into Shapely LineStrings
        from shapely.geometry import LineString
        from shapely.wkt import loads
        linestrings_list = []
        for (name, linestring_wkt) in linestrings:
                linestring = loads(linestring_wkt)
                linestring = LineString(linestring.coords)
                linestrings_list.append((name, linestring))

        # Build a file-based R-tree over the road bounding boxes in a temp
        # directory, then ship the index files to every executor
        tempdir = mkdtemp()
        basename = os.path.join(tempdir, 'index')

        idx = index.Index(basename, ((fid, linestring.bounds, None)
                for (fid, (name, linestring)) in enumerate(linestrings_list)))
        idx.close()
        sc.addFile(basename + '.idx')
        sc.addFile(basename + '.dat')

        # Broadcast the (name, geometry) list so executors can map R-tree ids
        # back to the road geometries
        road_broadcast = sc.broadcast(linestrings_list)
        return nearest_road

roads = sc.textFile("hdfs://datanode1.selise.ch/user/admin/geodjango/roads/part-m-00[0-9]*")
parts_list = roads.map(lambda l: l.split("LINESTRING"))
roads = parts_list.map(lambda parts: (parts[0], "LINESTRING" + parts[1]))
roads = roads.toLocalIterator()

sqlContext = SQLContext(sc)

zurich_waypoints = sc.textFile("hdfs://datanode1.selise.ch/user/admin/waypoints_in_zurich_2w/part-m-00[0-9]*")
zurich_waypoints = zurich_waypoints.map(lambda l: l.split(","))
zurich_waypoints = zurich_waypoints.map(lambda parts: Row(longitude=parts[3], latitude=parts[4], tripid=parts[0], CaptureDateHour=parts[1]))
zurich_waypoints = sqlContext.createDataFrame(zurich_waypoints)
zurich_waypoints.registerTempTable("zurich_waypoints")

sqlContext.registerFunction('getNearestRoad', build_nearest_road(roads))

df = sqlContext.sql("select getNearestRoad(zw.latitude, zw.longitude), zw.* from zurich_waypoints zw")
df.write.format('com.databricks.spark.csv').mode('overwrite').save('/home/admin/pyspark/zurich-road-waypoints-yarn-cluster.csv')

Now let's discuss the code. You can see that I have used a nested Python function for the Spark SQL context: the road network is built into an R-tree and wrapped as a Spark UDF, so the geo R-tree effectively runs distributed over the 3 Hadoop nodes. That means you can query road info through the Spark SQL context and the search works across multiple machines. I have also used a candidate threshold in the R-tree lookup to deal with overlapping bounding boxes, because when an R-tree is built from road networks, the first hit is not always the nearest road for a specific point. The picture below should help you understand the idea.

You can see that the red dot is the query point: the R-tree (built from the roads' bounding boxes) returns road R2 first, but the actual nearest neighbor is R1. So I take a number of candidates from the R-tree and refine them by their exact distances to overcome this problem.
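
To show what I mean, here is a small standalone sketch of the refinement step the UDF above performs (the two toy roads, the query point and the candidate count of 20 are made-up values): take several bounding-box candidates from the R-tree and keep the road with the smallest exact distance.

from rtree import index
from shapely.geometry import Point, LineString

# Two toy roads: R2 is a long diagonal whose bounding box covers the query
# point, while R1 is a short segment that is actually closer to it
roads = [("R1", LineString([(8, 0), (10, 0)])),
         ("R2", LineString([(0, 0), (10, 10)]))]

idx = index.Index()
for fid, (name, geom) in enumerate(roads):
    idx.insert(fid, geom.bounds)

K_CANDIDATES = 20   # how many bounding-box hits to refine

def nearest_road_name(x, y):
    # The R-tree ranks candidates by bounding-box distance only, so the first
    # hit can be the wrong road; refine with exact point-to-line distances.
    point = Point(x, y)
    best_name, best_dist = "NOT FOUND", float("inf")
    for fid in idx.nearest((x, y, x, y), K_CANDIDATES):
        name, geom = roads[fid]
        d = point.distance(geom)
        if d < best_dist:
            best_name, best_dist = name, d
    return best_name

# The first R-tree hit is R2 (its box contains the point), but the exact
# distances show that R1 is the true nearest road.
print(nearest_road_name(9, 1))   # -> R1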

Here is a small data sample for testing the script.

Nearly 10 million records were used for testing.

All data nodes are running.

CPU usage is at 100%.

You can see that datanode1.selise.ch, datanode2.selise.ch, and namenode.selise.ch are running.

All nodes are running.

The article is not yet complete. I will still explain:

Why Hive on top of the Hadoop spatial framework is not a good fit for this kind of analysis

Why PostGIS is not suitable for this kind of analysis

Continued …

How to set up a standalone Spark cluster?

First of all, you need to download the Spark binary (pre-built for Hadoop) from the Spark site. I had already downloaded the binary, then extracted it using the tar command:

tar -xvf spark-1.6.1-bin-hadoop2.6.tgz
[root@namenode ~]# ls -la spark-*
-rw-r--r--  1 root root 289405702 Feb 16 18:53 spark-1.6.1-bin-hadoop2.6.tgz
-rw-r--r--  1 root root 195636829 Dec 29 06:49 spark-2.1.0-bin-hadoop2.7.tgz

spark-1.6.1-bin-hadoop2.6:
total 1408
drwxr-xr-x  14  500  500    4096 Feb 16 18:56 .
dr-xr-x---. 38 root root    4096 Mar 28 13:25 ..
drwxr-xr-x   2  500  500    4096 Feb 27  2016 bin
-rw-r--r--   1  500  500 1343562 Feb 27  2016 CHANGES.txt
drwxr-xr-x   2  500  500    4096 Feb 27  2016 conf
drwxr-xr-x   3  500  500      18 Feb 27  2016 data
drwxr-xr-x   3  500  500      75 Feb 27  2016 ec2
drwxr-xr-x   3  500  500      16 Feb 27  2016 examples
drwxr-xr-x   2  500  500    4096 Feb 27  2016 lib
-rw-r--r--   1  500  500   17352 Feb 27  2016 LICENSE
drwxr-xr-x   2  500  500    4096 Feb 27  2016 licenses
drwxr-xr-x   2 root root    4096 Mar 12 17:17 logs
-rw-r--r--   1  500  500   23529 Feb 27  2016 NOTICE
drwxr-xr-x   6  500  500     112 Feb 27  2016 python
drwxr-xr-x   3  500  500      16 Feb 27  2016 R
-rw-r--r--   1  500  500    3359 Feb 27  2016 README.md
-rw-r--r--   1  500  500     120 Feb 27  2016 RELEASE
drwxr-xr-x   2  500  500    4096 Feb 27  2016 sbin
drwxr-xr-x  22 root root    4096 Mar 12 17:18 work

spark-2.1.0-bin-hadoop2.7:
total 100
drwxr-xr-x  12  500  500  4096 Dec 16 08:18 .
dr-xr-x---. 38 root root  4096 Mar 28 13:25 ..
drwxr-xr-x   2  500  500  4096 Dec 16 08:18 bin
drwxr-xr-x   2  500  500  4096 Dec 16 08:18 conf
drwxr-xr-x   5  500  500    47 Dec 16 08:18 data
drwxr-xr-x   4  500  500    27 Dec 16 08:18 examples
drwxr-xr-x   2  500  500  8192 Dec 16 08:18 jars
-rw-r--r--   1  500  500 17811 Dec 16 08:18 LICENSE
drwxr-xr-x   2  500  500  4096 Dec 16 08:18 licenses
-rw-r--r--   1  500  500 24645 Dec 16 08:18 NOTICE
drwxr-xr-x   9  500  500  4096 Dec 16 08:18 python
drwxr-xr-x   3  500  500    16 Dec 16 08:18 R
-rw-r--r--   1  500  500  3818 Dec 16 08:18 README.md
-rw-r--r--   1  500  500   128 Dec 16 08:18 RELEASE
drwxr-xr-x   2  500  500  4096 Dec 16 08:18 sbin
drwxr-xr-x   2  500  500    41 Dec 16 08:18 yarn

I will use version 1.6.1 for my Spark cluster. All you really need to do is set your SPARK_HOME path in your ~/.bashrc file. My ~/.bashrc looks like this:

[root@namenode ~]# cat ~/.bashrc | grep SPARK
#export SPARK_HOME="/usr/hdp/current/spark-client"
#export PYSPARK_SUBMIT_ARGS="--master local[2]"
export SPARK_HOME=/root/spark-1.6.1-bin-hadoop2.6
export PATH=$SPARK_HOME/bin:$PATH
export PYSPARK_PYTHON=/root/anaconda2/bin/python
export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0

Here is my full ~/.bashrc file; most of it is not related to this post. I have used the Hortonworks distribution for my Hadoop cluster.

[root@namenode ~]# cat ~/.bashrc
# .bashrc

# User specific aliases and functions

alias rm='rm -i'
alias cp='cp -i'
alias mv='mv -i'

# Source global definitions
if [ -f /etc/bashrc ]; then
        . /etc/bashrc
fi

#export SPARK_HOME="/usr/hdp/current/spark-client"
#export PYSPARK_SUBMIT_ARGS="--master local[2]"

export SPARK_HOME=/root/spark-1.6.1-bin-hadoop2.6
export PATH=$SPARK_HOME/bin:$PATH

export LD_LIBRARY_PATH=/usr/local/lib:$LD_LIBRARY_PATH

export JAVA_HOME=/usr/jdk64/jdk1.8.0_60/
export PATH=$JAVA_HOME/bin:$PATH

export GEOS_DIR=/usr/local/geos

# added by Anaconda2 4.2.0 installer
export PATH="/root/anaconda2/bin:$PATH"

export PATH=$PATH:/opt/activemq/bin
export HADOOP_CONF_DIR=/etc/hadoop/conf

export PHOENIX_HOME=/home/admin/apache-phoenix-4.9.0-HBase-1.2-bin
export PATH=$PHOENIX_HOME/bin:$PATH

export HBASE_HOME=/usr/hdp/current/hbase-client
export PATH=$HBASE_HOME/bin:$PATH

export ZOOKEEPER_HOME=/usr/hdp/current/zookeeper-client
export PATH=$ZOOKEEPER_HOME/bin:$PATH

export PYSPARK_PYTHON=/root/anaconda2/bin/python
export PYTHONHASHSEED=0
export SPARK_YARN_USER_ENV=PYTHONHASHSEED=0

export CQLSH_NO_BUNDLED=true

Now you are almost done. Just reload your shell once you have set the environment variables:


[root@namenode ~]# source ~/.bashrc
[root@namenode ~]# echo $SPARK_HOME
/root/spark-1.6.1-bin-hadoop2.6

Now you are ready to start Spark on a single node:

[root@namenode ~]# pyspark
Python 2.7.12 |Anaconda custom (64-bit)| (default, Jul  2 2016, 17:42:40)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://anaconda.org
17/03/30 11:51:43 INFO spark.SparkContext: Running Spark version 1.6.1
17/03/30 11:51:45 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/30 11:51:46 INFO spark.SecurityManager: Changing view acls to: root
17/03/30 11:51:46 INFO spark.SecurityManager: Changing modify acls to: root
17/03/30 11:51:46 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/03/30 11:51:50 INFO util.Utils: Successfully started service 'sparkDriver' on port 33652.
17/03/30 11:51:58 INFO storage.BlockManagerMaster: Registered BlockManager
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 2.7.12 (default, Jul  2 2016 17:42:40)
SparkContext available as sc, HiveContext available as sqlContext.
>>>

Now try playing with your single-node Spark setup in shell mode. Here is an example:

>>> lines = sc.textFile("file:///home/admin/biketrips.csv")
17/03/30 11:56:16 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 147.2 KB, free 147.2 KB)
17/03/30 11:56:17 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 16.4 KB, free 163.6 KB)
17/03/30 11:56:18 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:43822 (size: 16.4 KB, free: 511.1 MB)
17/03/30 11:56:18 INFO spark.SparkContext: Created broadcast 0 from textFile at NativeMethodAccessorImpl.java:-2

>>> lines.count()
.
.
.
17/03/30 12:07:47 INFO scheduler.TaskSetManager: Finished task 46.0 in stage 0.0 (TID 46) in 44661 ms on localhost (48/48)
17/03/30 12:07:47 INFO scheduler.DAGScheduler: ResultStage 0 (count at :1) finished in 592.768 s
17/03/30 12:07:47 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
17/03/30 12:07:47 INFO scheduler.DAGScheduler: Job 0 finished: count at :1, took 593.280470 s
15243514

top - 12:05:27 up 16:03,  2 users,  load average: 51.00, 37.08, 17.84
Tasks: 542 total,  21 running, 521 sleeping,   0 stopped,   0 zombie
%Cpu(s): 47.1 us, 47.1 sy,  0.0 ni,  0.0 id,  0.0 wa,  0.0 hi,  5.9 si,  0.0 st
KiB Mem : 21528888 total, 13107932 free,  4306892 used,  4114064 buff/cache
KiB Swap: 12518396 total, 12518396 free,        0 used. 16856396 avail Mem

   PID USER      PR  NI    VIRT    RES    SHR S  %CPU %MEM     TIME+ COMMAND
   266 root      rt   0       0      0      0 S  62.5  0.0   1:38.34 watchdog/0
   272 root      rt   0       0      0      0 S  62.1  0.0   2:12.52 watchdog/2
   267 root      rt   0       0      0      0 S  61.9  0.0   2:23.35 watchdog/1
   277 root      rt   0       0      0      0 S  45.8  0.0   1:45.95 watchdog/3
101271 root      20   0 4754184 596064  25836 S  34.8  2.8   6:28.83 java
 98143 hdfs      20   0 2942032 381484  25220 S  15.4  1.8   2:27.38 java
  2735 mongod    20   0  374404  68264   5684 R   7.3  0.3   8:48.23 mongod
 99025 yarn      20   0 2968840 565144  26924 S   7.1  2.6   3:31.46 java

Multi-node standalone Spark cluster

Now you need to copy the Spark directory to every node where you want to run a Spark worker. I have used datanode1, datanode2 and datanode3 as workers for my Spark cluster.

[root@namenode ~]# scp -r /root/spark-1.6.1-bin-hadoop2.6 datanode1:/root/
[root@namenode admin]# ssh datanode1
Last login: Wed Mar 29 15:31:54 2017 from namenode.selise.ch
[root@datanode1 ~]# cd /root/spark-1.6.1-bin-hadoop2.6/
[root@datanode1 spark-1.6.1-bin-hadoop2.6]# ls -la
total 1424
drwxr-xr-x. 14  500  500    4096 Feb 16 18:34 .
dr-xr-x---. 20 root root    4096 Mar 27 13:33 ..
drwxr-xr-x.  2  500  500    4096 Feb 27  2016 bin
-rw-r--r--.  1  500  500 1343562 Feb 27  2016 CHANGES.txt
drwxr-xr-x.  2  500  500    4096 Feb 27  2016 conf
drwxr-xr-x.  3  500  500    4096 Feb 27  2016 data
drwxr-xr-x.  3  500  500    4096 Feb 27  2016 ec2
drwxr-xr-x.  3  500  500    4096 Feb 27  2016 examples
drwxr-xr-x.  2  500  500    4096 Feb 27  2016 lib
-rw-r--r--.  1  500  500   17352 Feb 27  2016 LICENSE
drwxr-xr-x.  2  500  500    4096 Feb 27  2016 licenses
drwxr-xr-x.  2 root root    4096 Mar 27 13:41 logs
-rw-r--r--.  1  500  500   23529 Feb 27  2016 NOTICE
drwxr-xr-x.  6  500  500    4096 Feb 27  2016 python
drwxr-xr-x.  3  500  500    4096 Feb 27  2016 R
-rw-r--r--.  1  500  500    3359 Feb 27  2016 README.md
-rw-r--r--.  1  500  500     120 Feb 27  2016 RELEASE
drwxr-xr-x.  2  500  500    4096 Feb 27  2016 sbin
drwxr-xr-x. 85 root root    4096 Mar 22 12:43 work

Now do the same for datanode2 and datanode3 as well. Once that is done you are ready to start your Spark master node, so go to the machine that will act as the master. In my environment the Spark master machine is named namenode. Note that I also run a worker on the master machine.

[root@namenode spark-1.6.1-bin-hadoop2.6]# ./sbin/start-master.sh
starting org.apache.spark.deploy.master.Master, logging to /root/spark-1.6.1-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.master.Master-1-namenode.selise.ch.out

Spark worker on the master machine:
[root@namenode spark-1.6.1-bin-hadoop2.6]# ./sbin/start-slave.sh spark://namenode.selise.ch:7077
starting org.apache.spark.deploy.worker.Worker, logging to /root/spark-1.6.1-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-namenode.selise.ch.out

Now you are ready to start the Spark worker on datanode1:

[root@datanode1 spark-1.6.1-bin-hadoop2.6]# ./sbin/start-slave.sh spark://namenode.selise.ch:7077
starting org.apache.spark.deploy.worker.Worker, logging to /root/spark-1.6.1-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-datanode1.selise.ch.out

Here is datanode2:

[root@namenode admin]# ssh datanode2
Last login: Wed Mar 29 15:46:09 2017 from namenode.selise.ch
[root@datanode2 ~]# cd spark-1.6.1-bin-hadoop2.6/
[root@datanode2 spark-1.6.1-bin-hadoop2.6]# ./sbin/start-slave.sh spark://namenode.selise.ch:7077
starting org.apache.spark.deploy.worker.Worker, logging to /root/spark-1.6.1-bin-hadoop2.6/logs/spark-root-org.apache.spark.deploy.worker.Worker-1-datanode2.selise.ch.out
[root@datanode2 spark-1.6.1-bin-hadoop2.6]#

Do the same for datanode3 as well.

Now your standalone Spark cluster is almost done.
Here is a picture of the 4-node standalone Spark cluster.

Here is a picture of the 3-node standalone Spark cluster.

You can also use your Spark cluster from the Spark shell. Here is how you can do it:

[root@namenode spark-1.6.1-bin-hadoop2.6]# pyspark --master spark://namenode.selise.ch:7077
Python 2.7.12 |Anaconda custom (64-bit)| (default, Jul  2 2016, 17:42:40)
[GCC 4.4.7 20120313 (Red Hat 4.4.7-1)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Anaconda is brought to you by Continuum Analytics.
Please check out: http://continuum.io/thanks and https://anaconda.org
17/03/30 12:30:44 INFO spark.SparkContext: Running Spark version 1.6.1
17/03/30 12:30:44 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/03/30 12:30:44 INFO spark.SecurityManager: Changing view acls to: root
17/03/30 12:30:44 INFO spark.SecurityManager: Changing modify acls to: root
17/03/30 12:30:44 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
17/03/30 12:30:45 INFO util.Utils: Successfully started service 'sparkDriver' on port 34680.
17/03/30 12:30:45 INFO slf4j.Slf4jLogger: Slf4jLogger started
.
.
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Python version 2.7.12 (default, Jul  2 2016 17:42:40)
SparkContext available as sc, HiveContext available as sqlContext.
>>> 17/03/30 12:31:03 INFO cluster.SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (datanode1.selise.ch:59560) with ID 0
17/03/30 12:31:03 INFO cluster.SparkDeploySchedulerBackend: Registered executor NettyRpcEndpointRef(null) (datanode2.selise.ch:41570) with ID 1
17/03/30 12:31:03 INFO storage.BlockManagerMasterEndpoint: Registering block manager datanode1.selise.ch:41001 with 511.1 MB RAM, BlockManagerId(0, datanode1.selise.ch, 41001)
17/03/30 12:31:03 INFO storage.BlockManagerMasterEndpoint: Registering block manager datanode2.selise.ch:37377 with 511.1 MB RAM, BlockManagerId(1, datanode2.selise.ch, 37377)
>>>

will be continued …

How to automate your PySpark word count program using Luigi?

Suppose you have a Luigi task definition, PySparkWordCount.py, and a PySpark word count script, wordcount.py, both given below.

And here is my client.cfg file for the Luigi configuration:

[spark]
spark-submit: /root/spark-1.6.1-bin-hadoop2.6/bin/spark-submit
master: spark://namenode.selise.ch:7077

PySparkWordCount.py

import luigi
from luigi.contrib.spark import SparkSubmitTask
import luigi.contrib.hadoop
import luigi.contrib.hdfs

class PySparkWordCount(SparkSubmitTask):
    """
    This task uses an external Python driver file specified in :py:meth:`app`.
    It runs a :py:class:`luigi.contrib.spark.SparkSubmitTask` task
    over the target data in :py:meth:`input` (a file in HDFS) and
    writes the result into its :py:meth:`output` target (a file in HDFS).
    This class uses :py:meth:`luigi.contrib.spark.SparkSubmitTask.run`.
    Example luigi configuration::
        [spark]
        spark-submit: /usr/local/spark/bin/spark-submit
        master: spark://spark.example.org:7077
        deploy-mode: client
    """
    driver_memory = '2g'
    executor_memory = '3g'
    total_executor_cores = luigi.IntParameter(default=100, significant=False)

    name = "PySpark Word Count"
    app = 'wordcount.py'

    def app_options(self):
        # These are passed to the Spark main args in the defined order.
        return [self.input().path, self.output().path]

    def input(self):
        return luigi.contrib.hdfs.HdfsTarget("hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.input")

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.output')

if __name__ == '__main__':
    luigi.run()

wordcount.py

import sys
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext()
    sc.textFile(sys.argv[1]) \
      .flatMap(lambda line: line.split()) \
      .map(lambda word: (word, 1)) \
      .reduceByKey(lambda a, b: a + b) \
      .saveAsTextFile(sys.argv[2])

Now open your terminal and run the command to kick off the automation:

[root@namenode wordcount]# time python PySparkWordCount.py PySparkWordCount --local-scheduler
No handlers could be found for logger "luigi-interface"
DEBUG: Checking if PySparkWordCount() is complete
/root/anaconda2/lib/python2.7/site-packages/luigi/parameter.py:259: UserWarning: Parameter None is not of type string.
  warnings.warn("Parameter {0} is not of type string.".format(str(x)))
DEBUG: Running file existence check: hadoop fs -stat hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.output
INFO: Informed scheduler that task   PySparkWordCount__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 51836] Worker Worker(salt=818063944, workers=1, host=namenode.selise.ch, username=root, pid=51836) running   PySparkWordCount()
INFO: Running command: /root/spark-1.6.1-bin-hadoop2.6/bin/spark-submit --master spark://namenode.selise.ch:7077 --name PySpark Word Count --driver-memory 2g --executor-memory 3g --total-executor-cores 100 wordcount.py hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.input hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.output
INFO: [pid 51836] Worker Worker(salt=818063944, workers=1, host=namenode.selise.ch, username=root, pid=51836) done      PySparkWordCount()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   PySparkWordCount__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=818063944, workers=1, host=namenode.selise.ch, username=root, pid=51836) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 PySparkWordCount(total_executor_cores=100)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====


real    0m17.074s
user    0m14.077s
sys     0m5.391s

Real-time streaming using Spark Streaming, Kafka and PySpark

Nowadays real-time streaming is one of the most challenging tasks in the data industry. It might be sensor data, fleet tracking, traffic, cellular data, real-time website streams or web crawling. You may need to analyze the data in real time for your application or end-user needs.

Spark Streaming and Storm are the kinds of technologies that will help you a lot with real-time streaming; my choice is Spark Streaming.

Here I have implemented a basic streaming application that covers Spark Streaming, Kafka and some basic crawling using Python.
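
The full application is not included here yet, but a minimal sketch of the Spark Streaming plus Kafka part (for Spark 1.6) might look like the following; the topic name, broker address, batch interval and word-count logic are my own assumptions, and the matching spark-streaming-kafka assembly jar has to be supplied to spark-submit:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

sc = SparkContext(appName="KafkaStreamingSketch")
ssc = StreamingContext(sc, 10)   # 10-second micro-batches

# Read messages directly from a Kafka topic (broker and topic names are assumptions)
stream = KafkaUtils.createDirectStream(
    ssc, ["crawled-pages"], {"metadata.broker.list": "namenode.selise.ch:9092"})

# Each Kafka record arrives as a (key, value) pair; count the words in the values
counts = (stream.map(lambda kv: kv[1])
                .flatMap(lambda text: text.split())
                .map(lambda word: (word, 1))
                .reduceByKey(lambda a, b: a + b))
counts.pprint()

ssc.start()
ssc.awaitTermination()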

will be continued …

Historical cricket data download and analysis.

Today I am going to give you some idea of how to download data from espncricinfo.com for your experiments. I have implemented a basic Python script using the pandas HTML data reader, where I just use a URL and an iterator over the different result pages.

import pandas as pd
from lxml import html  # pandas.read_html uses lxml to parse the HTML tables

for x in range(1, 4009):
    url1 = "http://stats.espncricinfo.com/ci/engine/stats/index.html?class=11;page=" + str(x) + ";template=results;type=batting;view=innings"

    data1 = pd.read_html(url1)

    df1 = data1[2]
    df1.to_csv(path_or_buf='/Users/mohiulalamprince/work/python/cricstat/player-info-%s.csv' % '{:05d}'.format(x), sep=',')
    print str(x) + "  =>  [DONE]"

I have downloaded the data from 1877 up to February 26th, 2017, and hosted it in my Amazon S3 cricstat bucket for your analysis. Here are the locations from which you can download the data:

https://s3-ap-southeast-1.amazonaws.com/cricstat/player-info.csv
https://s3-ap-southeast-1.amazonaws.com/cricstat/player-info.tar.gz

Here is my first analysis, where I try to find the highest run scorers in the world using Spark, the in-memory big data framework. You can follow my Jupyter notebook PySpark installation guide below; if you are not familiar with Spark, follow my Spark setup guide above.

from pyspark import SQLContext
from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql.functions import udf

sc = SparkContext()
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/Users/mohiulalamprince/work/python/cs/player-info.csv').cache()

def formatting_unbitten_and_dnb(run):
    # Strip the trailing '*' from not-out scores and treat DNB/TDNB
    # ("did not bat") as zero runs; return strings to match the UDF's StringType
    if run.endswith('*'):
        return run[:-1]
    elif (run == 'TDNB' or run == 'DNB'):
        return '0'
    return run.strip()

ignore_dnb = udf(formatting_unbitten_and_dnb, StringType())
df = df.withColumn("Runs", ignore_dnb("Runs"))
df.registerTempTable('players')
sqlContext.sql("select Player, sum(Inns), count(Inns), sum(Runs) as total_runs from players group by Player order by total_runs desc").show()

Now here is the analysis output:

player-by-runs

Here is how you can calculate the total number of players between 1877 and 2017; the counting query is shown right after the loading code below.

from pyspark import SQLContext
from pyspark import SparkContext

sc = SparkContext()
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('/Users/mohiulalamprince/work/python/cricstat/player-info-[0-9]*')
df.registerTempTable('players')
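
The counting query itself is missing from the snippet above, so here is a minimal follow-up sketch, assuming the same "Player" column as in the previous analysis:

sqlContext.sql("select count(distinct Player) as total_players from players").show()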

Here is another cricket data analysis, covering 2005 to 2017, where we calculate the highest run scorer during this time frame.

Data location: https://s3-ap-southeast-1.amazonaws.com/cricstat/player-ball-faced-2005to2017.csv.tar.gz
Here is the command you need to start your task.


Now you can see that Spark is running.

Now we need to load the CSV data from 2005 to 2017 for our analysis. I have used the Databricks CSV loader for the processing.

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load("/Users/mohiulalamprince/work/python/player-ball-faced-2005to2017.csv")
df.registerTempTable('stat')

And I have used Spark SQL for the analysis. Spark SQL is very powerful; it basically accepts plain SQL commands.

sqlContext.sql("select player_one, sum(runs) as total_runs from stat group by player_one order by total_runs desc").show()

And here is our result:

A few statistics:

Here is my GitHub repository with the data locations and the source code. You can use it for your own analysis.

https://github.com/mohiulalamprince/cricket

How to configure Jupyter notebook with PySpark?

Step 1: First you need to install Jupyter notebook using the commands below:

pip install jupyter
pip install "ipython[notebook]"

Step 2: Then you need to set up your environment variables in ~/.bashrc:

export SPARK_HOME="/usr/hdp/current/spark-client"
export PYSPARK_SUBMIT_ARGS="--master local[2]"

Step 3: Then type the command below in your terminal:

jupyter notebook --generate-config

Step 4: Now open the file /root/.jupyter/jupyter_notebook_config.py using vim or another text editor and paste in the lines below:

c.NotebookApp.ip = '*'
c.NotebookApp.open_browser = False
c.NotebookApp.port = 8889
c.NotebookApp.notebook_dir = u'/usr/hdp/current/spark-client/'

Step 5: Then create a file named “00-pyspark-setup.py” in the “/root/.ipython/profile_default/” location and paste in the text below:

import os
import sys

spark_home = os.environ.get('SPARK_HOME', None)
sys.path.insert(0, spark_home + "/python")

sys.path.insert(0, os.path.join(spark_home, 'python/lib/py4j-0.8.2.1-src.zip'))

pyspark_submit_args = os.environ.get("PYSPARK_SUBMIT_ARGS", "")
if not "pyspark-shell" in pyspark_submit_args: pyspark_submit_args += " pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = pyspark_submit_args

execfile(os.path.join(spark_home, 'python/pyspark/shell.py'))

Step 6: Now type the “jupyter notebook” command in your terminal.

And enjoy Jupyter notebook with PySpark.