vlambda博客
学习文章列表

详解MapReduce执行流程

1

mr原理


       Mapreduce是一个分布式运算程序的编程框架,是用户开发“基于hadoop的数据分析应用”的核心框架;

       Mapreduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个hadoop集群上;

2

为什么要用mapreduce


  • 海量数据在单机上处理因为硬件资源限制,无法胜任

  • 而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度

  • 引入mapreduce框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将分布式计算中的复杂性交由框架来处理


3

 mapreduce结构及核心运行机制


1

结构






一个完整的mapreduce程序在分布式运行时有三类实例进程:

  • MRAppMaster:负责整个程序的过程调度及状态协调

  • mapTask:负责map阶段的整个数据处理流程

  • ReduceTask:负责reduce阶段的整个数据处理流程

思想:分而治之,先分后合


2
整体流程图



      其中,maptask的数量是不能设置的,reducetask可以自己设置job.setNumReduceTasks(5);


3
流程解析


1.一个mr程序启动的时候,最先启动的是MRAppMaster,MRAppMaster启动后根据本次job的描述信息,计算出需要的maptask实例数量,然后向集群申请机器启动相应数量的maptask进程

2.maptask进程启动之后,根据给定的数据切片范围进行数据处理,主体流程为:

  • 利用客户指定的inputformat来获取RecordReader读取数据,形成输入KV对

  • 将输入KV对传递给客户定义的map()方法,做逻辑运算,并将map()方法输出的KV对收集到缓存

  • 将缓存中的KV对按照K分区排序后不断溢写到磁盘文件


3.MRAppMaster监控到所有maptask进程任务完成之后,会根据客户指定的参数启动相应数量的reducetask进程,并告知reducetask进程要处理的数据范围(数据分区)

4.Reducetask进程启动之后,根据MRAppMaster告知的待处理数据所在位置,从若干台maptask运行所在机器上获取到若干个maptask输出结果文件,并在本地进行重新归并排序,然后按照相同key的KV为一个组,调用客户定义的reduce()方法进行逻辑运算,并收集运算输出的结果KV,然后调用客户指定的outputformat将结果数据输出到外部存储


4

shuffle机制


1
概述


  1. mapreduce中,map阶段处理的数据如何传递reduce阶段,是mapreduce框架中最关键的一个流程,这个流程就叫shuffle;

  2. shuffle: 洗牌、发牌——(核心机制:数据分区,排序,缓存);

  3. 具体来说:就是将maptask输出的处理结果数据,分发给reducetask,并在分发的过程中,对数据按key进行了分区和排序


2

主要流程






详解MapReduce执行流程

shuffle是MR处理流程中的一个过程,它的每一个处理步骤是分散在各个map task和reduce task节点上完成的,整体来看,分为3个操作:

  • 分区partition

  • Sort根据key排序

  • Combiner进行局部value的合并


具体来说就是将maptask输出的处理结果数据,分发给reducetask,并在分发的过程中,对数据按key进行了分区和排序;


3
详细流程


详解MapReduce执行流程

  1. maptask收集我们的map()方法输出的kv对,先进入分区方法,把数据标记好分区,然后把数据发送到内存缓冲区(默认100M)

  2. 当环形缓冲区达到80%时,进行溢写,从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件 (溢写前对数据进行快速排序,排序按照key的索引进行字典顺序排序)

  3. 多个溢出文件会被合并成大的溢出文件(归并排序算法),对溢写的文件也可以进行combiner操作,前提是汇总操作,求平均值不行。

  4. 在溢出过程中,及合并的过程中,都要调用partitoner进行分组和针对key进行排序

  5. reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据,拉取的数据先存储在内存中,内存不够了,再存储到磁盘。

  6. reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)

  7. 合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)

Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快

缓冲区的大小可以通过参数调整,  参数:io.sort.mb  默认100M




如何写好一篇数据部门规范文档