MRJob using Amazon EMR (2 Small slave instances and 1 master node)

Today I will show you how to run a very basic MapReduce word-count job using AWS EMR. I used EMR 4.8.2 and Hadoop 2.7.3. Most of the time I actually use the Hortonworks distribution for my work, but today I will show you Amazon EMR.

First of all, you need to create security credentials in your Amazon account. If you don't have an Amazon account, don't hesitate: you can create one with a credit card, and you don't have to pay for the first year (though there are some usage limitations).

To run the MapReduce job on Amazon you need Python's pip installed. Then use pip to install awscli and mrjob, as shown below.

pip install awscli
pip install mrjob

Now it's time to create the security credentials in my Amazon account and put them into my mrjob.conf file. Just open a text editor and paste in the lines below (with your own keys in place of the placeholders):

runners:
  emr:
    aws_access_key_id: xxxxxxxxxxxxxxxxxxxx
    aws_secret_access_key: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
    num_core_instances: 2
    core_instance_type: m1.small

Now your configuration is almost done. If you save it as ~/.mrjob.conf, mrjob will pick it up automatically; otherwise pass it explicitly with --conf-path as I do below. Let's write the word-count MapReduce job in Python. My program is given below; it is the standard word-count example from the mrjob documentation.


from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):
    def mapper(self, _, line):
        yield "chars", len(line)
        yield "words", len(line.split())
        yield "lines", 1

    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    MRWordFrequencyCount.run()

Now let's save the code in a file named mr_word_count.py and run the program from your terminal.
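Before paying for an EMR cluster, it can be worth smoke-testing the job locally with mrjob's inline runner. Below is a minimal sketch of how that could look programmatically; the file and input names are just the ones used in this post, and the output-parsing calls may differ slightly between mrjob versions.

# Hedged local smoke test: assumes the job above was saved as mr_word_count.py
# and that a README.rst file exists in the current directory.
from mr_word_count import MRWordFrequencyCount

if __name__ == '__main__':
    job = MRWordFrequencyCount(args=['-r', 'inline', 'README.rst'])
    with job.make_runner() as runner:
        runner.run()
        # stream_output()/parse_output_line() are the mrjob 0.5.x style;
        # newer releases prefer cat_output()/parse_output().
        for line in runner.stream_output():
            key, value = job.parse_output_line(line)
            print(key, value)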

The command I used is given below.

python mr_word_count.py -r emr README.rst --output-dir=s3://my-mr-jobs/wc_out2/ --conf-path=mrjob.conf 

Let's see what happens in the console. My console output is given below.


python mr_word_count.py -r emr README.rst --output-dir=s3://my-mr-jobs/wc_out2/ --conf-path=mrjob.conf
Using s3://mrjob-be5ff8dcbdecb973/tmp/ as our temp dir on S3
Creating temp directory /var/folders/mh/qvz8smz53njctvsp0r8_xt040000gn/T/mr_word_count.mohiulalamprince.20170813.193712.456524
Copying local files to s3://mrjob-be5ff8dcbdecb973/tmp/mr_word_count.mohiulalamprince.20170813.193712.456524/files/...
Created new cluster j-1CBMSHE0GMZHN
Waiting for step 1 of 1 (s-2E4DYAQOEV00C) to complete...
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING)
  PENDING (cluster is STARTING: Configuring cluster software)
  PENDING (cluster is STARTING: Configuring cluster software)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
  PENDING (cluster is BOOTSTRAPPING: Running bootstrap actions)
Terminating cluster: j-1CBMSHE0GMZHN

Now it's time to look at the Amazon console and see what is happening over there. Here are the snapshots from my console.

EMR Cluster list

EMR Cluster configuration

aws s3 ls s3://my-mr-jobs/wc_out2/
2017-08-14 01:49:20         12 part-00000
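If you prefer Python to the aws CLI, the same result file can be read straight from S3 with boto3. This is just a small sketch using the bucket and key from the listing above; boto3 has to be installed separately and your credentials must be configured.

# Read the part file that the word-count job wrote to S3 (bucket/key as above).
import boto3

s3 = boto3.client('s3')
obj = s3.get_object(Bucket='my-mr-jobs', Key='wc_out2/part-00000')
print(obj['Body'].read().decode('utf-8'))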

S3 output location

EMR Cluster list

aws emr list-clusters 
{
    "Clusters": [
        {
            "Status": {
                "Timeline": {
                    "ReadyDateTime": 1502653570.991, 
                    "CreationDateTime": 1502653079.879, 
                    "EndDateTime": 1502653805.849
                }, 
                "State": "TERMINATED", 
                "StateChangeReason": {
                    "Message": "Terminated by user request", 
                    "Code": "USER_REQUEST"
                }
            }, 
            "NormalizedInstanceHours": 4, 
            "Id": "j-1CBMSHE0GMZHN", 
            "Name": "mr_word_count.mohiulalamprince.20170813.193712.456524"
        }, 
        {
            "Status": {
                "Timeline": {
                    "ReadyDateTime": 1502650935.456, 
                    "CreationDateTime": 1502650470.064, 
                    "EndDateTime": 1502651081.962
                }, 
                "State": "TERMINATED", 
                "StateChangeReason": {
                    "Message": "Terminated by user request", 
                    "Code": "USER_REQUEST"
                }
            }, 
            "NormalizedInstanceHours": 2, 
            "Id": "j-1VLKU9ZZINT6L", 
            "Name": "mr_word_count.mohiulalamprince.20170813.185301.918036"
        }
    ]
}
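If you want more detail on one of these clusters (instance groups, applications, and so on), you can describe it by its id, for example:

aws emr describe-cluster --cluster-id j-1CBMSHE0GMZHN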

The job has completed and the cluster has been terminated.

Output result

Now let's try this code snippet, which finds the most frequently used word with a two-step job:

from mrjob.job import MRJob
from mrjob.step import MRStep
import re

WORD_RE = re.compile(r"[\w']+")


class MRMostUsedWord(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_words,
                   combiner=self.combiner_count_words,
                   reducer=self.reducer_count_words),
            MRStep(reducer=self.reducer_find_max_word)
        ]

    def mapper_get_words(self, _, line):
        # yield each word in the line
        for word in WORD_RE.findall(line):
            yield (word.lower(), 1)

    def combiner_count_words(self, word, counts):
        # optimization: sum the words we've seen so far
        yield (word, sum(counts))

    def reducer_count_words(self, word, counts):
        # send all (num_occurrences, word) pairs to the same reducer.
        # num_occurrences is so we can easily use Python's max() function.
        yield None, (sum(counts), word)

    # discard the key; it is just None
    def reducer_find_max_word(self, _, word_count_pairs):
        # each item of word_count_pairs is (count, word),
        # so yielding one results in key=counts, value=word
        yield max(word_count_pairs)


if __name__ == '__main__':
    MRMostUsedWord.run()
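Save this as, say, mr_most_used_word.py (the file name is only a suggestion) and run it exactly like the word-count job, either locally or on EMR:

python mr_most_used_word.py -r emr README.rst --conf-path=mrjob.conf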

Now it's time to run a MapReduce program on a Hortonworks cluster. It is a 3-node cluster: the master node has 20 GB of memory and the two slave nodes have 16 GB each. Let's see how to process the MovieLens dataset with a MapReduce program. I downloaded the largest dataset from grouplens.org, which contains around 26 million ratings, and we will use it to compute the average rating per movie.

GroupLens 26-million-rating dataset analysis using MapReduce

Download the data from grouplens.org

wget http://files.grouplens.org/datasets/movielens/ml-latest.zip

Unzip the file

unzip ml-latest.zip && cd ml-latest

Now it's time to put the ratings.csv and movies.csv files into HDFS.


hadoop dfs -mkdir /user/root/movielen
hadoop dfs -put ratings.csv /user/root/movielen/
hadoop dfs -put movies.csv /user/root/movielen/
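You can verify that the files landed where you expect with a quick listing, for example:

hadoop dfs -ls /user/root/movielen/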

Now let's look at the average-rating code.

from mrjob.job import MRJob


class MRAverageRatingPerMovie(MRJob):

    def mapper(self, _, line):
        # ratings.csv rows look like: userId,movieId,rating,timestamp
        try:
            data = line.split(",")
            userid = int(data[0])        # parsed only to validate the line
            movieid = int(data[1])
            rating = float(data[2])
            timestamp = int(data[3])     # parsed only to validate the line
        except (ValueError, IndexError):
            # the CSV header (and any malformed line) is collected under a dummy key
            yield 0, 0.0
            return
        yield movieid, rating

    def reducer(self, movieid, ratings):
        counter = 0
        total_ratings = 0.0
        for rating in ratings:
            total_ratings += rating
            counter += 1
        yield movieid, total_ratings / counter


if __name__ == '__main__':
    MRAverageRatingPerMovie.run()

How to run your MapReduce program using mrjob on top of the Hortonworks distribution

Don't be afraid, it's a very basic approach. I have pasted the command below.


[root@namenode ml-latest]# python AverageRatingCountPerMovie.py \
>     -r hadoop \
>     --hadoop-streaming-jar /usr/hdp/current/hadoop-client/hadoop-streaming.jar \
>     --output hdfs://datanode1.selise.ch/user/root/movielen/output5 \
>     hdfs:///user/root/movielen/ratings.csv > log

And here is my output log once I press Enter:

No configs found; falling back on auto-configuration
Looking for hadoop binary in $PATH...
Found hadoop binary: /usr/bin/hadoop
Using Hadoop version 2.7.1.2.4.2.0
Creating temp directory /tmp/word_count.root.20170814.122244.762965
Copying local files to hdfs:///user/root/tmp/mrjob/word_count.root.20170814.122244.762965/files/...
Running step 1 of 1...
  WARNING: Use "yarn jar" to launch YARN applications.
  packageJobJar: [] [/usr/hdp/2.4.2.0-258/hadoop/hadoop-streaming.jar] /tmp/streamjob211933095411666257.jar tmpDir=null
  Timeline service address: http://datanode2.selise.ch:8188/ws/v1/timeline/
  Timeline service address: http://datanode2.selise.ch:8188/ws/v1/timeline/
  Total input paths to process : 1
  number of splits:6
  Submitting tokens for job: job_1502454305190_0007
  Submitted application application_1502454305190_0007
  The url to track the job: http://datanode2.selise.ch:8088/proxy/application_1502454305190_0007/
  Running job: job_1502454305190_0007
  Job job_1502454305190_0007 running in uber mode : false
   map 0% reduce 0%
   map 5% reduce 0%
   map 6% reduce 0%
   map 9% reduce 0%
   map 13% reduce 0%
   map 17% reduce 0%
   map 18% reduce 0%
   map 20% reduce 0%
   map 26% reduce 0%
   map 27% reduce 0%
   map 28% reduce 0%
   map 30% reduce 0%
   map 31% reduce 6%
   map 32% reduce 6%
   map 34% reduce 6%
   map 35% reduce 6%
   map 36% reduce 6%
   map 38% reduce 6%
   map 39% reduce 6%
   map 41% reduce 6%
   map 42% reduce 6%
   map 43% reduce 6%
   map 44% reduce 6%
   map 45% reduce 6%
   map 46% reduce 6%
   map 47% reduce 6%
   map 49% reduce 6%
   map 51% reduce 6%
   map 53% reduce 6%
   map 54% reduce 6%
   map 56% reduce 6%
   map 57% reduce 6%
   map 62% reduce 6%
   map 63% reduce 6%
   map 64% reduce 6%
   map 65% reduce 11%
   map 66% reduce 11%
   map 67% reduce 11%
   map 68% reduce 11%
   map 69% reduce 11%
   map 70% reduce 11%
   map 71% reduce 11%
   map 72% reduce 11%
   map 73% reduce 11%
   map 74% reduce 11%
   map 75% reduce 11%
   map 76% reduce 11%
   map 77% reduce 11%
   map 89% reduce 11%
   map 94% reduce 11%
   map 100% reduce 11%
   map 100% reduce 17%
   map 100% reduce 22%
   map 100% reduce 28%
   map 100% reduce 45%
   map 100% reduce 59%
   map 100% reduce 67%
   map 100% reduce 68%
   map 100% reduce 69%
   map 100% reduce 70%
   map 100% reduce 71%
   map 100% reduce 72%
   map 100% reduce 73%
   map 100% reduce 74%
   map 100% reduce 75%
   map 100% reduce 76%
   map 100% reduce 77%
   map 100% reduce 78%
   map 100% reduce 79%
   map 100% reduce 80%
   map 100% reduce 81%
   map 100% reduce 82%
   map 100% reduce 83%
   map 100% reduce 84%
   map 100% reduce 85%
   map 100% reduce 86%
   map 100% reduce 87%
   map 100% reduce 88%
   map 100% reduce 89%
   map 100% reduce 90%
   map 100% reduce 91%
   map 100% reduce 92%
   map 100% reduce 93%
   map 100% reduce 94%
   map 100% reduce 95%
   map 100% reduce 96%
   map 100% reduce 97%
   map 100% reduce 98%
   map 100% reduce 99%
   map 100% reduce 100%
  Job job_1502454305190_0007 completed successfully
  Output directory: hdfs://datanode1.selise.ch/user/root/movielen/output5
Counters: 49
        File Input Format Counters
                Bytes Read=710205687
        File Output Format Counters
                Bytes Written=808301
        File System Counters
                FILE: Number of bytes read=284899443
                FILE: Number of bytes written=570779561
                FILE: Number of large read operations=0
                FILE: Number of read operations=0
                FILE: Number of write operations=0
                HDFS: Number of bytes read=710206371
                HDFS: Number of bytes written=808301
                HDFS: Number of large read operations=0
                HDFS: Number of read operations=21
                HDFS: Number of write operations=2
        Job Counters
                Data-local map tasks=6
                Launched map tasks=6
                Launched reduce tasks=1
                Total megabyte-seconds taken by all map tasks=972241920
                Total megabyte-seconds taken by all reduce tasks=533227520
                Total time spent by all map tasks (ms)=632970
                Total time spent by all maps in occupied slots (ms)=1265940
                Total time spent by all reduce tasks (ms)=260365
                Total time spent by all reduces in occupied slots (ms)=520730
                Total vcore-seconds taken by all map tasks=632970
                Total vcore-seconds taken by all reduce tasks=260365
        Map-Reduce Framework
                CPU time spent (ms)=727900
                Combine input records=0
                Combine output records=0
                Failed Shuffles=0
                GC time elapsed (ms)=5188
                Input split bytes=684
                Map input records=26024290
                Map output bytes=232850855
                Map output materialized bytes=284899473
                Map output records=26024291
                Merged Map outputs=6
                Physical memory (bytes) snapshot=7586971648
                Reduce input groups=45116
                Reduce input records=26024291
                Reduce output records=45116
                Reduce shuffle bytes=284899473
                Shuffled Maps =6
                Spilled Records=52048582
                Total committed heap usage (bytes)=7692353536
                Virtual memory (bytes) snapshot=23136010240
        Shuffle Errors
                BAD_ID=0
                CONNECTION=0
                IO_ERROR=0
                WRONG_LENGTH=0
                WRONG_MAP=0
                WRONG_REDUCE=0
Streaming final output from hdfs://datanode1.selise.ch/user/root/movielen/output5...
Removing HDFS temp directory hdfs:///user/root/tmp/mrjob/word_count.root.20170814.122244.762965...
Removing temp directory /tmp/word_count.root.20170814.122244.762965...

Here is the standard output:

[root@namenode ml-latest]# head -n 50 log
0       0.0
1       3.8881574960610834
10      3.431840536054589
100     3.232303877366997
1000    3.0990783410138247
100001  4.0
100003  3.6666666666666665
100006  3.3333333333333335
100008  3.7142857142857144
100010  2.522727272727273
100013  3.4444444444444446
100015  2.5
100017  3.1
100032  3.6
100034  3.28125
100036  3.388888888888889
100038  3.4545454545454546
100040  3.0
100042  3.3333333333333335
100044  4.271573604060913
100046  3.769230769230769
100048  3.1666666666666665
100050  3.5
100052  2.0454545454545454
100054  3.75
100056  2.55
100058  3.2045454545454546
100060  3.576923076923077
100062  3.716666666666667
100068  3.6265060240963853
100070  3.5416666666666665
100072  2.875
100075  2.8157894736842106
100077  2.3333333333333335
100079  1.5
100081  3.7
100083  2.3055555555555554
100085  3.5
100087  2.8823529411764706
100089  3.6707317073170733
100091  3.0
100093  2.889830508474576
100096  3.125
100099  4.1
1001    2.8255813953488373
100101  4.0
100103  0.5
100106  3.7413793103448274
100108  3.16815144766147
100131  1.5
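If you want to double-check a few of these averages, the same numbers can be computed in plain Python on a local copy of ratings.csv. This is only a hedged verification sketch, assuming the usual MovieLens header line userId,movieId,rating,timestamp:

# Plain-Python cross-check of the MapReduce averages (Python 3).
import csv
from collections import defaultdict

totals = defaultdict(float)   # sum of ratings per movie id
counts = defaultdict(int)     # number of ratings per movie id

with open('ratings.csv') as f:
    reader = csv.reader(f)
    next(reader)  # skip the header line
    for userid, movieid, rating, timestamp in reader:
        totals[int(movieid)] += float(rating)
        counts[int(movieid)] += 1

# Spot-check a few movie ids against the job output above.
for movieid in (1, 10, 100):
    print(movieid, totals[movieid] / counts[movieid])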
