Geographical nearest neighbor search using big data technologies

This is one of the more complex problems from my blog. Yes, it gets a bit complex when you are dealing with big data for this kind of problem. The task is basically to find the nearest geometry (in our case, a road) for a given point, and both the number of geometries and the number of points can be large. So you obviously have to handle this using big data tools, GIS, SQL, and some algorithmic knowledge.

First of all, I need to explain what an R-tree is: what we use this tree for, what kinds of problems it solves, and how it solves them.

The R-tree was invented by Antonin Guttman in 1984, which also happens to be my birth year :-D. So let me explain this tree right now.

R-trees are tree data structures used for spatial access methods, i.e., for indexing multi-dimensional information such as geographical coordinates, rectangles or polygons. A common real-world usage for an R-tree might be to store spatial objects such as restaurant locations or the polygons that typical maps are made of: streets, buildings, outlines of lakes, coastlines, etc. and then find answers quickly to queries such as “Find all museums within 2 km of my current location”, “retrieve all road segments within 2 km of my location” (to display them in a navigation system) or “find the nearest gas station” (although not taking roads into account). The R-tree can also accelerate nearest neighbor search for various distance metrics, including great-circle distance – Wikipedia
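To make this concrete, here is a minimal, standalone sketch of how the rtree Python library is used for this kind of query. The road names and coordinates are made up for illustration; only the bounding boxes go into the index.

from rtree import index
from shapely.geometry import Point, LineString

# Two made-up roads; any geometry with a bounding box can be indexed.
roads = [
    ("road-a", LineString([(0.0, 0.0), (1.0, 0.0)])),
    ("road-b", LineString([(0.0, 1.0), (1.0, 1.0)])),
]

idx = index.Index()
for fid, (name, geom) in enumerate(roads):
    idx.insert(fid, geom.bounds)  # bounds = (minx, miny, maxx, maxy)

# nearest() takes a (minx, miny, maxx, maxy) query box -- a degenerate
# box stands for a point -- and yields candidate ids by bounding box.
point = Point(0.2, 0.1)
for fid in idx.nearest((point.x, point.y, point.x, point.y), 1):
    print(roads[fid][0])  # prints: road-a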

Below is a sample code snippet for my big data geospatial nearest neighbor search algorithm. What have I used here? On the Python side, the shapely and rtree libraries with the Anaconda package manager; on the big data side, a Spark cluster on top of the Hadoop HDFS file system, with one driver node and three worker nodes. When I ran the script on the Hadoop cluster using Spark, the task was not that easy: I had to handle a lot of things to get the cluster running properly. I used the Hortonworks Hadoop distribution for my big data platform.
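Getting the dependencies onto the cluster was one of those things. As a rough sketch only (the app name below is a placeholder, and my exact configuration differed), the driver can be pointed at YARN like this, with shapely, rtree and the native libspatialindex library installed on every worker node, e.g. through the same Anaconda environment:

from pyspark import SparkConf, SparkContext

# Hypothetical driver configuration for a Spark 1.x YARN cluster.
conf = (SparkConf()
        .setAppName("nearest-road")   # placeholder name
        .setMaster("yarn-client"))
sc = SparkContext(conf=conf)

# Pure-Python helper modules could also be shipped per job:
# sc.addPyFile("helpers.py")  # hypothetical file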

import os
from tempfile import mkdtemp

from pyspark import SparkContext, SparkFiles
from pyspark.sql import SQLContext, Row
from pyspark.sql.types import StringType
from rtree import index

sc = SparkContext()

def build_nearest_road(linestrings):
    def nearest_road(x, y):
        # These imports run on the worker nodes, where the UDF executes.
        from rtree import index
        from shapely.geometry import Point
        from shapely import speedups

        speedups.enable()
        # SparkFiles.get() gives the local path of the shipped index
        # file; stripping the '.idx' suffix yields the index basename.
        idx = index.Index(SparkFiles.get('index.idx')[:-4])
        try:
            min_name = "NOT FOUND"
            min_distance = float('inf')
            point = Point(float(x), float(y))
            # Take the 20 nearest bounding boxes as candidates, then
            # refine them with exact point-to-linestring distances.
            for fid in idx.nearest((float(x), float(y), float(x), float(y)), 20):
                (name, linestring) = road_broadcast.value[fid]
                distance = point.distance(linestring)
                if distance < min_distance:
                    min_distance = distance
                    min_name = name
            return min_name
        except Exception:
            import sys
            print(str(sys.exc_info()[1]))
            return "NOT FOUND"

    from shapely.geometry import LineString
    from shapely.wkt import loads

    # Parse each (name, WKT) pair into a shapely LineString.
    linestrings_list = []
    for (name, linestring_wkt) in linestrings:
        linestring = loads(linestring_wkt)
        linestrings_list.append((name, LineString(linestring.coords)))

    # Build a file-based R-tree over the road bounding boxes and ship
    # the index files to every node in the cluster.
    tempdir = mkdtemp()
    basename = os.path.join(tempdir, 'index')
    idx = index.Index(basename, ((fid, linestring.bounds, None)
        for (fid, (name, linestring)) in enumerate(linestrings_list)))
    idx.close()
    sc.addFile(basename + '.idx')
    sc.addFile(basename + '.dat')

    # Broadcast the geometries themselves so workers can compute exact
    # distances for the candidate ids that come back from the R-tree.
    road_broadcast = sc.broadcast(linestrings_list)
    return nearest_road

# Road data: each line is a name followed by a LINESTRING geometry in WKT.
roads = sc.textFile("hdfs://datanode1.selise.ch/user/admin/geodjango/roads/part-m-00[0-9]*")
parts_list = roads.map(lambda l: l.split("LINESTRING"))
roads = parts_list.map(lambda parts: (parts[0], "LINESTRING" + parts[1]))
roads = roads.toLocalIterator()

sqlContext = SQLContext(sc)

# Waypoint data: comma-separated lines carrying the trip id, the capture
# hour, and the point coordinates.
zurich_waypoints = sc.textFile("hdfs://datanode1.selise.ch/user/admin/waypoints_in_zurich_2w/part-m-00[0-9]*")
zurich_waypoints = zurich_waypoints.map(lambda l: l.split(","))
zurich_waypoints = zurich_waypoints.map(lambda parts: Row(longitude=parts[3], latitude=parts[4], tripid=parts[0], CaptureDateHour=parts[1]))
zurich_waypoints = sqlContext.createDataFrame(zurich_waypoints)
zurich_waypoints.registerTempTable("zurich_waypoints")

# Register the nearest-road lookup as a Spark SQL UDF returning a string.
sqlContext.registerFunction('getNearestRoad', build_nearest_road(roads), StringType())

# Annotate every waypoint with its nearest road and save the result as CSV.
df = sqlContext.sql("select getNearestRoad(zw.latitude, zw.longitude), zw.* from zurich_waypoints zw")
df.write.format('com.databricks.spark.csv').mode('overwrite').save('/home/admin/pyspark/zurich-road-waypoints-yarn-cluster.csv')

Now let's discuss the code. You can see that I used a nested Python function for the Spark SQL context: the road network is built into an R-tree and exposed as a Spark UDF, with the index files shipped to each node, so it works as a distributed geo R-tree over the three Hadoop nodes. That means you can query road info against the R-tree through the Spark SQL context, and the search runs across multiple machines. I also used a threshold value with the R-tree (the 20-candidate lookup in the code above) to handle overlapping bounding boxes, i.e. collision detection for the road network, because an R-tree built from road networks does not always return the truly nearest road for a given point. Here is a picture that may help you understand the idea.

You can see that red marks the query point: because the R-tree is built from the bounding boxes of the road network, it returns road R2 first, although the actual nearest neighbor is R1. So I used a threshold value, fetching several candidates and refining them by exact distance, to overcome this problem.
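The same effect can be reproduced in a few lines. In this made-up sketch, R2 is a long diagonal road whose bounding box swallows the query point, so the bare R-tree reports it first; refining several candidates with exact shapely distances recovers R1:

from rtree import index
from shapely.geometry import Point, LineString

roads = [
    ("R1", LineString([(6.0, 0.0), (10.0, 0.0)])),   # truly nearest road
    ("R2", LineString([(0.0, 0.0), (10.0, 10.0)])),  # long diagonal road
]

idx = index.Index()
for fid, (name, geom) in enumerate(roads):
    idx.insert(fid, geom.bounds)

point = Point(8.0, 1.0)
query = (point.x, point.y, point.x, point.y)

# By bounding box alone the R-tree reports R2 first: the point lies
# inside R2's huge bounding box, while R1's box is 1.0 away.
print([roads[fid][0] for fid in idx.nearest(query, 1)])   # ['R2']

# Refinement: pull several candidates and compare exact distances.
best = min(idx.nearest(query, 20), key=lambda fid: point.distance(roads[fid][1]))
print(roads[best][0])   # 'R1' (distance 1.0 vs ~4.95 for R2)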

Here is a small data sample for testing the script.
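As a rough illustration of the layout the script expects, judging from the parsing code above (the values below are invented, not from the real dataset):

# A made-up waypoint line in the layout the script above expects:
# tripid, CaptureDateHour, <other field>, longitude, latitude
line = "trip-42,2016-01-01 10:00,0,8.5417,47.3769"
parts = line.split(",")
print(dict(tripid=parts[0], CaptureDateHour=parts[1],
           longitude=parts[3], latitude=parts[4]))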

Nearly 10 million records for testing

All data nodes are running.

CPU usage at 100%

You can see that datanode1.selise.ch, datanode2.selise.ch, and namenode.selise.ch are running

All nodes are running.

The article is not complete yet. In a follow-up I will explain:

Why Hive on top of the Hadoop spatial framework is not a good fit for this kind of analysis

Why PostGIS is not suitable for this kind of analysis

Continued …