vlambda博客
学习文章列表

「大数据」(八十)Spark之Shuffle机制

【导读:数据是二十一世纪的石油,蕴含巨大价值,这是·情报通·大数据技术系列第[80]篇文章,欢迎阅读和收藏】

1 基本概念

shuffle : Shuffle 是 MapReduce 框架中的一个特定的 phase ,介于 Map phase 和 Reduce phase 之间,当 Map 的输出结果要被 Reduce 使用时,输出结果需要按 key 哈希,并且分发到每一个 Reducer 上去,这个过程就是 shuffle 。

由于 shuffle 涉及到了磁盘的读写和网络的传输,因此 shuffle 性能的高低直接影响到了整个程序的运行效率。下面这幅图清晰地描述了 MapReduce 算法的整个流程,其中 shuffle phase 是介于 Map phase 和 Reduce phase 之间作为大数据离线计算平台中最重要的模块,其性能直接影响作业的运行时效和平台资源消耗。主要工作是从 Map 结束到 Reduce 开始之间的过程。shuffle 阶段又可以分为 Map 端的 shuffle 和 Reduce 端的 shuffle 。


2 术语解释

2.1 Map 端的 Shuffler

每个 map task 都有一个内存缓冲区(如上图中的 buffer in memory 默认为 100MB ),存储着 map 的输出数据 ( 格式为 Key-Value 键值对 ) ,当缓冲区快满的时候,需要将缓冲区中的数据以一个临时文件的方式存放到磁盘上 , 溢写是单线程来完成的,他不影响往该缓存中写 map 结果的线程。溢写线程启动时不应该阻止 map 的结果输出,所以整个缓冲区有个溢写的比例 spill.percent, 这个比例默认是 0.8 ,也就是当缓冲区的数据已经达到阈值( buffer size * spill percent = 100MB * 0.8 = 80MB ),溢写线程启动,锁定这 80MB 的内存,执行溢写过程。Map task 的输出结果还可以往剩下的 20MB 内存中写,互不影响。这里需要注意在写入磁盘之前,需要进行 partition,sort , combine 操作。随着数据不断地输出,溢写的文件越来越多,还要在磁盘上进行合并操作 , 最后合成一个溢出文件。

2.1.1 partition

每个输出数据要经过 partition 这个程序得到一个分区号。默认情况下, partition 根据每一对 Key-Value 键值对的键的哈希值对 reduce 的个数取模得到一个分区号( hashcode%reduce 个数)。partition 的目的是把数据分配到一个 reduce task 中去。

2.1.2 sort

sort 程序的目的是排序,排序的目标是 map 输出的数据。默认情况下,按照键值对的 key 的 ASCII 码值进行排序(也就是字典排序)。

2.1.3 combine

combine 函数是将具有相同 Key 的多个 Key-Value 键值对合并成一个键值对,这里需要注意,若在 map 端使用 combine 操作,则首先需要在 map 中也调用 group 程序(在后面将会被介绍),因为在 combine 中需要用到相同的键。这样做的目的是为了减少网络传输。

2.2 Reduce 端的 Shuffler

2.2.1 fetch

Reduce 端按照各个 Map 端的 partition 程序得出的分区号进行抓取,抓取的数据同样存在于内存中。

2.2.2 sort

将 Reduce 抓到的数据再次进行排序,排序调用的还是 Map 端使用的 Sort 程序。

2.2.3 group

接下来进行的是分组,默认的分组模式是根据每个数据( Key-Value )的 Key 是否相同进行分组的。键相等就在同一个组中。每一组数据传给 Reduce 程序中(每组数据的特点都是相同的 Key 键)。

2.3 MapReduce 的 split 大小

分片大小要趋向与一个 HDFS 的一个数据块的大小,默认是 64MB ,这里我们需要叙述一下为何要趋向与一个数据块的大小:

hadoop 在存储输入数据的节点上运行 map 任务,可以获得最佳性能。这就是所谓的 “ 数据本地化优化 ” ,因为它无需使用宝贵的集群带宽资源。分片趋向于数据块大小的目的也是为了节省集群带宽资源, hdfs 数据块的大小就等于存储在单个节点上的最大传输输入块的大小。若分片大于 HDFS 数据块(跨越两个数据块),对于任何一个节点,基本上都不可能同时存储这两个数据块,因此分片中的部分数据需要通过网络传输到 map 任务节点,这样就要消耗集群带宽资源。

split 大小是根据以下算法确定的:

- max.split(100M)
- min.split(10M)
- block(64M)
- max(min.split,min(max.split,block))( 任何一个 split 都不能大于 block )。