How to automate your PySpark word count program using Luigi?

Suppose you have a Luigi task definition, PySparkWordCount.py, and a PySpark driver script, wordcount.py, both of which are given below.

And here is my client.cfg file for the Luigi configuration:

[spark]
spark-submit: /root/spark-1.6.1-bin-hadoop2.6/bin/spark-submit
master: spark://namenode.selise.ch:7077
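Luigi reads this file with a standard INI-style parser, so a quick way to sanity-check the values before running the task is to parse them yourself. A minimal sketch using Python 3's stdlib configparser (the cluster in this post runs Python 2.7, where the module is named ConfigParser instead):

```python
import configparser

# The same [spark] section as in client.cfg above, inlined for the sketch.
cfg_text = """
[spark]
spark-submit: /root/spark-1.6.1-bin-hadoop2.6/bin/spark-submit
master: spark://namenode.selise.ch:7077
"""

config = configparser.ConfigParser()
config.read_string(cfg_text)

# These are the two values SparkSubmitTask will pick up.
print(config.get("spark", "spark-submit"))
print(config.get("spark", "master"))
```

If the spark-submit path here does not point at an existing binary, the Luigi run will fail at launch time, so this is worth checking first.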

PySparkWordCount.py

import luigi
import luigi.contrib.hdfs
from luigi.contrib.spark import SparkSubmitTask

class PySparkWordCount(SparkSubmitTask):
    """
    This task is the same as :py:class:`InlinePySparkWordCount` above but uses
    an external python driver file specified in :py:meth:`app`
    It runs a :py:class:`luigi.contrib.spark.SparkSubmitTask` task
    over the target data in :py:meth:`wordcount.input` (a file in S3) and
    writes the result into its :py:meth:`wordcount.output` target (a file in S3).
    This class uses :py:meth:`luigi.contrib.spark.SparkSubmitTask.run`.
    Example luigi configuration::
        [spark]
        spark-submit: /usr/local/spark/bin/spark-submit
        master: spark://spark.example.org:7077
        deploy-mode: client
    """
    driver_memory = '2g'
    executor_memory = '3g'
    total_executor_cores = luigi.IntParameter(default=100, significant=False)

    name = "PySpark Word Count"
    app = 'wordcount.py'

    def app_options(self):
        # These are passed to the Spark main args in the defined order.
        return [self.input().path, self.output().path]

    def input(self):
        return luigi.contrib.hdfs.HdfsTarget("hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.input")

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.output')

if __name__ == '__main__':
    luigi.run()
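To turn this into a recurring job (for example, fired daily from cron), the usual Luigi pattern is to parameterize the output path by date: Luigi treats a task as complete when its output target exists, so each day gets a fresh target and already-finished days are skipped on rerun. A minimal sketch of such a path helper; the function name and base path are illustrative, not part of the task above:

```python
import datetime

def dated_output_path(base, date):
    """Build a per-day HDFS output path so each daily run is idempotent:
    if the dated output directory already exists, Luigi skips the task."""
    return "{0}/wordcount.output.{1}".format(base, date.strftime("%Y-%m-%d"))

print(dated_output_path("hdfs://datanode1.selise.ch/home/admin/luigi",
                        datetime.date(2016, 5, 1)))
```

In the task itself you would add a `date = luigi.DateParameter()` and use this helper inside `output()`.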

wordcount.py

import sys
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext()
    sc.textFile(sys.argv[1]) \
      .flatMap(lambda line: line.split()) \
      .map(lambda word: (word, 1)) \
      .reduceByKey(lambda a, b: a + b) \
      .saveAsTextFile(sys.argv[2])
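The flatMap/map/reduceByKey pipeline above can be checked without a Spark cluster by expressing the same logic in plain Python. This is only a local sketch for verifying the counting logic, not part of the submitted job:

```python
from collections import Counter

def word_count(lines):
    """Plain-Python equivalent of the Spark pipeline:
    flatMap(split) -> map((word, 1)) -> reduceByKey(a + b)."""
    counts = Counter()
    for line in lines:           # flatMap: each line yields its words
        for word in line.split():
            counts[word] += 1    # map + reduceByKey in one step
    return dict(counts)

print(word_count(["to be or", "not to be"]))
```

Once the logic behaves as expected here, the Spark version should produce the same (word, count) pairs, distributed across partitions.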

Now open a terminal and run the following command to launch the automated job:

[root@namenode wordcount]# time python PySparkWordCount.py PySparkWordCount --local-scheduler
No handlers could be found for logger "luigi-interface"
DEBUG: Checking if PySparkWordCount() is complete
/root/anaconda2/lib/python2.7/site-packages/luigi/parameter.py:259: UserWarning: Parameter None is not of type string.
  warnings.warn("Parameter {0} is not of type string.".format(str(x)))
DEBUG: Running file existence check: hadoop fs -stat hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.output
INFO: Informed scheduler that task   PySparkWordCount__99914b932b   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 51836] Worker Worker(salt=818063944, workers=1, host=namenode.selise.ch, username=root, pid=51836) running   PySparkWordCount()
INFO: Running command: /root/spark-1.6.1-bin-hadoop2.6/bin/spark-submit --master spark://namenode.selise.ch:7077 --name PySpark Word Count --driver-memory 2g --executor-memory 3g --total-executor-cores 100 wordcount.py hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.input hdfs://datanode1.selise.ch/home/admin/luigi/wordcount.output
INFO: [pid 51836] Worker Worker(salt=818063944, workers=1, host=namenode.selise.ch, username=root, pid=51836) done      PySparkWordCount()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   PySparkWordCount__99914b932b   has status   DONE
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
INFO: Worker Worker(salt=818063944, workers=1, host=namenode.selise.ch, username=root, pid=51836) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====

Scheduled 1 tasks of which:
* 1 ran successfully:
    - 1 PySparkWordCount(total_executor_cores=100)

This progress looks :) because there were no failed tasks or missing external dependencies

===== Luigi Execution Summary =====


real    0m17.074s
user    0m14.077s
sys     0m5.391s
