vlambda博客
学习文章列表

一次JVM GC引发的Spark调优大全(建议收藏)

一般在我们开发spark程序的时候,从代码开发到上线以及后期的维护中,在整个过程中都需要涉及到调优的问题,即一开始需要考虑如何把代码写的更简洁高效调优(即代码优化),待开发测试完成后,提交任务时综合考量该任务所需的资源(这里涉及到资源调优),上线后是否会出现数据倾斜问题(即倾斜调优),以及是否出现频繁GC问题(这里涉及到GC调优)。

那么本篇通过反推的模式,即通过GC调优进行延伸扩展,比如出现GC问题是不是可能出现了倾斜?如果没有出现倾斜,是不是我们给的资源不足?如果资源充足的话,那么是不是我们代码写的有问题呢(比如频繁创建对象等操作)?按照这样一个思路展开来总结spark的调优。

JVM的堆、栈、方法区

如上图所示,JVM主要由类加载器系统、运行时数据区、执行引擎和本地接口等组成。

其中运行时数据区又由方法区、堆、Java栈、PC寄存器、本地方法栈组成。

当JVM加载一个class文件后,class中的参数、类型等信息会存储到方法区中,程序运行时所创建的对象存储在堆中(堆中不放基本类型和对象引用,只存放对象本身)。当每个新线程启动时,会有自己的程序计数器(Program Counter Register)和栈,当线程调用方法时,程序计数器表明下一条执行的指令,同时线程栈会存储线程的方法调用状态(包括局部变量、被调用的参数、中间结果等)。本地方法调用存储在独立的本地方法栈中,或其他独立的内存区域中。

栈区由栈桢组成,每个栈桢就是每个调用的方法的栈,当方法调用结束后,JVM会弹栈,即抛弃此方法的栈桢。

JVM内存划分

一次JVM GC引发的Spark调优大全(建议收藏)上图中的划分是基于JDK7和JDK8,其中有一些变动(主要是永久代的移除)。

JVM内存从大体上划分为三部分:年轻代、老年代、永久代(元空间)

年轻代:所有新生成的对象都会先放到年轻代,年轻代又分为三个区:Eden区、两个Survivor,三者之间的比例为8:1:1。

  Eden区:大部分对象会在该区生成,当在Eden区申请空间失败后,会触发Scavenge GC,对Eden区进行GC,清除非存活对象,并把还存活的对象复制到其中一个Survivor区中。这里可能会有一个问题,由于默认情况下Eden:Survivor1:Survivor2的内存占比是8:1:1,如果存活下来的对象是1.5,一个Survivor区域放不下,那么这个时候就会利用JVM的担保机制,将多余的对象直接放入老年代,会出现老年代囤积一大堆短生命周期的,导致老年代频繁溢满,频繁进行Full GC去回收老年代中的对象

  Survivor区:当Eden区满后,会把还存活的对象复制到其中一个S区中,且两个S区之间没有先后顺序关系,同时根据程序需要Survivor区是可以配置多个的,这样可以增加对象在年轻代存在的时间,减少被放到老年代的可能。JVM每次只会使用Eden和其中一块Survivor区域来为对象服务,所以无论什么时候总会有一块Survivor区域是空闲的,也就是说年轻代实际可用的内存空间为9/10的年轻代空间。

老年代:在年轻代中经历了N次GC之后仍然存活的对象,就会被放到老年代中。该区域通常存放一些生命周期较长的对象。默认情况下,年轻代和老年代的比值为1:2,即老年代占用堆空间大小的2/3,当然这个值可以通过-XX:NewRation来调整

持久代:主要存放静态文件、Java类、方法等。在Java 8中该区域已经被移除了,开始使用本地化的内存来存放类的元数据,也称之元空间

JVM GC

一次JVM GC引发的Spark调优大全(建议收藏)JVM主要管理两种类型的内存:堆和非堆,简单来说,堆就是Java代码可及的内存,是留给开发人员用的,非堆就是JVM留给自己用的。

  1. 当Eden满了之后,一个小型的GC就会被触发(Minor GC),Eden和Survivor1中幸存仍被使用的对象被复制到Survivor2。
  2. Survivor1和Survivor2区域进行交换,当一个对象生存的时间足够长或者Survivor2满了之后,就会被转移到Old代
  3. 当Old空间快满的时候,这个时候会进行Full GC

一般以下几种情况可能会导致Full GC:

  1. 当Old空间被写满时
  2. System.GC()被显式调用
  3. 上一次GC之后,Heap的各个区域分配策略动态变化

以上简单说明一下jvm相关知识点,其实spark GC的目的就是要确保老年代只保存长生命周期RDD,同时年轻代的空间又能够保存短生命周期的对象,这样就能避免启动Full GC

Spark对JVM的使用

一次JVM GC引发的Spark调优大全(建议收藏)基于上篇Tungsten on spark 文章的整理,Executor对内存的使用主要有以下几个部分:

  1. RDD存储。当对RDD调用persist或Cache方法时,RDD的partitons会被存储到内存里,那么这块内存也就是Storage内存。
  2. Shuffle操作。当发生Shuffle时,需要缓冲区来存储Shuffle的输出和聚合的中间结果,该块内存称之为Execution内存。
  3. 用户代码。用户编写的代码能够使用的内存空间,也就是其他内存(用户内存)

在统一内存模式下,整个堆空间分为Spark Memory和User Memory,其中Spark Memory包括Storage Memory和Execution Memory,而且两者之间可以互相借用空间。

通过spark.memory.fraction参数来控制Spark Memory在整个堆空间所占的比例

通过spark.memory.storageFraction来设置Storage Memory占Spark Memory的比例,如果Spark作业中有较多的RDD持久化操作,该参数值可以适当调高,保证持久化的数据能够容纳在内存中,避免内存不够缓存所有的数据,只能写入磁盘中,降低性能。如果Spark作业中Shuffle类操作比较多,持久化类操作比较少,那么可以适当降低该参数值。

这里给出一个实际的例子来说明一下spark是如何分配内存的

/usr/local/spark-current/bin/spark-submit \
--master yarn \
--deploy-mode client \
--executor-memory 1G \
--queue root.default \
--class my.Application \
--conf spark.ui.port=4052 \
--conf spark.port.maxRetries=100 \
--num-executors 2 \
--jars mongo-spark-connector_2.11-2.3.1.jar \
App.jar 20201118000000

#
 这里配置两个Executor,每个Executor内存给1G

一次JVM GC引发的Spark调优大全(建议收藏)如图所示,spark申请到了两个Executor,每个Executor得到的Storage Memory内存分别为384.1MB(注意:这里Storage Memory其实就是Storage+Execution的总和内存),这里有一个疑惑,我们分配的是每个Executor内存为1G,为什么只得到384MB呢?这里给出具体的计算公式:

  1. 我们申请为1G内存,但是真正拿到内存会比这个少,这里涉及到一个Runtime.getRuntime.maxMemory 值的计算(在上篇文章中关于UnifiedMemoryManager源码分析中提到过),Runtime.getRuntime.maxMemory对应的值才是程序能够使用的最大内存,上面也提到了堆划分了Eden,Survivor,Tenured区域,所以该值计算公式为:

    ExecutorMemory =  Eden + 2 * Survivor + Tenured = 1GB  = 1073741824 字节

    systemMemory = Runtime.getRuntime.maxMemory  = Eden + Survivor + Tenured = 954437176.888888888888889 字节

    //org.apache.spark.memory.UnifiedMemoryManager(这里讨论的还是动态内存模型)
    private def getMaxMemory(conf: SparkConf): Long = {
      val systemMemory = conf.getLong("spark.testing.memory"Runtime.getRuntime.maxMemory)
      val reservedMemory = conf.getLong("spark.testing.reservedMemory",
            if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
      val usableMemory = systemMemory - reservedMemory
      val memoryFraction = conf.getDouble("spark.memory.fraction"0.6)
      
      //这里即获取最大的内存值
      (usableMemory * memoryFraction).toLong
    }
  2. 基于Spark的动态内存模型设计,其中有300MB的预留内存,因此剩余可用内存为总申请得到的内存-预留内存

    reservedMemory = 300MB = 314572800字节

    usableMemory = systemMemory - reservedMemory = 954437176.888888888888889 -  314572800 = 639864376.888888888888889字节

  3. Spark Web UI界面上虽然显示的是Storage Memory,但其实是Execution+Storage内存,即该部分占用60%比例

    Storage + Execution = usableMemory * 0.6 = 639864376.888888888888889 * 0.6 = 383918626.133333333333333 字节

  4. 通过第三步骤即可看出实际的内存分配情况了,注意:web ui界面得到的结果计算是除于1000转换得到的值。一次JVM GC引发的Spark调优大全(建议收藏)

GC调优步骤

  1. 统计一下GC启动的频率和GC使用的总时间,即在spark-submit提交的时候设置参数即可一次JVM GC引发的Spark调优大全(建议收藏)如图所示,这里提高了spark.memory.fraction参数值,则每个Exectuor实际可用的内存也随之增加了.

    /usr/local/spark-current/bin/spark-submit \
    --master yarn \
    --deploy-mode client \
    --executor-memory 1G \
    --driver-memory 1G \
    --queue root.default \
    --class my.Application \
    --conf spark.ui.port=4052 \
    --conf spark.port.maxRetries=100 \
    --num-executors 2 \
    --jars mongo-spark-connector_2.11-2.3.1.jar \
    --conf "spark.executor.extraJavaOptions=-XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
    --conf spark.memory.fraction=0.8 \
    App.jar

一次JVM GC引发的Spark调优大全(建议收藏)如图所示,出现了多次Full GC,首先考虑的是可能配置的Executor内存较低,这个时候需要增加Executor Memory来调节。

  1. 检查GC日志中是否有过于频繁的GC。如果一个任务结束前,Full GC执行多次,说明老年代空间被占满了,那么有可能是没有分配足够的内存。

    1.调整executor的内存,配置参数executor-memory
    2.调整老年代所占比例:配置-XX:NewRatio的比例值
    3.降低spark.memory.storageFraction减少用于缓存的空间
  2. 如果有太多Minor GC,但是Full GC不多,可以给Eden分配更多的内存.

    1.比如Eden代的内存需求量为E,可以设置Young代的内存为-Xmn=4/3*E,设置该值也会导致Survivor区域扩张
    2.调整Eden在年轻代所占的比例,配置-XX:SurvivorRatio的比例值
  3. 调整垃圾回收器,通常使用G1GC,即配置-XX:+UseG1GC。当Executor的堆空间比较大时,可以提升G1 region size(-XX:G1HeapRegionSize)

    /usr/local/spark-current/bin/spark-submit \
    --master yarn \
    --deploy-mode client \
    --executor-memory 1G \
    --driver-memory 1G \
    --queue root.default \
    --class my.Application \
    --conf spark.ui.port=4052 \
    --conf spark.port.maxRetries=100 \
    --num-executors 2 \
    --jars mongo-spark-connector_2.11-2.3.1.jar \
    --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC -XX:G1HeapRegionSize=16M -XX:+PrintGCDetails -XX:+PrintGCTimeStamps" \
    --conf spark.memory.fraction=0.8 \
    App.jar
  4. 优化代码,尽量多使用array和string,并使用kyro序列,让每个Partition都成为字节数组

  5. 结合实际的需求,调整缓存和shuffle计算所占的内存比例,即当代码中出现shuffle类操作比较多,而不需要太多缓存的话,则可以适当降低Storage Memory所占比例;当缓存操作比较多,而Shuffle类操作比较少的话,可以适当调低Execution Memory所占比例。主要是通过spark.storage.storageFraction来控制

  6. 开启堆外内存,设置堆外内存大小,这里为了避免OOM

    spark.memory.offHeap.size=4G
    spark.memory.offHeap.enabled=true

注意:这里需要说明一下spark.executor.memoryOverhead 和spark.memory.offHeap.size之间的区别

spark.executor.memoryOverhead是属于JVM堆外内存,用于JVM自身的开销、内部的字符串还有一些本地开销,spark不会对这块内存进行管理。默认大小为ExecutorMemory的10%,在spark2.4.5之前,该参数的值应该包含spark.memory.offHeap.size的值。比如spark.memory.offHeap.size配置500M,spark.executor.memoryOverhead默认为384M,那么memoryOverhead的值应该为884M。

//spark2.4.5之前的
// Executor memory in MB.
protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt

// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
  math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
protected val pysparkWorkerMemory: Int = if (sparkConf.get(IS_PYTHON_APP)) {
  sparkConf.get(PYSPARK_EXECUTOR_MEMORY).map(_.toInt).getOrElse(0)
else {
  0
}

// Resource capability requested for each executors
private[yarn] val resource = Resource.newInstance(
  executorMemory + memoryOverhead + pysparkWorkerMemory,
  executorCores)

//由于memoryOverHead的参数值理解起来比较困难,而且不易于用户对每个特定的内存区域进行自定义配置,所以在Spark3.0之后进行了拆分
//spark3.0之后的资源申请更改为
private[yarn] val resource: Resource = {
    val resource = Resource.newInstance(
      executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
    ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource)
    logDebug(s"Created resource capability: $resource")
    resource
  }

spark.memory.offHeap.size这个参数指定的内存(广义上是指所有堆外的),这部分内存的申请和释放是直接进行的,不由JVM管理,所以这块是没有GC的。

倾斜调优

倾斜部分的调优可以阅读下面两篇文章,相对来说已经比较全了



开发调优

相信有很多读者应该非常熟悉以下这几种使用姿势了,这里就不再重复详细说明了

  1. 避免创建重复的RDD

  2. 尽可能复用同一个RDD

  3. 对多次使用的RDD进行持久化

  4. 尽量避免使用Shuffle算子

  5. 使用map-side预聚合的shuffle操作

  6. 使用高性能的算子

    6.1: 使用reduceByKey/aggregateByKey替代groupByKey

    6.2: 使用mapPartitions替代普通map

    6.3: 使用foreachPartitions替代foreach

    6.4: 使用filter之后进行coalesce操作

    6.5: 使用repartitionAndSortWithinPartitions替代repartition与sort类操作

  7. 广播大变量

    val list1 = ...
    val list1Broadcast = sc.broadcast(list1)
    rdd1.map(list1Broadcast...)
  8. 使用kryo优化序列化性能

    // 创建SparkConf对象
    conf.set("spark.serializer""org.apache.spark.serializer.KryoSerializer")

    // 设置序列化器为KryoSerializer。
    conf.set("spark.serializer""org.apache.spark.serializer.KryoSerializer")

    // 注册要序列化的自定义类型。
    conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))
  9. 优化数据结构,尽量使用字符串代替对象,使用原始类型(如int,Long)代替字符串,使用数组代替集合类型

资源参数调优

众所周知,引起GC主要是内存资源问题,一般情况下是不需要对GC进行调优的。当出现GC问题时,那么就需要思考是哪个环节造成内存紧张。首先想到的应该是配置的内存不足,直接加资源,这里整理了一些配置参数,仅供读者参考。

应用行为 属性名 默认值 属性描述 生效版本
driver行为 spark.driver.cores 1 driver程序运行需要的cpu内核数 1.3.0
driver行为 spark.driver.maxResultSize 1G 每个Spark action(如collect)所有分区的序列化结果的总大小限制。设置的值应该不小于1m,0代表没有限制。如果总大小超过这个限制,程序将会终止。大的限制值可能导致driver出现内存溢出错误(依赖于spark.driver.memory和JVM中对象的内存消耗) 1.2.0
driver行为 spark.driver.memory 1G driver进程使用的内存数 1.1.1
driver行为 spark.driver.memoryOverhead driverMemory * 0.10,with minimum of 384 driver端分配的堆外内存 2.3.0
driver行为 spark.driver.extraClassPath None 附加到driver的classpath的额外的classpath实体 1.0.0
driver行为 spark.driver.defaultJavaOptions None 默认传递给driver的JVM选项字符串。注意这个配置不能直接在代码中使用SparkConf来设置,因为这个时候driver JVM已经启动了,可以在命令行通过--driver-java-options参数来设置 3.0.0
driver行为 spark.driver.extraJavaOptions None 传递给driver的JVM选项字符串。例如GC设置或者其它日志设置。注意,在这个选项中设置Spark属性或者堆大小是不合法的。Spark属性需要用--driver-class-path设置 1.0.0
driver行为 spark.driver.extraLibraryPath None 指定启动driver的JVM时用到的库路径 1.0.0
driver行为 spark.driver.userClassPathFirst false 当在driver中加载类时,是否用户添加的jar比Spark自己的jar优先级高。这个属性可以降低Spark依赖和用户依赖的冲突,现在还是一个实验性的特征 1.3.0
executor行为 spark.executor.memory 1G 每个executor进程使用的内存数 0.7.0
executor行为 spark.executor.memoryOverhead executorMemory * 0.10, with minimum of 384 Executor JVM堆外内存设置,用于解决JVM开销,内部字符串,其他本机开销等问题 2.3.0
executor行为 spark.executor.extraClassPath None 附加到executors的classpath的额外的classpath实体。这个设置存在的主要目的是Spark与旧版本的向后兼容问题。用户一般不用设置这个选项 1.0.0
executor行为 spark.executor.defaultJavaOptions None 默认的JVM选项,以附加到spark.executor.extraJavaOptions 3.0.0
executor行为 spark.executor.extraJavaOptions None 传递给executors的JVM选项字符串。例如GC设置或者其它日志设置。注意,在这个选项中设置Spark属性或者堆大小是不合法的。Spark属性需要用SparkConf对象或者spark-submit脚本用到的spark-defaults.conf文件设置。堆内存可以通过spark.executor.memory设置 1.0.0
executor行为 spark.executor.extraLibraryPath None 指定启动executor的JVM时用到的库路径 1.0.0
executor行为 spark.executor.userClassPathFirst false (实验性)与spark.driver.userClassPathFirst相同的功能,但应用于执行程序实例. 1.3.0
executor行为 spark.executor.cores 1 每个executor使用的核数 1.0.0
executor行为 spark.default.parallelism 本地模式:机器核数;Mesos:8;其他:max(executor的core,2) 默认并行度 0.5.0
shuffle行为 spark.reducer.maxSizeInFlight 48m 从每个reduce中获取的最大容量,该参数值如果过低时,会导致Shuffle过程中产生的数据溢出到磁盘 1.4.0
shuffle行为 spark.reducer.maxReqsInFlight Int.MaxValue 此配置限制了获取块的远程请求的数量 2.0.0
shuffle行为 spark.reducer.maxBlocksInFlightPerAddress Int.MaxValue 该配置限制了reduce任务从其他机器获取远程块的数量 2.2.1
shuffle行为 spark.shuffle.compress true 是否压缩map操作的输出文件 0.6.0
shuffle行为 spark.shuffle.file.buffer 32k 每个shuffle文件输出缓存的大小 1.4.0
shuffle行为 spark.shuffle.io.maxRetries 3 (Netty only)自动重试次数 1.2.0
shuffle行为 spark.shuffle.io.numConnectionsPerPeer 1 (Netty only)机器之间的连接复用 1.2.1
shuffle行为 spark.shuffle.io.preferDirectBufs true (Netty only)直接堆外内存,用于减少随机和高速缓存块传输期间的GC 1.2.0
shuffle行为 spark.shuffle.io.retryWait 5s (Netty only)重试提取之间要等待多长时间;默认情况下重试导致的最大延迟为15s 1.2.1
shuffle行为 spark.shuffle.service.enabled false 启用外部shuffle服务 1.2.0
shuffle行为 spark.shuffle.service.index.cache.size 100m 缓存条目限制为指定的内存占用,以字节为单位 2.3.0
shuffle行为 spark.shuffle.sort.bypassMergeThreshold 200 如果shuffle map task的数量小于这个阀值200,且不是聚合类的shuffle算子(比如reduceByKey),则不会进行排序 1.1.1
shuffle行为 spark.shuffle.spill.compress true 在shuffle时,是否将spilling的数据压缩。压缩算法通过spark.io.compression.codec指定 0.9.0
shuffle行为 spark.shuffle.accurateBlockThreshold 100 * 1024 * 1024 高于该阈值时,HighlyCompressedMapStatus中的混洗块的大小将被准确记录。通过避免在获取随机块时低估随机块的大小,有助于防止OOM 2.2.1
shuffle行为 spark.shuffle.registration.timeout 5000 注册到外部shuffle服务的超时时间 2.3.0
shuffle行为 spark.shuffle.registration.maxAttempts 3 注册到外部shuffle服务的重试次数 2.3.0
压缩序列化 spark.broadcast.compress true 是否压缩广播变量 0.6.0
压缩序列化 spark.checkpoint.compress false 是否开启RDD压缩checkpoint 2.2.0
压缩序列化 spark.io.compression.codec lz4 RDD压缩方式org.apache.spark.io.LZ4CompressionCodec, org.apache.spark.io.LZFCompressionCodec, org.apache.spark.io.SnappyCompressionCodec, and org.apache.spark.io.ZStdCompressionCodec. 0.8.0
压缩序列化 spark.io.compression.lz4.blockSize 32k LZ4压缩中使用的块大小 1.4.0
压缩序列化 spark.io.compression.snappy.blockSize 32k Snappy压缩中使用的块大小 1.4.0
压缩序列化 spark.kryo.classesToRegister None 如果你用Kryo序列化,给定的用逗号分隔的自定义类名列表表示要注册的类 1.2.0
压缩序列化 spark.kryo.registrator None 如果你用Kryo序列化,设置这个类去注册你的自定义类。如果你需要用自定义的方式注册你的类,那么这个属性是有用的。否则spark.kryo.classesToRegister会更简单。它应该设置一个继承自KryoRegistrator的类 0.5.0
压缩序列化 spark.kryo.registrationRequired false 是否需要注册为Kyro可用 1.1.0
压缩序列化 spark.kryoserializer.buffer.max 64m Kryo序列化缓存允许的最大值 1.4.0
压缩序列化 spark.kryoserializer.buffer 64k Kyro序列化缓存的大小 1.4.0
压缩序列化 spark.rdd.compress False 是否压缩序列化的RDD分区 0.6.0
压缩序列化 spark.serializer org.apache.spark.serializer.
JavaSerializer
序列化对象使用的类 0.5.0
压缩序列化 spark.serializer.objectStreamReset 100 当用org.apache.spark.serializer.JavaSerializer序列化时,序列化器通过缓存对象防止写多余的数据,然而这会造成这些对象的垃圾回收停止。通过请求’reset’,你从序列化器中flush这些信息并允许收集老的数据。为了关闭这个周期性的reset,你可以将值设为-1。默认情况下,每一百个对象reset一次 1.0.0
动态分配 spark.dynamicAllocation.enabled false 是否开启动态分配 1.2.0
动态分配 spark.dynamicAllocation.executorIdleTimeout 60s 当某个executor空间超过该值时,则会remove掉该executor 1.2.0
动态分配 spark.dynamicAllocation.cachedExecutorIdleTimeout infinity 当executor内有缓存数据并且空闲了该值后,则remove掉该executor 1.4.0
动态分配 spark.dynamicAllocation.initialExecutors spark.dynamicAllocation.minExecutors 初始executor数量,默认和executor数量一样 1.3.0
动态分配 spark.dynamicAllocation.maxExecutors infinity executor上限,默认无限制 1.2.0
动态分配 spark.dynamicAllocation.minExecutors 0 executor下限,默认是0个 1.2.0
动态分配 spark.dynamicAllocation.executorAllocationRatio 1 默认情况下,动态分配将要求足够的执行者根据要处理的任务数量最大化并行性。虽然这可以最大程度地减少作业的等待时间,但是对于小型任务,此设置可能会由于执行程序分配开销而浪费大量资源,因为某些执行程序甚至可能无法执行任何工作。此设置允许设置一个比率,该比率将用于减少执行程序的数量。完全并行。默认为1.0以提供最大的并行度。0.5将执行者的目标数量除以2由dynamicAllocation计算的执行者的目标数量仍然可以被spark.dynamicAllocation.minExecutors和spark.dynamicAllocation.maxExecutors设置覆盖 2.4.0
动态分配 spark.dynamicAllocation.schedulerBacklogTimeout 1s 如果启用了动态分配,并且有待解决的任务积压的时间超过了此期限,则将请求新的执行者。 1.2.0
动态分配 spark.dynamicAllocation.sustainedSchedulerBacklogTimeout schedulerBacklogTimeout 与spark.dynamicAllocation.schedulerBacklogTimeout相同,但仅用于后续执行程序请求 1.2.0
动态分配 spark.dynamicAllocation.shuffleTracking.enabled false 实验功能。为执行程序启用随机文件跟踪,从而无需外部随机服务即可动态分配。此选项将尝试保持为活动作业存储随机数据的执行程序 3.0.0
动态分配 spark.dynamicAllocation.shuffleTracking.timeout infinity 启用随机跟踪时,控制保存随机数据的执行程序的超时。默认值意味着Spark将依靠垃圾回收中的shuffle来释放执行程序。如果由于某种原因垃圾回收无法足够快地清理随机数据,则此选项可用于控制执行者何时超时,即使它们正在存储随机数据。 3.0.0


--end--


扫描下方二维码
添加好友,备注【 交流
可私聊交流,也可进资源丰富学习群