vlambda博客
学习文章列表

0008 - MapReduce中Shuffle和排序机制解析


01
 Shuffle



MapReduce 确保每个 Reducer 的输入都是按 key 排序。系统执行排序,将 Map 的输出作为输入传给 Reducer 的过程称为 Shuffle。

图片来自《Hadoop: The Definitive Guide》

02
  Map 端



Map 函数输出时,并不是直接写到磁盘,而是利用缓冲的方式写到内存并进行预排序。

其过程如下:

1、 每个 Map 任务都有一个环形内存缓冲区用于存储任务输出。在默认情况下,缓冲的大小为 100MB mapreduce.task.io.sort.mb ),一旦缓冲区的内容达到 80% mapreduce.map.sort.spill.percent ),便有一个后台线程将写入缓冲区的内容按照轮询的方式溢出(spill)写到磁盘指定目录( mapreduce.cluster.local.dir )。溢出的过程中 Map 任务一边溢出一边继续写入缓冲区,如果缓冲区被写满,Map 任务就会阻塞直到后台线程写磁盘过程结束。

2、 在写磁盘之前,线程根据数据最终要传的 Reducer 把数据划分到相应的分区( Partition )。每个分区按 key 在内存中排序。

3、 如果有 Combiner 函数,则在分区排序后的输出上运行局部聚合操作,以减轻 Shuffle 过程中网络负载的压力。

4、 每次内存缓冲区达到溢出阈值,就会新建一个溢出文件( spill file ),因此在 Map 任务写完其最后一个输出后,会有多个溢出文件。最终会被合并成一个已分区且已排序的输出文件。如果 Map 输出的数据比较多,产生的小文件会很多,影响系统性能,因此需要进行合并,通过( mapreduce.task.io.sort.factor,默认为 10 )设置一次最多可以合并的文件个数。

5、 Map 输出到磁盘的过程中,可以设置压缩,默认关闭 Map 输出压缩。为加快写磁盘速度,节约磁盘空间,并且减少传给 Reducer 的数据量,通过设置( mapreduce.map.output.compress )为 true 开启压缩,使用的压缩库由( org.apache.hadoop.io.compress.DefaultCodec )指定( Hadoop 的压缩 codec 格式见下文 )。

以上便是 Map 任务输出过程的主要步骤,输出到磁盘后,Reducer 通过 HTTP 服务获取输出文件的分区数据。用于文件分区的工作线程的数据由( mapreduce.shuffle.max.threads )控制,该设置针对每一个 NodeManager,而不是每个 Map 任务。默认值为 0 表示最大线程设置为机器中可用处理器数的 2 倍。

03
  Reduce 端



Reduce 端在 Shuffle 阶段主要分为:Copy、Merge、Sort 以及 Reduce 阶段。


3.1 - Copy 阶段

ReduceTask 从各个 MapTask 上远程复制数据,因此只要有 MapTask 完成,ReduceTask 就开始复制其输出。复制的过程可以使多线程并发进行,并发数由(mapreduce.reduce.shuffle.parallelcopies,默认为 5)设置。


MapTask 成功完成后,通过心跳机制通知 ApplicationMaster,Reducer 中的一个线程定期查询 ApplicationMaster,以获取完成的 MapTask 输出的主机位置,从而去对应的主机复制数据,直到获得所有的输出位置。

由于第一个 Reducer 可能会失败,因此主机并没有在第一个 Reducer 检索到 Map 输出时就立即从磁盘上删除它们。相反,主机会等待作业完成,直到 ApplicationMaster 告知它删除 Map 输出。

如果 Map 输出相对较小,会被复制到 ReduceTask 的 JVM 内存( 缓冲区大小比例由 mapreduce.reduce.shuffle.input.buffer.percent 控制,默认为 70% ),否则,Map 输出被复制到磁盘。一旦内存缓冲区达到阈值( mapreduce.reduce.shuffle.merge.percent,默认为 66% )或达到 Map 输出文件数阈值( mapreduce.reduce.merge.inmem.threshold,默认为 1000 ),则将内存的数据合并后溢出写到磁盘。如果设置了 Combiner 函数,则在写入磁盘前调用 Combiner 函数以减少写入磁盘的数据量。

3.2 - Merge 阶段

在远程复制数据的同时,随着磁盘上溢写文件的增多,ReduceTask 启动两个后台线程对内存和磁盘上的文件进行合并(可能需要多次合并),每次合并的文件数由(mapreduce.task.io.sort.factor,默认为 10)控制。为了合并,压缩的 Map 输出都必须在内存中被解压缩。


3.3 - Sort 阶段

用户编写 reduce() 函数输入数据是按 key 聚集的一组数据,进行快速排序(字典顺序排序)。由于各个 MapTask 已经实现对其输出处理结果进行了局部排序,因此,ReduceTask 只需要对所有数据进行一次归并排序即可。


3.4 - Reduce 阶段

最后一次合并排序(可以来自内存和磁盘片段)的时候,直接把数据写入到 reduce() 函数,从而省略了一次磁盘的往返读写过程。最后输出直接写到文件系统(如:HDFS)上。


注意事项:
  • ReduceTask=0,表示没有 Reduce 阶段,输出文件个数 Map 个数一致。

  • ReduceTask=1,默认值,所以输出文件个数为 1 个。

  • 具体有多少个 ReduceTask,需要根据集群性能而定。

  • 如果数据分布不均匀,可能会在 Reduce 阶段产生数据倾斜。


04
  参数调优



在上面介绍 Shuffle 过程时,已经提过相关参数来提高 MapReduce 的性能。在 mapred-default.xml 配置文件修改,如下进行统一整理说明:


4.1 - Map 端参数调优

属性名称
默认值 描述
mapreduce.task.io.sort.mb 100

Map 输出时所使用的内存缓冲区大小,

单位:MB

mapreduce.map.sort.spill.percent 0.80

Map 输出溢写到磁盘的内存阈值百分

(0.8 或 80%)

mapreduce.cluster.local.dir ${hadoop.tmp.dir}/mapred/local 溢出文件指定本地磁盘的目录
mapreduce.task.io.sort.factor 10 排序文件一次最多合并的流数量
mapreduce.map.output.compress false 是否压缩 Map 输出
mapreduce.map.output.compress.codec org.apache.hadoop.io.compress.DefaultCodec Map 输出的压缩编解码器
mapreduce.shuffle.max.threads 0

Map 输出到 Reducer 的每个 NM 的工作线程0 表示使用机器中可用处理器数的 2 倍


Hadoop 的压缩 codec 格式如下:
压缩格式
HadoopCompressionCodec
DEFLATE org.apache.hadoop.io.compress.DefaultCodec
gzip org.apache.hadoop.io.compress.GzipCodec
bzip2 org.apache.hadoop.io.compress.BZip2Codec
LZO com.hadoop.compression.lzo.LzopCodec
LZ4 org.apache.hadoop.io.compress.Lz4Codec
Snappy org.apache.hadoop.io.compress.SnappyCodec

1、减少溢写(Spill)次数
通过评估 Map 输出大小,增加 mapreduce.task.io.sort.mb  的值来减少溢写磁盘的次数,从而减少磁盘 I/O。可参考 MapReduce 计数器( SPILLED_RECORDS )计算在作业运行整个阶段中溢写磁盘的记录数。

2、减少合并(Merge)次数
通过调整 mapreduce.task.io.sort.factor  的值,增大 Merge 的文件数目,减少 Merge 的次数,从而缩短 MapReduce 的处理时间。

3、合理修改运行任务内存
运行 Map 任务和 Reduce 任务的 JVM 内存大小由 mapreduce.map.java.opts  和 mapreduce.reduce.java.opts  设置。

4.2 - Reduce 端参数调优

属性名称 默认值 描述
mapreduce.reduce.shuffle.parallelcopies 5 把 Map 输出复制到 Reduce 的线程数
mapreduce.task.io.sort.factor 10 排序文件一次最多合并的流数量
mapreduce.reduce.shuffle.input.buffer.percent 0.70 Shuffle 的复制阶段,分配给 Map 输出的缓冲区占堆空间的百分比
mapreduce.reduce.shuffle.merge.percent 0.66 Map 输出缓冲区的阈值使用百分比,超过将进行合并输出和溢写磁盘
mapreduce.reduce.merge.inmem.threshold 1000 当 Map 输出文件数超过该阈值,进行合并输出和溢写磁盘,0 或更小的数表示没有阈值限制
mapreduce.reduce.input.buffer.percent 0.0 在 reduce 过程中,内存保存 Map 输出的空间占整个堆空间的比例。默认情况下,Reduce 任务开始前,所有的 Map 输出合并到磁盘,以便为 reduce 提供尽可能多的内存。


1、合理设置 Map 和 Reduce 个数
如果都设置较小,会导致 Task 等待,延长处理时间;如果设置太多,会导致 Map 和 Reduce 任务间竞争资源,造成处理超时等错误。

2、合理设置 Map 和 Reduce 共存
通过调整 mapreduce.job.reduce.slowstart.completedmaps (默认0.05),表示至少 Map 任务完成 5% 时,Reduce 任务才会开始运行,以减少 Reduce 任务的等待时间。

3、合理设置 Reduce 端的缓冲区
当数据达到一个阈值时,缓冲区中的数据就会写入磁盘,Reduce 从磁盘中读取所有数据,增加网络宽带负载。通过调整 mapreduce.reduce.input.buffer.percent  参数为 1.0(或一个更低的值,但要大于 0.0),Reduce 会直接读取保留指定比例的缓冲区中的数据,以提升性能。



扫一 扫,我 们的故事就开始了。


0008 - MapReduce中Shuffle和排序机制解析

扫描二维码获取

更多精彩

大数据梦工厂