vlambda博客
学习文章列表

码农手记丨SPARK性能调优


栏目📰:码农手记 💻

撰文✍🏻:极链科技 高级后端开发 吴宏伟

编辑📚:六六

关键词📌 SPARK  开发调优  资源参数调优 


码农手记丨SPARK性能调优


✍️
 码农手记 
将会邀请


一直在幕后
用代码和算法改变世界的
技术大佬们
将会不定期推送他们
所写的在技术专业中的
技术经验/研究/论文

为你呈现
/ 更前沿的技术思考 /
 / 更专业的技术干货 /

#码农手记# 第76
文 | 极链科技Video++ 高级后端开发 吴宏伟



/ spark基本运行原理 /

在进行调优之前,要先了解spark作业基本的运行原理,在次基础上才能对优化有更深入的了解

1、spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程 


2、根据你使用的部署模式(deploy-mode:client/cluster)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动 


3、Driver进程本身会根据我们设置的参数,占有一定数量的内存和CPU Core 


4、Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone5集群,也可以是YARN)申请运行Spark作业需要使用的资源。资源指的就是Executor进程。在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU Core


5、在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了 


6、Driver进程会将我们编写的Spark作业代码分拆为多个Stage,每个Stage执行一部分代码片段,并为每个Stage创建一批Task,然后将这些Task分配到各个Executor进程中执行 


7、Task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们编写的某个代码片段),只是每个Task处理的数据不同而已。一个Stage的所有Task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个Stage。下一个Stage的Task的输入数据就是上一个Stage输出的中间结果 


8、Spark是根据Shuffle类算子来进行Stage的划分,Shuffle算子执行之前的代码会被划分为一个Stage


下面将从两个方面对spark调优进行介绍



/ 01  开发调优 /

主要包括:RDD lineage设计、算子的合理使用、特殊操作的优化等。在开发过程中,时时刻刻都应该注意以上原则,并将这些原则根据具体的业务以及实际的应用场景,灵活地运用到自己的Spark作业中。


 1    避免创建重复的RDD  

我们在开发一个Spark作业时,首先是基于某个数据源(比如Hive表或HDFS文件)创建一个初始的RDD; 接着对这个RDD执行某个算子操作,然后得到下一个RDD; 以此类推,循环往复,直到计算出最终我们需要的结果。 在这个过程中,多个RDD会通过不同的算子操作(比如map、reduce等)串起来,这个“RDD串”,就是RDD lineage,也就是“RDD的血缘关系链”。


我们在开发过程中要注意:对于同一份数据,只应该创建一个RDD,不能创建多个RDD来代表同一份数据。


 2    尽可能复用同一个RDD  

除了要避免在开发过程中对一份完全相同的数据创建多个RDD之外, 在对不同的数据执行算子操作时还要尽可能地复用一个RDD 比如说,有一个RDD的数据格式是key-value类型的,另一个是单value类型的,这两个RDD的value数据是完全一样的。 那么此时我们可以只使用key-value类型的那个RDD,因为其中已经包含了另一个的数据。 对于类似这种多个RDD的数据有重叠或者包含的情况,我们应该尽量复用一个RDD,这样可以尽可能地减少RDD的数量,从而尽可能减少算子执行的次数。

 3    尽量避免使用shuffle类算子  

如果有可能的话,要尽量避免使用shuffle类算子。因为Spark作业运行过程中,最消耗性能的地方就是shuffle过程。shuffle过程,简单来说,就是将分布在集群中多个节点上的同一个key,拉取到同一个节点上,进行聚合或join等操作。比如reduceByKey、join等算子,都会触发shuffle操作。


shuffle过程中,各个节点上的相同key都会先写入本地磁盘文件中,然后其他节点需要通过网络传输拉取各个节点上的磁盘文件中的相同key。而且相同key都拉取到同一个节点进行聚合操作时,还有可能会因为一个节点上处理的key过多,导致内存不够存放,进而溢写到磁盘文件中。因此在shuffle过程中,可能会发生大量的磁盘文件读写的IO操作,以及数据的网络传输操作。磁盘IO和网络数据传输也是shuffle性能较差的主要原因。


因此在我们的开发过程中,能避免则尽可能避免使用reduceByKey、join、distinct、repartition等会进行shuffle的算子,尽量使用map类的非shuffle算子。这样的话,没有shuffle操作或者仅有较少shuffle操作的Spark作业,可以大大减少性能开销。


 4  使用高性能的算子 


· 使用reduceByKey/aggregateByKey替代groupByKey

groupByKey没有进行任何本地聚合,所有数据都会在集群节点之间传输reduceByKey每个节点本地的相同key数据,都进行了预聚合,然后才传输到其他节点上进行全局聚合。


· 使用mapPartitions替代普通map

mapPartitions类的算子,一次函数调用会处理一个partition所有的数据,而不是一次函数调用处理一条,性能相对来说会高一些。但是有的时候,使用mapPartitions会出现OOM(内存溢出)的问题。因为单次函数调用就要处理掉一个partition所有的数据,如果内存不够,垃圾回收时是无法回收掉太多对象的,很可能出现OOM异常。所以使用这类操作时要慎重!


· 使用foreachPartitions替代foreach

原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写MySQL,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。实践中发现,对于1万条左右的数据量写MySQL,性能可以提升30%以上。


· 使用filter之后进行coalesce操作

通常对一个RDD执行filter算子过滤掉RDD中较多数据后(比如30%以上的数据),建议使用coalesce算子,手动减少RDD的partition数量,将RDD中的数据压缩到更少的partition中去。因为filter之后,RDD的每个partition中都会有很多数据被过滤掉,此时如果照常进行后续的计算,其实每个task处理的partition中的数据量并不是很多,有一点资源浪费,而且此时处理的task越多,可能速度反而越慢。因此用coalesce减少partition数量,将RDD中的数据压缩到更少的partition之后,只要使用更少的task即可处理完所有的partition。在某些场景下,对于性能的提升会有一定的帮助。


· 使用repartitionAndSortWithinPartitions替代repartition与sort类操作

repartitionAndSortWithinPartitions是Spark官网推荐的一个算子,官方建议,如果需要在repartition重分区之后,还要进行排序,建议直接使用repartitionAndSortWithinPartitions算子。因为该算子可以一边进行重分区的shuffle操作,一边进行排序。shuffle与sort两个操作同时进行,比先shuffle再sort来说,性能可能是要高的。



/ 02 资源参数调优 /


 1  num-executors 

设置Spark作业总共要用多少个Executor进程来执行Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程

每个Spark作业的运行一般据集群的规模设Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源


 2    executor-memory  

设置每个Executor进程的内存Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联


每个Executor进程的内存设置4G~8G较为合适,num-executors乘以executor-memory,是不能超过队列的最大内存量的,Spark集群可以设置每个executor最多使用的内存大小。如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2


 3  executor-cores 


设置每个Executor进程的CPU core数量决定了每个Executor进程并行执行task线程的能力


数量设置为2~4个较为合适,依据资源队列的最大CPU Core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU Core


 4    driver-memory  

设置Driver进程的内存Driver的内存通常来说不设置,或者设置1G左右应该就够了


如果需要使用 collect 算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题


 5    spark.default.parallelism  

设置每个stage的默认task数量不去设置这个参数,那么Spark根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task,通常来说,Spark默认设置的数量是偏少的


设置该参数为num-executors * executor-cores的2~3倍较为合适如果task数量偏少的话,Executor进程可能根本就没有task执行,也就是白白浪费了资源


 6    spark.storage.memoryFraction  

设置RDD持久化数据在Executor内存中能占的比例,默认是0.6根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘


如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些如果Spark作业中的Shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适如果发现作业由于频繁的GC导致运行缓慢(通过Spark WebUI可以观察到作业的GC耗时),意味着Task执行用户代码的内存不够用,那么同样建议调低这个参数的值


 7    spark.shuffle.memoryFraction  

设置Shuffle过程中一个task拉取到上个Stage的Task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2Shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能


如果Spark作业中的RDD持久化操作较少,Shuffle操作较多时,建议降低持久化操作的内存占比,提高Shuffle操作的内存占比比例如果发现作业由于频繁的GC导致运行缓慢,意味着Task执行用户代码的内存不够用,那么同样建议调低这个参数的值。




🚀

 极链科技Video++ 
 聚焦于AI消费的科技公司 

👇🏻点击「阅读原文」查看更多深度技术好文