vlambda博客
学习文章列表

Spark系列(十)Shuffle的技术难点问题--Spark比MapReduce快的真正原因

「写在前面:」 我是「nicedays」,一枚喜爱「做特效,听音乐,分享技术」的大数据开发猿。这名字是来自「world order」乐队的一首HAVE A NICE DAY。如今,走到现在很多坎坷和不顺,如今终于明白「nice day」是需要自己赋予的。「白驹过隙,时光荏苒,珍惜当下」~~ 写博客一方面是对自己学习的一点点总结及记录,另一方面则是希望能够帮助更多对大数据感兴趣的朋友。如果你也对 大数据与机器学习感兴趣,可以关注我的「动态」 https://blog.csdn.net/qq_35050438,让我们一起挖掘数据与人工智能的价值~


  • Spark Shuffle(一):Shuffle的技术难点问题

    • 一:解决shuffle的分区问题:

    • 二:解决shuffle的数据聚合问题:

    • 三:解决Map端combine问题:

    • 四:解决shuffle的sort问题:

    • 五:解决shuffle内存不足的问题:

    • 六:根据算子智能判定是否聚合和排序

    • 七:Spark与MaprReduce的不同之处(spark快的原因)


Spark Shuffle(一):Shuffle的技术难点问题

一:解决shuffle的分区问题:

map端write时的数据分区发往reduce的情形

如何确定分区个数?

每个map task的输出数据的分区个数可由用户自定义,但一般定义为集群可用cpu的1-2倍。

相对应reduce stage开启对应数量的task来处理。

如何对map task输出数据进行分区?

对每个map task 输出的<k,v>对,根据key值hash(key)%2计算partitionid,不同id不同分区。

二:解决shuffle的数据聚合问题:

reduce端read拉取map端数据把不同map端相同key值聚合的情形

  • 在线聚合法:

    mapreduce做不到在线聚合,它通常要把key相同的全部放到list后,才能再执行聚合。

    「spark当从map端拿到的每个record(k,v)记录进入HashMap时,就进行聚合」

三:解决Map端combine问题:

提前在map端用hashmap进行聚合,网络发送时,减少网络io

四:解决shuffle的sort问题:

首先要明确当数据需要排序时,reduce端一定要排序,writer也就是map端可排可不排,看情况,排了减少reduce端排序复杂度,但是增加计算时间成本。不排刚好相反,依情况而定。

map如果需要排序,何时排序?聚合前还是聚合后?

由mapreduce可知,它是「先排序再聚合」的,且排序在源码写死,不管如何都必须排序。「好处是」排序完的数据可以直接从前往后聚合,就用不着hashmap了。「缺点是」排序的数据需要大量的连续内存存储,且两个过程无法同时进行。

spark对此进行了优化,是「先聚合后排序」,设计了特殊的HashMap进行聚合,并将record(k,v)引用放入线性数据结构进行排序,灵活性高,「可惜需要复制record的引用,空间占用较大。」

五:解决shuffle内存不足的问题:

当数据量大时,这个问题即可能出现在write端,也可能出现在read端。

解决方案:内存+磁盘混合存储

先将数据在内存HashMap进行聚合,不断塞入后内存空间不足,将聚合的数据split到磁盘上,空闲的内存处理后面塞入的数据,split到磁盘的数据只是部分聚合,最后要将所有的数据进行全局聚合,要对数据进行排序,减少磁盘I/O

六:根据算子智能判定是否聚合和排序

包含shuffle的算子操作 write 端 combine write 端 sort read 端 combine read 端 sort
partitionBy() × × × ×
groupBy(),cogroup(),join(),
coalesce(),intersection(),subtract(),
subtractByKey()
× × ×
reduceByKey(),aggregateByKey(),
combineByKey(),foldByKey(),
distinct()
× × ×
sortByKey(),sortBy(),
repartitionAndSortWithinPartition()
× ×

七:Spark与MaprReduce的不同之处(spark快的原因)

  • spark通过「RDD和DAG图,「有效的表示了数据在计算过程中的一些」有关性和无关性」,spark将数据与数据之间相互独立的关系把其优化成了一个stage做一个「流水线计算」,使得cpu在面对一个并行化操作时,不是漫无目的的去东做执行一个计算,西做执行一个计算,「使得当前存储的数据一定是下一步需要用到的,」 而不会去存一些做了n步之后才用到的数据,同时也是通过RDD和DAG图的宽窄依赖是的它也确定了「数据的shuffle应该出现在它最应该出现shuffle的时候」,spark不存储计算中间结果。

  • spark是「多线程模型」,mapreduce是「多进程模型」,多进程每个reduce,maptask都是一个jvm进程,而多线程一个executor才是一个jvm进程,可以运行多个线程,每个线程执行一个task,虽然多进程这样「每个独立的jvm管理一个task」,每个task的任务占用资源可以更好控制,但是「由于每次启动jvm进程都需要重新申请资源」,job任务一多,这个启动时间会非常非常长。

    Spark则是通过「复用线程池中的线程来减少启动」、关闭task所需要的开销。(多线程模型也有缺点,由于同节点上所有任务运行在一个进程中,因此,「会出现严重的资源争用,难以细粒度控制每个任务占用资源」

  • spark Shuffle将Shuffle过程的聚合和排序的选择与过程进行了优化,需要排序的时候在排序,支持基于hash的分布式聚合,mapreduce排序不可避免。会浪费大量时间