反思Spark将数据导出到Kafka的方案
假设有N个结构化的文本文件,通过Spark任务接入后发往kafka集群。
目前大数据平台中利用RDD foreachPartition方法,在各个partition中构造KafkaProducer对象,最终利用KafkaProducer.send方法将数据发送出去,在partition数据处理结束后,便释放连接。
代码形如:
rdd.foreachPartition(partion => {
val kafkaPro = new KafkaProducer[String, String](properties)
partion.foreach({ elem =>
val promsg = new ProducerRecord[String, String](topic, null, elem)
kafkaPro.send(promsg)
})
kafkaPro.close()
})
一般来说,Spark任务数由文件个数决定(这边就不展开了)。在本次测试场景中,上述代码会构造出N个KafkaProducer,且KafkaProducer会和所有的kafka broker构造TCP连接,因此上述程序会构造出N*brokers个连接。
2、解决方案
是否可以在driver端构造KafkaProducer,然后利用spark broadcast方法广播到executor端呢?(从KafkaProducer源码来看,KafkaProducer不具备序列化的条件。)
public class KafkaProducer<K, V> implements Producer<K, V>
public interface Producer<K, V> extends Closeable
但可以通过Scala 懒值方式让KafkaProducer间接广播到executor端,代码如下:
OfflineKafkaSink类封装了KafkaProducer的构造过程。在Spark广播OfflineKafkaSink对象时,并没有初始化KafkaProducer,因此该对象是可以广播到executor的,从而保证在一个executor上,仅仅会构造一个KafkaProducer对象。
// @SerialVersionUID相当于混入了Serializable接口
@SerialVersionUID(138518511865139159L)
class OfflineKafkaSink[K, V](createProducer: () => KafkaProducer[K, V]) {
// 使用lazy,让创建KafkaProducer的过程在executor端执行.
lazy val kafkaProducer = createProducer()
def send(topic: String, key: K, value: V) = {
kafkaProducer.send(new ProducerRecord[K, V](topic, key, value))
}
}
object OfflineKafkaSink {
def apply[K, V](config: Properties): OfflineKafkaSink[K, V] = {
val createProducer = () => {
val producer = new KafkaProducer[K, V](config)
// executor在关闭时,会执行KafkaProducer.close代码,关闭资源.
sys.addShutdownHook {
producer.close()
}
producer
}
new OfflineKafkaSink(createProducer)
}
}
广播OfflineKafkaSink对象
val offlineKafkaSink = SparkContext.broadcast(OfflineKafkaSink[String,String](config))
发送数据
rdd.foreachPartition(partion => {
println(offlineKafkaSink.value.toString)
partion.foreach({ elem =>
// 为了让数据可以均匀的分配到topic的分区上,key置为null.
offlineKafkaSink.value.send(topic, null, elem)
})
})
3、测试结果
在executor 6上运行了两个task,打开对应的container日志,找到如下日志:
task index 0
2020-12-02 15:59:31,370 | INFO | [Executor task launch worker for task 1] | kafka producer is :org.apache.kafka.clients.producer.KafkaProducer@464fdcdc | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
task index 5
2020-12-02 15:59:31,370 | INFO | [Executor task launch worker for task 1] | kafka producer is :org.apache.kafka.clients.producer.KafkaProducer@464fdcdc | org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
可知,不同task在同一个executor上,会共用一个KafkaProducer对象,确实可以减少kafka broker的连接数。
4、新方案的缺点
新方案虽然可以减少spark 和kafka broker的连接数,但KafkaProducer仅在executor关闭时才会执行资源释放动作,因此在以下两种场景下,依然存在问题:
-
Spark使用静态资源分配,则executor可能长时间都不会退出,那么Kafka链接就永远不会被释放。 -
业务流程定时运行时,变量是反复广播的,在executor端也会不断的创建连接,此种情况下并未复用KafkaProducer。
5、结论
鉴于大数据平台目前采用yarn client方式,多个离线任务是复用driver/sparkContext的,application长时间运行时存在连接不释放的问题,因此新方案并不适用我司平台。