vlambda博客
学习文章列表

大数据系列教程03:MapReduce执行流程

在上一小节中,我们通过一个气象数据分析案例讲解了MapReduce的编程模型,并与数据库SQL做类比讲解了map函数和reduce函数的作用。

其实如果只是针对上一节的那个案例,即使不用MapReduce,通过简单的for循环也能够很容易的获取分析结果。那么MapReduce与常规的应用程序或SQL相比有哪些优势呢?答案就是MapReduce是一个分布式计算框架。当要处理的数据量特别大时,MapReduce能够在多个服务器上并行处理任务的各个部分(例如每个服务器处理文件中的一部分数据),再对每个服务器的处理结果进行汇总以得到最终结果。本节我们就来学习MapReduce底层的分布式运行机制。

拆分任务

Job是我们向Hadoop集群提交MapReduce任务的基本单元,一个Job会由MapReduce程序代码、数据文件、配置信息三方面组成。单个Job会被拆分成多个Task,所有Task会被分成两类,一类是map task,一类是reduce task。

每个Task会由调度系统YARN负责调度到一个节点上运行,如果某个task运行失败,则YARN会自动将失败的Task重新调度到另一个节点上运行。

Hadoop会将数据文件切分成多个固定大小的split,每个split会交给一个单独的map task处理。map task会利用我们写好的map函数依次处理split中的record。

聪明的你一定已经想到,文件被拆分后的split越多,在map阶段的并行度就越高,进而执行map函数所需要的时间就越少。但过多的split和map task也会导致对其进行管理的成本增加。一般情况,split的大小与HDFS block的大小相同,默认也是128MB。

一个MapReduce集群,通常就是HDFS文件系统对应的集群。YARN在调度一个map task的时候,会尽量让这个task运行在一个拥有输入数据的节点上。也就是说,HDFS中的一个block会对应一个map task,且该map task运行的节点上就有这个block的副本。这样map task只要访问本地文件系统就能够获取到输入数据,无需消耗宝贵的网络带宽资源。但有的时候可能有多个Job同时在集群中运行,导致拥有对应block副本的节点已经运行了其它的task,无法再接收新的task。这时YARN会尽量找到一个离block副本最近的节点运行map task,即先尝试在同一个机架内分配map task。如果同一个机架内也无法找到合适的节点,就只能到其它的机架为map task分配节点,但这种概率非常低。

下图展示了map task与要处理的HDFS Block的三种位置关系,a是map task与HDFS Block在同一个节点上,b是map task与HDFS Block在同一个机架上,c是map task与HDFS Block不在同一个机架上:

大数据系列教程03:MapReduce执行流程

执行流程

假设要处理的数据文件在HDFS上有3个Block,且我们配置的Reducer数量是1,则MapReduce处理数据的的执行流程如下图所示:

大数据系列教程03:MapReduce执行流程

首先,每个Block会交给一个单独的map task进行处理。每个map task在处理完数据后,会将处理结果保存到其运行节点的本地磁盘上。注意map task的处理结果并没有保存在HDFS上,因为map task的结果只是中间结果,一旦整个job完成,这些中间结果就可以被删除了,所以没必要将map task的处理结果以多副本的方式保存在HDFS。reduce task会主动拉取map task的处理结果,在map task的处理结果被消费前,如果发生节点宕机导致map task的处理结果丢失,则Hadoop会自动在另外一个节点上重新运行那个map task。

所有的map task都执行完之后,YARN会开始调度reduce task执行。reduce task不会有任何的本地数据优势,因为reduce task的输入数据会来自所有的map task,map task的处理结果必须通过网络传输到reduce task运行的节点。reduce task运行的节点收到所有map task的结果后,会先对拥有相同key的结果进行合并,然后再调用定义的reduce方法。

reduce task的输出结果就是整个Job的运行结果,所以它的结果文件会被保存在HDFS上。对于结果文件的每个Block,第一个副本会保存在本地节点上,其它副本会保存在别的机架中的节点上,以此来确保数据的可用性。

每个Job的reduce task数量是可以配置的。如果是有多个reduce task,则map task会对它的输出结果进行分区(partition),每个分区的数据交给一个reduce task进行处理。一个分区中可以包含多个key,但相同key的所有结果一定全部在一个分区。我们可以通过自定义partitioning方法来动态改变分区的规则,但通常默认的分区规则(hash)就已经适合大部分场景了。有多个reduce task的数据处理流程如下图所示:

在整个流程中,将map task的处理结果传给reduce task的过程被称作shuffle。shuffle的过程我们后面会用专门的一节来介绍。

Combiner方法

我们已经知道,map task的处理结果会通过网络传给reduce task。然而,很多的结果数据是没有必要发送给reduce task的。例如在气象数据统计例子中,我们要统计的是每一年的最高温度。那对于key的值是同一个year的温度数据,map task只需要把温度最高的数据发送给reduce task,而不用发送所有的温度数据。

例如有两个map task都包含1950年的温度数据,第一个map task的结果数据如下:

(1950, 0)
(1950, 20)
(1950, 10)

第二个map task的结果数据如下:

(1950, 25)
(1950, 15)

正常情况,reduce task会接收到如下数据:

(1950, [0, 20, 10, 25, 15])

reduce task处理后得到结果:

(1950, 25)

虽然结果是正确的,但两个map task实际上并不需要将所有的数据都发送给reduce task,它们只需要把各自的最大温度数据发给reduce task即可。即reduce task会收到如下数据:

(1950, [20, 25])

这组数据经过reduce函数计算后,也可以得到正确的结果。这样不仅降低了reduce函数的计算量,同时也节省了宝贵的网络带宽资源。我们可以通过在创建Job时指定combiner方法执行这个逻辑。

combiner方法并不能代替reduce方法,但它可以减少map task和reduce task之间交互的数据量,在很大程度上提高性能。可以通过如下代码给Job对象设置一个combiner方法:

job.setCombinerClass(MaxTemperatureReducer.class);

—————END—————