vlambda博客
学习文章列表

Spark基础:性能调优


往期链接:






大多数时候只要申请的内存足够,数据可以存储在内存中,一般瓶颈都在网络传输上;如果内存不足,则需要进行内存方面的调优。一般优化主要有两点:数据序列化和内存优化。其中前者不仅可以节省内存还能减轻网络传输的压力。

数据序列化

Spark支持两种序列化的库:

  1. java serialiation,默认spark使用java ObjectOutputStream框架进行序列化,你只需要实现 java.io.Serializable即可。你也可以通过扩展java.io.Externalizable接口提高性能。

  2. Kryo序列化,spark也可以使用kryo进行序列化,速度更快。Kryo要比Java序列化更快也更简洁,一般都要比Java快10倍,但是需要在代码中注册class。

可以在SparkConf中配置属性启用Kryo,唯一不使用kryo当做默认的原因就是因为它需要注册操作,从spark2开始,当RDD是简单类型如string时,会使用Kryo作为默认的序列化器。注册Kryo序列化的方法:

val conf = new SparkConf().setMaster(...).setAppName(...)
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
val sc = new SparkContext(conf)

如果你的对象很大,可以增加 spark.kryoserializer.buffer 大小。如果没有注册自定义的类,kryo也可以工作,但是对象存储的名称是全路径名,浪费空间。

内存优化

关于内存使用优化有三个必须考虑的点:对象使用的内存,使用对象的代价,垃圾回收的压力。默认java对象可以很快的使用,但是相对于原始数据却需要占用2-5倍的存储空间,这主要是因为以下几点:

  1. 每个Java对象都有对象头,大概16个字节,包含内部的指针信息(网上的资料普通对象8个字节,数组对象16个字节)。对于一个原始数据很小的对象,比如Int,很有可能对象头比原始数据还大。

  2. Java字符串相比原始数据多了40个字节,因为内部使用char数组保存,并且需要额外保存长度信息,存储每个character需要两个字节,由于java字符串内部使用UTF-16编码。因此10个字符的字符串就轻松消耗60个字节。

  3. 一般集合类,如HashMap和LinkedList,使用链表结构,都会在外面包装一层,比如Map.Entry。这个对象没有header信息,却有指针信息,指针一般占用8字节,指向内部的对象。

  4. 一些基本类型的集合需要进行装箱操作,比如java.lang.Integer。

内存管理

spark中的内存可以简单的分成两大类:执行和存储。执行内存涉及shuffle计算、join关联、sort排序、aggregation聚合等;存储包括cache缓存和网络传输。在spark中,执行内存和存储内存共享同一的存储空间M。当没有执行操作使用内存时,存储可以占用所有的内存。执行操作在必要的时候可以抢占存储的内存,但是会保留至少R的内存用来做存储。换句话说,R描述的是M中只为存储使用的内存大小。存储不会抢占执行的内存。

这种设计通过几个参数实现,首先应用如果不需要进行缓存,那么可以让execution使用全部的内存,避免不必要的磁盘溢出;其次,应用可以保留至少R的内存,这样可以存储一些不可变的数据避免被抢占。最后,为了方便使用用户不需要关心内部具体的内存使用分布。最后spark还是保留了两个参数,不过一般使用默认值就行,用户不需要手动调整:

  1. spark.memory.fraction 描述了M的比例,一般是 (JVM-300M)* 该比例,默认是0.6。剩余的40%用来存储用户数据结构、saprk内部的元数据、为了防止超大对象或稀疏对象时OOM预留的空间

  2. spark.memory.storageFraction 描述了R的比例,默认0.5,是M内存中为存储保留的内存空间,不会被执行操作抢占。

评估内存消耗

最好的评估数据集的内存使用方法就是创建一个RDD,然后缓存到内存中,在web UI中的存储界面查看内存占用情况。如果想要评估特定对象的内存占用,可以使用 SizeEstimator的 estimate方法,可以评估内存的使用,以及广播变量在executor中占用的内存。

优化数据结构

首先考虑的节省内存消耗的方法就是避免冗余的java特性,比如指针类型的数据结构和包装类,有下面几种方法:

  1. 使用对象数组,原始类型,代替标准的Java和scala集合框架,如hashMap。fastutil库可以提供简单类型的集合类,并且与java标准库兼容。

  2. 避免使用嵌套类型,内部包含大量小对象和指针

  3. 使用数字类型或者集合类型作为标识,代替字符串

  4. 如果低于32G的内存,可以设置JVM标记:-XX:+UseCompressedOops 使用4字节指针代替8字节。可以在spark-env.sh中配置。

序列化RDD存储

通过上面的方式如果还不能有效的减少内存使用,可以考虑进行序列化,使用序列化级别的存储方式,比如 MEMORY_ONLY_SER。Spark会使用一个超大的字节数组存储整个RDD的数据。序列化有一个缺点是会使应用程序变慢,使用的时候还需要反序列化。我们强烈推荐使用kryo进行序列化,这样可以比java序列化使用更少的内存。

垃圾回收优化

JVM垃圾回收是一个很复杂的问题,它不是简单的读取一个RDD在上面执行一个操作。Java为了移除旧的对象给新的对象保留空间,需要扫描所有的对象判断哪些不再使用。关键点是 垃圾回收方法的性能,因此使用简单对象数组代替LinkedList要大大降低垃圾回收的消耗,另一个更有效的方式是使用序列化存储数据,序列化后每个分区只有一个字节数组对象,在尝试使用其他技术之前,首先可以尝试的就是垃圾回收。

GC会影响节点上任务执行和数据存储,接下来讨论下RDD缓存的空间分配:

衡量GC的影响

首先GC优化需要搜集垃圾回收发生的频率信息,以及GC时使用的时间。可以通过 -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps 输出GC信息,详细参考spark配置。接下来启动spark应用,查看垃圾回收时输出的信息。注意这些信息将会在集群的工作节点上,而不是在driver程序上。

高级GC优化

为了优化垃圾回收,需要理解JVM中内存管理的基础知识: java对空间分割成 年轻代 和 年老代,年轻代用于存放刚创建的对象,年老代用于存放“生存时间”长的对象,比如经过多次垃圾回收还存活的对象。年轻代划分成三个部分:Eden、Survivor1、Survivor2 。垃圾回收流程:当Eden区满了之后,会触发minor GC,把Eden区和Survivor1中还存活的对象拷贝到Survivor2中。如果Survivor2存储下,则会存储到年老中;最后如果年老代满了,就会触发Full GC。

Spark中的GC优化主要的目的就是让长期存活的对象存储在年老中,年轻代中尽可能存储短期的对象。这样可以避免触发Full GC来搜集任务执行时的临时对象。可以按照下面的方法:

  1. 检查目前垃圾回收的GC信息,如果多次触发 Full GC,就意味着执行任务缺乏足够的内存。

  2. 如果有很多次minor GC而full GC次数比较少,可以多分配Eden区的内存。如果eden区配置成E,可以设置年轻代的大小为 -Xmn=4/3E。

  3. 在GC的输出信息中,如果年老代接近满了,降低 spark.memory.faction 来减少存储使用的内存。减少缓存对象优于把任务执行变慢。另外,可以考虑降低年轻代的大小,即降低 -Xmn 配置。如果前面没有配置,可以调整 NewRatio 参数,一般JVM默认都是2,意味着年老代占用 2/3 的内存。这部分的内存需要远远大于 spark.memory.faction 。

  4. 尝试使用 G1GC ,使用方法为 -XX:+UseG1GC,可以改善垃圾回收的瓶颈,注意仅当executor的内存很大时,适合用G1。可以通过 -XX:G1HeapRegionSize 配置G1中Region的大小。

  5. 如果读取hdfs的数据,可以通过hdfs中的block评估任务使用的内存大小。注意block经过压缩后大小一般比真实大小小2-3倍,因此如果你有3-4个任务,hdfs上的block大小为128M,那么总的内存消耗大概是 43*128M。

  6. 监控修改完配置后,通过监控观察垃圾回收的变化。executor上的GC配置可以通过 spark.executor.extraJavaOptions 控制。

其他

并行度等级

除非每个并行度都配置的非常高,不然集群不会全部利用起来。spark根据大小自动配置map任务的并行度,比如 SparkContext.textFile 中的参数,对于分布式的reduce操作,比如groupByKey和reduceByKey,将会使用比较大的分区数。你可以通过配置 spark.PairRDDFunction 中的第二个参数,或配置 spark.default.parallelism 改变默认值。一般建议是每个CPU核分配2-3个任务。

Reduce任务的内存使用

有时候,RDD无法存储在内存时会遇到OOM的问题,由于任务中的某一个遇到很大的数据。Spark的shuffle操作,如sortByKey, groupByKey, reduceByKey, join等,会为每个任务构建hash表优化分组。最简单的方法是增加并行度,这样每个task获取的输入结果会比较小。Spark在200ms以内的任务运行效率比较高,因为它可以重用JVM,降低任务启动的开销,因此你可以在集群中增加远远多于核数的并行数量。

广播大变量

在SparkContext中使用广播可以节省每个任务的内存大小,减少在群中启动任务消耗的性能。如果你的任务使用了很大的对象,driver程序内部使用的lookup表,可以把它存储到广播变量。Spark会在master输出大小,因此可以查看并且决定任务是否很大,如果超过20kB,就可以考虑。

数据本地性

数据本地性对spark任务有很大的影响,数据和代码在一起性能会更快,如果数据和代码分离,就需要把其中一个移动向另一个。一般移动代码都会比较快,因为代码的大小要远远比数据小得多。Spark根据数据本地性的准则来进行不同的调度。数据本地性描述了数据和代码的关系,从上到下速度越来越慢:

  1. PROCESS_LOCAL 数据和代码在同一JVM中,性能最佳

  2. NODE_LOCAL 数据在同一个节点,比如在HDFS中的某个节点,或者相同节点中的另一个executor。比PROCESS_LOCAL慢,因为需要在不同的进程中拷贝数据。

  3. NO_PREF 数据读取比较快,没有本地性

  4. RACK_LOCAL 数据在相同的机架上,不同的机架有可能需要通过网关,如交换机

  5. ANY 数据在网络中的任意地方,而非同一机架

Spark 倾向于选择最好的本地性分配任务,但是有时候这并不重要,比如在空闲的 executor 上未处理的数据,spark会降低它的优先级。有两种办法:等待 CPU 释放开启任务;立即从另一台机器开启任务。Spark 一般会尝试等待繁忙的 CPU,但是一旦超时,就会使用其他 executor 空闲的 CPU。每个等级的本地性都有专门的超时参数控制,参考配置篇的 spark.locality。