vlambda博客
学习文章列表

第二天 Mapreduce WordCount问题及解决

讲一讲我运行MapReduce 中的Word Count遇到的坑,代码使用py2

map.py

import sys

for line in sys.stdin:

    ss = line.strip().split('')

    for word in ss:

                print'\t'.join([word.strip(), '1'])

red.py

import sys

cur_word = None

sum = 0

for line in sys.stdin:

    ss =line.strip().split('\t')

    if len(ss) != 2:

                continue

    word, cnt = ss 

    if cur_word == None:

                cur_word = word

    if cur_word != word:

                print'\t'.join([cur_word, str(sum)])

                cur_word = word

                sum = 0

    sum += int(cnt)

print '\t'.join([cur_word, str(sum)])

Run.sh

HADOOP_CMD="/usr/local/src/hadoop-1.2.1/bin/hadoop"

STREAM_JAR_PATH="/usr/local/src/hadoop-1.2.1/contrib/streaming/hadoop-streaming-1.2.1.jar"

 

#INPUT_FILE_PATH_1="/The_Man_of_Property.txt"

INPUT_FILE_PATH_1="/1.data"

OUTPUT_PATH="/output"

 

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

 

# Step 1.

$HADOOP_CMD jar $STREAM_JAR_PATH \

    -input $INPUT_FILE_PATH_1\

    -output $OUTPUT_PATH \

    -mapper "pythonmap_new.py" \

    -reducer "pythonred_new.py" \

    -file ./map_new.py \

    -file ./red_new.py

 

将文件都上传到hadoop上去

Hadoop fs -put * /

然后执行

Bash run.sh

日志显示如下错误:

 

2020-04-25 23:37:34,198 INFO [main]org.apache.hadoop.metrics2.impl.MetricsConfig: loaded properties fromhadoop-metrics2.properties

2020-04-25 23:37:34,385 INFO [main]org.apache.hadoop.metrics2.impl.MetricsSystemImpl: Scheduled snapshot period at10 second(s).

2020-04-25 23:37:34,385 INFO [main]org.apache.hadoop.metrics2.impl.MetricsSystemImpl: MapTask metrics systemstarted

2020-04-25 23:37:34,400 INFO [main]org.apache.hadoop.mapred.YarnChild: Executing with tokens:

2020-04-25 23:37:34,400 INFO [main]org.apache.hadoop.mapred.YarnChild: Kind: mapreduce.job, Service:job_1587828984925_0001, Ident: (org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier@40005471)

2020-04-25 23:37:34,571 INFO [main]org.apache.hadoop.mapred.YarnChild: Sleeping for 0ms before retrying again. Gotnull now.

2020-04-25 23:37:35,389 INFO [main]org.apache.hadoop.mapred.YarnChild: mapreduce.cluster.local.dir for child:/usr/local/src/hadoop-2.6.5/tmp/nm-local-dir/usercache/root/appcache/application_

java.io.IOException: 断开的管道

 atjava.io.FileOutputStream.writeBytes(Native Method)

 atjava.io.FileOutputStream.write(FileOutputStream.java:326)

 at java.io.BufferedOutputStream.write(BufferedOutputStream.java:122)

 atjava.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:82)

 atjava.io.BufferedOutputStream.write(BufferedOutputStream.java:126)

 atjava.io.DataOutputStream.write(DataOutputStream.java:107)

 atorg.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)

 atorg.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)

 atorg.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:106)

 at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)

 atorg.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)

 atorg.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)

 atorg.apache.hadoop.mapred.MapTask.run(MapTask.java:343)

 atorg.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)

 atjava.security.AccessController.doPrivileged(Native Method)

 atjavax.security.auth.Subject.doAs(Subject.java:422)

 atorg.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)

 atorg.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158)

2020-04-25 23:37:38,244 WARN [main]org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: 断开的管道

2020-04-25 23:37:38,248 INFO [Thread-14]org.apache.hadoop.streaming.PipeMapRed: MRErrorThread done

2020-04-25 23:37:38,249 INFO [main] org.apache.hadoop.streaming.PipeMapRed:PipeMapRed failed!

java.lang.RuntimeException: PipeMapRed.waitOutputThreads():subprocess failed with code 1

 atorg.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:322)

 atorg.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:535)

 atorg.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:120)

 atorg.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)

 atorg.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:34)

 at org.apache.hadoop.mapred.MapTask

 

解决:

上网搜索原因:PipeMapRed.waitOutputThreads(): subprocess failed with code 1

主要问题是脚本可能出现问题

1.本地运行: 可以先输入一个小的txt的文档,方便查阅,我当时就编辑了一个a.txt里面每行输入aaa    bbb ccc  aa类似的字母或数字

通过管道执行:cat a.txt | python map.py | sort-k1 |python red.py

出现map.pyred.py  tab键和空格不一致,我明明每个都输入tab键,不知道原因,为了解决这个问题,遇到tab就手动按4下空格键

运行时这个问题解决

2.接着运行,出现print后面语法错误,这个原因是未切换到python2,因为我使用python2的名称是py27tf, 输入source activatepy27tf按回车

再次运行的时候,就可以正常显示结果了

3. 接着将代码重新上传到hadoop上面

执行bash run.sh仍显示开始的错误代码,继续上网搜索,这次换个思路,hadoop2自带wordcount模板,使用网上的方法试了一下:

Hadoop jar /usr/local/src/Hadoop-2.6.5/share/hadoop/mapreduce/hadoop-mapreduce-examples-2.6.5.jar wordcount  /a.txt  /output/01

这下执行成功了

4. 代码检查好多遍,实在不行,我就自己手写一个python3的代码,重新上传到hadoop,抱着死马当活马医的态度,没想到居然执行成功了

,真的是太激动了,后来又把原先代码print后面都加(),也执行成功了,两天的调试及查找原因弄得筋疲力尽,但是看到成功的那一刻,真的感觉都值了

 

总结

后面看到别的同学也遇到类似的问题,主要是centos7自带python2.7,复制后的slave1slave2同样也自带python2.7,然后又安装anaconda3

如果只在master安装anaconda3slave1slave2不安装的话,不影响,原因如下:

1) master上面的RM只负责资源管理和任务分配,slave1,slave2上面的AM才负责执行,这样豁然开朗,我自己是每台虚拟机都安装anaconda3,所以才有这个bug产生

我自己仍有个问题,后面找到解决方案在更新:

我明明把所有代码全部切到python2了,还是执行失败

经过测试,我将环境变量更新了一下:vim ~/.bashrc --> 添加一句:source activate py27tf --> 保存后,输入:source ~/.bashrc,主要是slaves需要,master随便添不添加


最后,激励自己一下,过程真的很痛苦,但是成功后的成就感,自信感和满足感成倍的增加