0008 - MapReduce中Shuffle和排序机制解析
Map 函数输出时,并不是直接写到磁盘,而是利用缓冲的方式写到内存并进行预排序。
其过程如下:
Reduce 端在 Shuffle 阶段主要分为:Copy、Merge、Sort 以及 Reduce 阶段。
ReduceTask 从各个 MapTask 上远程复制数据,因此只要有 MapTask 完成,ReduceTask 就开始复制其输出。复制的过程可以使多线程并发进行,并发数由(mapreduce.reduce.shuffle.parallelcopies,默认为 5)设置。
在远程复制数据的同时,随着磁盘上溢写文件的增多,ReduceTask 启动两个后台线程对内存和磁盘上的文件进行合并(可能需要多次合并),每次合并的文件数由(mapreduce.task.io.sort.factor,默认为 10)控制。为了合并,压缩的 Map 输出都必须在内存中被解压缩。
用户编写 reduce() 函数输入数据是按 key 聚集的一组数据,进行快速排序(字典顺序排序)。由于各个 MapTask 已经实现对其输出处理结果进行了局部排序,因此,ReduceTask 只需要对所有数据进行一次归并排序即可。
最后一次合并排序(可以来自内存和磁盘片段)的时候,直接把数据写入到 reduce() 函数,从而省略了一次磁盘的往返读写过程。最后输出直接写到文件系统(如:HDFS)上。
ReduceTask=0,表示没有 Reduce 阶段,输出文件个数 Map 个数一致。
ReduceTask=1,默认值,所以输出文件个数为 1 个。
具体有多少个 ReduceTask,需要根据集群性能而定。
如果数据分布不均匀,可能会在 Reduce 阶段产生数据倾斜。
在上面介绍 Shuffle 过程时,已经提过相关参数来提高 MapReduce 的性能。在 mapred-default.xml 配置文件修改,如下进行统一整理说明:
4.1 - Map 端参数调优
属性名称 |
默认值 | 描述 |
---|---|---|
mapreduce.task.io.sort.mb | 100 | Map 输出时所使用的内存缓冲区大小, 单位:MB |
mapreduce.map.sort.spill.percent | 0.80 | Map 输出溢写到磁盘的内存阈值百分比 (0.8 或 80%) |
mapreduce.cluster.local.dir | ${hadoop.tmp.dir}/mapred/local | 溢出文件指定本地磁盘的目录 |
mapreduce.task.io.sort.factor | 10 | 排序文件一次最多合并的流数量 |
mapreduce.map.output.compress | false | 是否压缩 Map 输出 |
mapreduce.map.output.compress.codec | org.apache.hadoop.io.compress.DefaultCodec | Map 输出的压缩编解码器 |
mapreduce.shuffle.max.threads | 0 | Map 输出到 Reducer 的每个 NM 的工作线程。0 表示使用机器中可用处理器数的 2 倍 |
压缩格式 |
HadoopCompressionCodec |
---|---|
DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
gzip | org.apache.hadoop.io.compress.GzipCodec |
bzip2 | org.apache.hadoop.io.compress.BZip2Codec |
LZO | com.hadoop.compression.lzo.LzopCodec |
LZ4 | org.apache.hadoop.io.compress.Lz4Codec |
Snappy | org.apache.hadoop.io.compress.SnappyCodec |
4.2 - Reduce 端参数调优
属性名称 | 默认值 | 描述 |
---|---|---|
mapreduce.reduce.shuffle.parallelcopies | 5 | 把 Map 输出复制到 Reduce 的线程数 |
mapreduce.task.io.sort.factor | 10 | 排序文件一次最多合并的流数量 |
mapreduce.reduce.shuffle.input.buffer.percent | 0.70 | Shuffle 的复制阶段,分配给 Map 输出的缓冲区占堆空间的百分比 |
mapreduce.reduce.shuffle.merge.percent | 0.66 | Map 输出缓冲区的阈值使用百分比,超过将进行合并输出和溢写磁盘 |
mapreduce.reduce.merge.inmem.threshold | 1000 | 当 Map 输出文件数超过该阈值,进行合并输出和溢写磁盘,0 或更小的数表示没有阈值限制 |
mapreduce.reduce.input.buffer.percent | 0.0 | 在 reduce 过程中,内存保存 Map 输出的空间占整个堆空间的比例。默认情况下,Reduce 任务开始前,所有的 Map 输出合并到磁盘,以便为 reduce 提供尽可能多的内存。 |
扫描二维码获取
更多精彩
大数据梦工厂