vlambda博客
学习文章列表

一文快速入门大数据计算框架MapReduce

MaprReduce基础

分布式计算网络带宽是其限制

MR定义

MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

MR优缺点

优点

1.MR易于编程

它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。

2.MR具有良好扩展性

可以通过简单的增加机器来扩展MR的计算能力和计算资源

3.高容错性

其中一台机器挂了,MR可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。

4.适合PB级别以上海量数据离线处理

可以实现上千台服务器集群并发工作,提供数据处理能力。




缺点

1.不擅长实时计算

MR无法像MySql一样,在毫秒级内返回结果;

2.不擅长流式计算

流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。

3.不擅长DAG(有向图)计算

计算多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入磁盘,造成大量的磁盘IO,导致性能非常低下。




MR核心思想

1.一个MR分布式的运算程序往往需要分成至少2 个阶段。2.第一个阶段的MapTask 并发实例,完全并行运行,互不相干。3.第二个阶段的ReduceTask 并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask 并发实例的输出。4.MapReduce 编程模型只能包含一个 Map 阶段和一个 Reduce 阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce 程序,串行运行。

MR进程

MRv1

JobTracker

集群资源管理+任务协调必须不断跟踪所有TaskTracker和所有的map、reduce任务

Tasktracker

任务监控(心跳机制)



MRv2

ResourceManager

负责任务集群资源的管理

MrApplicationMaster

是运行MR任务时产生的AM,负责整个任务程序的过程调度及状态协调

NodeManager

负责接受rm信息启动容器相当于是tasktracker,这里NM和RM通过心跳进行通讯。

MR任务运行时产生的进程

一个完整的MapReduce程序在分布式运行时有三类实例进程:

MrAppMaster

负责整个程序的过程调度及状态协调

MapTask

负责Map阶段的整个数据处理流程

ReduceTask

负责Reduce阶段的整个数据处理流程




MR作业原理

一文快速入门大数据计算框架MapReduce

1.首先client已经写好了一个MR程序,将该作业或jar包提交给JobTracker;2.JobTracker进行作业的校验,判断输出路径是否存在,判断是否有写入路径权限,输入的路径是否存在等,如果都ok,JobTracker初始化后会将运行作业所需要的资源提交到集群去,将作业相关的jar文件、配置文件信息等复制到hdfs上,并返回作业ID给client;3.JobTracker根据TaskTracker的心跳消息,判断哪个节点可以运行任务,并安排相应节点进行作业;4.作业节点到hdfs上拉取作业相关的队列信息或运行所需数据;5.在运行作业节点的内部会启动一个JVM,在其内部执行Maptask或Reducetask;6.任务执行完成后,TaskTracker上报jobtracker任务信息;

拓展2

Hadoop序列

序列化定义

序列化

就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。 反序列化

就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对象。

为什么要序列化

一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的” 对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。

为什么不要Java序列化

Java的序列化是一个重量级序列化框架(Serializable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,Header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable)。

Hadoop序列化特点

1.紧凑 :高效使用存储空间;2.快速:读写数据的额外开销小;3.可扩展:随着通信协议的升级而升级;4.互操作:支持多语言的交互

MR进阶

MR框架原理

InputFormat数据输入

切片与MapTask并行度决定机制

MapTask 的并行度决定Map 阶段的任务处理并发度,进而影响到整个Job 的处理速度。

MapTask并行度决定机制

数据块:Block 是HDFS 物理上把数据分成一块一块。

数据切片:数据切片只是在逻辑上对输入进行分片,并不会在磁盘上将其切分成片进行存储。

一文快速入门大数据计算框架MapReduce

FileInputFormat切片机制

切片机制

1.简单地按照文件的内容长度进行切片2.切片大小,默认等于Block大小3.切片时不考虑数据集整体,而是逐个针对每一个文件单独切片

案例

一文快速入门大数据计算框架MapReduce

CombineTextInputFormat 切片机制

框架默认的 TextInputFormat 切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个 MapTask,这样如果有大量小文件,就会产生大量的MapTask,处理效率极其低下。 CombineTextInputFormat 用于小文件过多的场景,它可以将多个小文件从逻辑上规划到一个切片,这样,多个小文件就可以交给一个MapTask处理。

切片机制

生成切片过程包括:虚拟存储过程和切片过程二部分。

一文快速入门大数据计算框架MapReduce

1.

虚拟存储过程: 将输入目录下所有文件大小,依次和设置的 setMaxInputSplitSize 值比较,如果不大于设置的最大值,逻辑上划分一个块。如果输入文件大于设置的最大值且大于两倍,那么以最大值切割一块;当剩余数据大小超过设置的最大值且不大于最大值2 倍,此时将文件均分成2 个虚拟存储块(防止出现太小切片)。 例如setMaxInputSplitSize 值为 4M,输入文件大小为8.02M,则先逻辑上分成一个4M。剩余的大小为 4.02M,如果按照 4M 逻辑划分,就会出现 0.02M 的小的虚拟存储文件,所以将剩余的 4.02M 文件切分成(2.01M 和2.01M)两个文件。

2.

切片过程: (a)判断虚拟存储的文件大小是否大于 setMaxInputSplitSize 值,大于等于则单独形成一个切片。 (b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。 (c)测试举例:有 4 个小文件大小分别为 1.7M、5.1M、3.4M 以及 6.8M 这四个小文件,则虚拟存储之后形成 6 个文件块,大小分别为:

1.7M,(2.55M、2.55M),3.4M 以及(3.4M、3.4M)



MR工作流程

一文快速入门大数据计算框架MapReduce

一文快速入门大数据计算框架MapReduce

上面两张图详细的记录了MapReduce的整个流程,其中重点是Shuffle过程,主要从第7步开始,一直到13,即从map的输出一直到reduce的输入。

MapReduce流程+Shuffle机制详解

MR执行流程

1.数据经InputFormat处理输入,经切片操作输出(input split)-->

默认每一个input split对应一个maptask进行处理

2.将map()函数处理后得到的(key,value)对在写入到缓冲区-->

以便把map任务处理的结果发送给指定的reducer去执行,从而达到负载均衡,避免数据倾斜

3.写入缓存区的数据会进行partition以及快速排序(首先对partition进行排序,确保分区数据在一块,再对键进行快排,确保同一分区的key是有序的),具体分为两种情况:

第一种情况,map输出的结果会有一个环形内存缓冲区(环形缓存区大小默认100m,当该缓冲区达到80%,锁定该80%内存,并对每个缓冲区按键做快速排序,之后在本地创建一个溢写文件,将缓冲区数据写入该文件),剩余的20%的内存在此期间可以继续写入map输出的键值对。-->

缓冲区内数据在写入磁盘之前,首先会进行排序(快速排序)、合并的目的: 


1.尽量减少写入磁盘的数据量

2.减少数据传输到下一个阶段的数据量

 第二种情况,当一个maptask处理的数据很大,以至于超过了环形缓冲区内存大小,就会生成多个spill文件。此时需要对一个map任务产生的多个spill文件进行归并(归并排序),生成最终的一个已分区且已排序的大文件。-->

4.reduce通过jobtracker维护整个集群宏观信息,找到对应的map输出位置,copy map的输出到reducer上--->(reduce本身也有环形内存缓冲区,阈值80%会生成一个spill文件,reducer读取的map数据先写入内存缓存区,能放的下内存则在内存中(内存到内存的merge),如果内存缓冲区的map输出占据空间达到阈值,启动内存merge,将内存数据写入磁盘(内存到磁盘merge))。与map端的溢写类似,在将buffer中多个map输出合并写入磁盘之前,如果设置了Combiner,则会化简压缩合并的map输出。

5.当属于该reducer的map数据全部拷贝完,则会在reducer上生成多个spil文件(如果拖取的所有map数据总量都没有超过内存缓冲区,则数据就只存在于内存中),后台线程会不断将spill文件合并成一个有序的大文件。这时开始执行合并操作,即磁盘到磁盘merge。(Map的输出数据已经是有序的,Merge进行一次合并排序,所谓Reduce端的sort过程就是这个合并的过程,采取的排序方法跟map阶段不同,因为每个map端传过来的数据是排好序的,因此众多排好序的map输出文件在reduce端进行合并时采用的是归并排序,针对键进行归并排序)-->

目的是节约后续reducer执行reduce函数的时间一般Reduce是一边copy一边sort,即copy和sort两个阶段是重叠而不是完全分开的。最终Reduce shuffle过程会输出一个整体有序的数据块。

6.将合并的数据交给reduce作为reduce文件的输入,通过代码逻辑处理-->

合并成大文件后,Shuffle 的过程也就结束了,后面进入ReduceTask 的逻辑运算过程(从文件中取出一个一个的键值对Group,调用用户自定义的 reduce()方法)

Map端shuffle流程

1-3

Reduce端shuffle流程

4-6

注意:shuffle 中的缓冲区大小会影响到 MapReduce 程序的执行效率,原则上说,缓冲区越大,磁盘io 的次数越少,执行速度就越快。缓冲区的大小可以通过参数调整,参数:io.sort.mb 默认 100M。

一文快速入门大数据计算框架MapReduce

Partition分区

默认partition分区

默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。

分区总结

1.如果ReduceTask的数量> getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;2.如果1<ReduceTask的数量<getPartition的结果数,则有一部分分区数据无处安放,会Exception;3.如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给ReduceTask,最终也就只会产生一个结果文件 part-r-00000;4.分区号必须从零开始,逐一累加。

案例

MR中的排序

排序是MapReduce框架中最重要的操作之一。MapTask 和ReduceTask 均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。

默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。

对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

排序分类

部分排序

MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序。

全排序

最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低。

辅助排序

在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。

二次排序

在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

Map Task工作机制

1.Read 阶段:MapTask 通过用户编写的 RecordReader,从输入 InputSplit 中解析出一个个key/value。

2.Map 阶段:该节点主要是将解析出的 key/value 交给用户编写map()函数处理,并产生一系列新的key/value。

3.Collect 收集阶段:在用户编写 map()函数中,当数据处理完成后,一般会调用OutputCollector.collect()输出结果。在该函数内部,它会将生成的 key/value 分区(调用Partitioner),并写入一个环形内存缓冲区中。

4.Spill 阶段:即“溢写”,当环形缓冲区满后,MapReduce 会将数据写到本地磁盘上,生成一个临时文件。需要注意的是,将数据写入本地磁盘之前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。

溢写阶段详情:

步骤1:利用快速排序算法对缓存区内的数据进行排序,排序方式是,先按照分区编号Partition 进行排序,然后按照key 进行排序。这样,经过排序后,数据以分区为单位聚集在一起,且同一分区内所有数据按照 key 有序。步骤2:按照分区编号由小到大依次将每个分区中的数据写入任务工作目录下的临时文件 output/spillN.out(N 表示当前溢写次数)中。如果用户设置了 Combiner,则写入文件之前,对每个分区中的数据进行一次聚集操作。步骤3:将分区数据的元信息写到内存索引数据结构SpillRecord 中,其中每个分区的元信息包括在临时文件中的偏移量、压缩前数据大小和压缩后数据大小。如果当前内存索引大小超过1MB,则将内存索引写到文件 output/spillN.out.index 中。

5.Combine 阶段:当所有数据处理完成后,MapTask 对所有临时文件进行一次合并,以确保最终只会生成一个数据文件。

当所有数据处理完后,MapTask 会将所有临时文件合并成一个大文件,并保存到文件output/file.out 中,同时生成相应的索引文件 output/file.out.index。在进行文件合并过程中,MapTask 以分区为单位进行合并。对于某个分区,它将采用多轮递归合并的方式。每轮合并 io.sort.factor(默认 10)个文件,并将产生的文件重新加入待合并列表中,对文件排序后,重复以上过程,直到最终得到一个大文件。 让每个 MapTask 最终只生成一个数据文件,可避免同时打开大量文件和同时读取大量小文件产生的随机读取带来的开销。

Reduce Task工作机制

1.Copy 阶段:ReduceTask 从各个 MapTask 上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。2.Merge 阶段:在远程拷贝数据的同时,ReduceTask 启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。3.Sort 阶段:按照 MapReduce 语义,用户编写 reduce()函数输入数据是按 key 进行聚集的一组数据。为了将 key 相同的数据聚在一起,Hadoop 采用了基于排序的策略。由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此,ReduceTask 只需对所有数据进行一次归并排序即可。4.Reduce 阶段:reduce()函数将计算结果写到HDFS 上。

ReduceTask并行度

ReduceTask 的并行度同样影响整个 Job 的执行并发度和执行效率,但与 MapTask 的并发数由切片数决定不同,ReduceTask 数量的决定是可以直接手动设置。

注意事项

1.ReduceTask=0,表示没有Reduce阶段,输出文件个数和Map个数一致。2.ReduceTask默认值就是1,所以输出文件个数为一个。3.如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜4.ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask。5.具体多少个ReduceTask,需要根据集群性能而定。6.如果分区数不是1,但是ReduceTask为1,是否执行分区过程。答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断ReduceNum个数是否大于1。不大于1肯定不执行。

关于MR的零碎总结

1.HDFS数据以block形式存储,mapreduce不能直接读取block中数据,只能读取record reader解析后的block的数据内容。

2.block是有顺序记录的。

3.通过record记录读取器,记录上一个block的结尾和下一个block的开头,解决了“跨block问题”,record可以理解为逻辑上的一个记录。对于跨inputsplit问题由RecordReader解决。

4.HDFS中的Block块不能设置太大或者设置太小;

1.设置太大:

  * 影响并发,导致执行过慢
* 数据在网络之间传输时间增长,或者程序卡顿失败,恢复的成本较高

2.设置太小

  * NN 造成过大的内存压力

* 会进行频繁的文件传输,占用网络/cpu资源

关于MapReduce的几个问题

问题1:MrAppMaster、MapTask、ReduceTask这三个进程与RM、AM、NM之间的联系是?

问题2:1G 的数据,启动 8 个 MapTask,可以提高集群的并发处理能力。那么 1K 的数据,也启动8 个MapTask,会提高集群性能吗?MapTask 并行任务是否越多越好呢?哪些因素影响了MapTask并行度?

问题3:MapTask的并行度取决于数据的split个数这个说法对吗?

问题4 hdfs中的block块设置太小为什么会导致频繁的文件传输,占用网络和cpu资源?

以上四个问题大家可以积极思考,有助于消化本篇文章的内容,知道答案大家伙可以访问我的个人blog,在对应文章评论区与本菜鸟一起交流交流~


Blog Link

https://whiteco-okie.github.io/

参考资料

https://www.pianshen.com/article/36891558397/

https://blog.csdn.net/panjiao119/article/details/80139677

https://www.toutiao.com/i6627253838606762503/

尚硅谷MapReduce技术文章