spark改七行源码实现高效处理kafka数据积压
1. 劳力士
2.常见积压问题
3.浪尖的骚操作
override def getPartitions: Array[Partition] = {
offsetRanges.zipWithIndex.map { case (o, i) =>
new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
}.toArray
}
val offsetRanges: Array[OffsetRange],
OffsetRange存储一个kafka分区元数据及其offset范围,然后进行map操作,转化为KafkaRDDPartition。实际上,我们可以在这里下手,将map改为flatmap,然后对offsetrange的范围进行拆分,但是这个会引发一个问题,浪尖在这里就不赘述了,你可以测测。
是否开启自动重分区分区
sparkConf.set("enable.auto.repartition","true")
避免不必要的重分区操作,增加个阈值,只有该批次要消费的kafka的分区内数据大于该阈值才进行拆分
sparkConf.set("per.partition.offsetrange.threshold","300")
拆分后,每个kafkardd 的分区数据量。
sparkConf.set("per.partition.after.partition.size","100")
val repartitionStep = _ssc.conf.getInt("per.partition.offsetrange.size",1000)
val repartitionThreshold = _ssc.conf.getLong("per.partition.offsetrange.threshold",1000)
val enableRepartition = _ssc.conf.getBoolean("enable.auto.repartition",false)
val offsetRanges = untilOffsets.flatMap{ case (tp, uo) =>
val fo = currentOffsets(tp)
val delta = uo -fo
if(enableRepartition&&(repartitionThreshold < delta)){
val offsets = fo to uo by repartitionStep
offsets.map(each =>{
val tmpOffset = each + repartitionStep
OffsetRange(tp.topic, tp.partition, each, Math.min(tmpOffset,uo))
}).toList
}else{
Array(OffsetRange(tp.topic, tp.partition, fo, uo))
}
}
import bigdata.spark.config.Config
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, TaskContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
/*
1. 直接消费新数据,数据离线修补。
2. repartition(10---->100),给足够多的资源,以便任务逐渐消除滞后的数据。
3. directDstream api 生成的是kafkardd,该rdd与kafka分区一一对应。
*/
object kafka010Repartition {
def main(args: Array[String]) {
// 创建一个批处理时间是2s的context 要增加环境变量
val sparkConf = new SparkConf().setAppName(this.getClass.getName).setMaster("local[*]")
sparkConf.set("enable.auto.repartition","true")
sparkConf.set("per.partition.offsetrange.threshold","300")
sparkConf.set("per.partition.offsetrange.step","100")
val ssc = new StreamingContext(sparkConf, Seconds(5))
// 使用broker和topic创建DirectStream
val topicsSet = "test1".split(",").toSet
val kafkaParams = Map[String, Object]("bootstrap.servers" -> Config.kafkaHost,
"key.deserializer"->classOf[StringDeserializer],
"value.deserializer"-> classOf[StringDeserializer],
"group.id"->"test1",
"auto.offset.reset" -> "earliest",
"enable.auto.commit"->(false: java.lang.Boolean))
val messages = KafkaUtils.createDirectStream[String, String](
ssc,
LocationStrategies.PreferConsistent,
ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams))
messages.transform(rdd=>{
println("partition.size : "+rdd.getNumPartitions)
rdd
}).foreachRDD(rdd=>{
// rdd.foreachPartition(each=>println(111))
val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
offsetRanges.foreach(o=>{
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
})
})
ssc.start()
ssc.awaitTermination()
}
}
67 :
test1 0 447 547
test1 0 547 647
test1 0 647 747
test1 0 747 847
test1 0 847 947
test1 0 947 1047
test1 0 1047 1147
test1 0 1147 1247
test1 0 1247 1347
test1 0 1347 1447
test1 0 1447 1547
test1 0 1547 1647
test1 0 1647 1747
test1 0 1747 1847
test1 0 1847 1947
test1 0 1947 2047
test1 0 2047 2147
test1 0 2147 2247
test1 0 2247 2347
test1 0 2347 2447
test1 0 2447 2547
test1 0 2547 2647
test1 0 2647 2747
test1 0 2747 2847
test1 0 2847 2947
test1 0 2947 3047
test1 0 3047 3147
test1 0 3147 3247
test1 0 3247 3347
test1 0 3347 3447
test1 0 3447 3547
test1 0 3547 3647
test1 0 3647 3747
test1 0 3747 3847
test1 0 3847 3947
test1 0 3947 4047
test1 0 4047 4147
test1 0 4147 4247
test1 0 4247 4347
test1 0 4347 4447
test1 0 4447 4547
test1 0 4547 4647
test1 0 4647 4747
test1 0 4747 4847
test1 0 4847 4947
test1 0 4947 5047
test1 0 5047 5147
test1 0 5147 5247
test1 0 5247 5347
test1 0 5347 5447
test1 0 5447 5547
test1 0 5547 5647
test1 0 5647 5747
test1 0 5747 5847
test1 0 5847 5947
test1 0 5947 6047
test1 0 6047 6147
test1 0 6147 6247
test1 0 6247 6347
test1 0 6347 6447
test1 0 6447 6547
test1 0 6547 6647
test1 0 6647 6747
test1 0 6747 6847
test1 0 6847 6947
test1 0 6947 7047
test1 0 7047 7124
别忘了点个在看哦!转发那就太好了!