vlambda博客
学习文章列表

​反思Spark将数据导出到Kafka的方案

1、背景

假设有N个结构化的文本文件,通过Spark任务接入后发往kafka集群。

目前大数据平台中利用RDD foreachPartition方法,在各个partition中构造KafkaProducer对象,最终利用KafkaProducer.send方法将数据发送出去,在partition数据处理结束后,便释放连接。

代码形如:

rdd.foreachPartition(partion => {
       val kafkaPro = new KafkaProducer[StringString](properties)
       partion.foreach({ elem =>
         val promsg = new ProducerRecord[StringString](topic, null, elem)
         kafkaPro.send(promsg)
       })
       kafkaPro.close()
     })

一般来说,Spark任务数由文件个数决定(这边就不展开了)。在本次测试场景中,上述代码会构造出N个KafkaProducer,且KafkaProducer会和所有的kafka broker构造TCP连接,因此上述程序会构造出N*brokers个连接。

2、解决方案

是否可以在driver端构造KafkaProducer,然后利用spark broadcast方法广播到executor端呢?(从KafkaProducer源码来看,KafkaProducer不具备序列化的条件。)

public class KafkaProducer<KVimplements Producer<KV>
public interface Producer<KVextends Closeable

但可以通过Scala 懒值方式让KafkaProducer间接广播到executor端,代码如下:

OfflineKafkaSink类封装了KafkaProducer的构造过程。在Spark广播OfflineKafkaSink对象时,并没有初始化KafkaProducer,因此该对象是可以广播到executor的,从而保证在一个executor上,仅仅会构造一个KafkaProducer对象。

// @SerialVersionUID相当于混入了Serializable接口
@SerialVersionUID(138518511865139159L)
class OfflineKafkaSink[KV](createProducer: (=> KafkaProducer[KV]) {
  // 使用lazy,让创建KafkaProducer的过程在executor端执行.
  lazy val kafkaProducer = createProducer()
  def send(topic: String, key: K, value: V) = {
    kafkaProducer.send(new ProducerRecord[KV](topic, key, value))
  }
}

object OfflineKafkaSink {
  def apply[KV](config: Properties): OfflineKafkaSink[KV] = {
    val createProducer = () => {
      val producer = new KafkaProducer[KV](config)
      // executor在关闭时,会执行KafkaProducer.close代码,关闭资源.
      sys.addShutdownHook {
        producer.close()
      }
      producer
    }
    new OfflineKafkaSink(createProducer)
  }
}

广播OfflineKafkaSink对象

val offlineKafkaSink = SparkContext.broadcast(OfflineKafkaSink[String,String](config))

发送数据

rdd.foreachPartition(partion => {
    println(offlineKafkaSink.value.toString)
    partion.foreach({ elem =>
      // 为了让数据可以均匀的分配到topic的分区上,key置为null.
      offlineKafkaSink.value.send(topic, null, elem)
    })
})

3、测试结果

在executor 6上运行了两个task,打开对应的container日志,找到如下日志:

task index 0
2020-12-02 15:59:31,370 | INFO  | [Executor task launch worker for task 1] | kafka producer is :org.apache.kafka.clients.producer.KafkaProducer@464fdcdc | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
task index 5
2020-12-02 15:59:31,370 | INFO  | [Executor task launch worker for task 1] | kafka producer is :org.apache.kafka.clients.producer.KafkaProducer@464fdcdc | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)

可知,不同task在同一个executor上,会共用一个KafkaProducer对象,确实可以减少kafka broker的连接数。

4、新方案的缺点

新方案虽然可以减少spark 和kafka broker的连接数,但KafkaProducer仅在executor关闭时才会执行资源释放动作,因此在以下两种场景下,依然存在问题:

  • Spark使用静态资源分配,则executor可能长时间都不会退出,那么Kafka链接就永远不会被释放。
  • 业务流程定时运行时,变量是反复广播的,在executor端也会不断的创建连接,此种情况下并未复用KafkaProducer。

5、结论

鉴于大数据平台目前采用yarn client方式,多个离线任务是复用driver/sparkContext的,application长时间运行时存在连接不释放的问题,因此新方案并不适用我司平台。