vlambda博客
学习文章列表

大数据序列五:Hadoop之MapReduce分布式计算框架(上)

MapReduce思想

        MapReduce思想在生活中处处可见。我们或多或少都曾接触过这种思想。MapReduce的思想核心是分而治之

        MapReduce任务过程是分为两个处理阶段:

        Map阶段Map阶段的主要作用是,即把复杂的任务分解为若干个简单的任务并行处理。Map阶段的这些任务可以并行计算,彼此间没有依赖关系。

        Reduce阶段:Reduce阶段的主要作用是,即对map阶段的结果进行全局汇总。

以下图为例来理解mapReduce的思想



MapReduce原理分析

        MapTask运行机制详解

大数据序列五:Hadoop之MapReduce分布式计算框架(上)


详细步骤:

        1. 首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTasksplitblock的对应关系默认是一对一。

    2. 将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<keyvalue>Key表示每行首字符偏移值,value表示这一行文本内容。

        3. 读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader读取一行这里调用一次。

        4. map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner

          4-1:MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。 

          5:接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,keyvalue值都会被序列化成字节数组。

        5-1:环形缓冲区其实是一个数组,数组中存放着keyvalue的序列化数据和keyvalue的元数据信息,包括partitionkey的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。

        5-2:缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spillpercent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Maptask的输出结果还可以往剩下的20MB内存中写,互不影响。

        6:当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为!

        6-1:如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同keykey/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。

        6-2:那哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。

        7. 合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。

至此map整个阶段结束!!


MapTask的一些配置:

大数据序列五:Hadoop之MapReduce分布式计算框架(上)

配置可参考官方文档

https://hadoop.apache.org/docs/


MapReduce的并行度

        1. MapTask并行度思考

        MapTask的并行度决定Map阶段的任务处理并发度,从而影响到整个Job的处理速度。思考:MapTask并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

        2. MapTask并行度决定机制

        数据块:Block是HDFS物理上把数据分成一块一块。切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。


        并行度决定机制:

大数据序列五:Hadoop之MapReduce分布式计算框架(上)


        切片机制源码阅读

大数据序列五:Hadoop之MapReduce分布式计算框架(上)

大数据序列五:Hadoop之MapReduce分布式计算框架(上)

                    

        默认就是128M;

        MapTask并行度是不是越多越好呢?

    并不是,如果一个文件仅仅比128M大一点点也被当成一个split来对待,而不是多个split。

        MR框架在并行运算的同时也会消耗更多资源,并行度越高资源消耗也越高,假设129M文件分为两个分片,一个是128M,一个是1M;对于1M的切片的Maptask来说,太浪费资源。

        129M的文件在Hdfs存储的时候会不会切成两块?

        实际也不会分,hdfs也是有自己的优化算法。

        所以建议map任务的切片大小和hdfs的块(block)大小保持一致,当然也可以更改。


        3.ReduceTask 工作机制

大数据序列五:Hadoop之MapReduce分布式计算框架(上)


        Reduce大致分为copysortreduce三个阶段,重点在前两个阶段。

        copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMergeronDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。


        详细步骤

        Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。


        4.ReduceTask 工作机制

        ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置:        


        注意事项

        1. ReduceTask=0,表示没有Reduce阶段,输出文件数和MapTask数量保持一致;

        2. ReduceTask数量不设置默认就是一个,输出文件数量为1个;

        3. 如果数据分布不均匀,可能在Reduce阶段产生倾斜;


           ╭─  ─  ─  ─  ─  ─  ─  ─  ─╮
           ┃           Flying  Young        
     ─  ─  ─  ─  ─  ─  ─  ─  ─  ─  ─╯
码上飞驰
旨在为广大IT人员服务与贡献,为IT行业的推进贡献自己的力量。
27篇原创内容
Official Account