vlambda博客
学习文章列表

flink源码分析之kafka consumer的执行流程

背景

线上flink任务稳定运行了两个多月了,突然之间收到了消息堆积较多的报警,kafka上看到的现象是消息堆积较多。问过业务人员得知,对应的流表在前一天重新刷了一遍数据,在我们的这个任务中有两次维表关联,而且内层有一个split操作会造成外层维表关联的数据量膨胀(最大可能为80倍,即split之后产生了80条新记录)。开始了问题分析之路。

问题

查看taskmanager的log时发现有如下报警信息:

WARN org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher - Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity.

问题是说在flink执行checkpoint的间隔内,从kafka中拉取到的数据还没有处理完成,导致offset没办法提交,而下一次的checkpoint已经开始了,这样flink会跳过对offset的提交。

分析

我们的场景是业务刷了大量的数据,导致短时间内生产了大量的数据,flink从kafka拉取的第一批还没有处理完成时,下一次checkpoint开始了,此时检查到上一次的checkpoint还未提交就会报这个警告并跳过当前这次checkpoint时对offset的提交。由于kafka中堆积的数据量足够,下一批还是会拉取一批数据在我们这里是500条(外层膨胀后会有几万条),然后仍然会处理超时,长此以往会频繁跳过offfset的提交,在kafka控制台上看到的结果是该消费者对应的consumer Group消息堆积越来越多(实际上是有消费,只是消费得比较慢)。为什么慢?库里同时有大量写入的操作,维表关联的性能急剧下降。这里不讨论维表性能的优化,我们主要基于问题来分析下flink中消费kafka的源码流程。

针对kafka消费的流程,我们来从头分析一下:

Task角度

Task是一个Runnable对象,它的run方法定义如下:

 /** * The core work method that bootstraps the task and executes its code. */ @Override public void run() { try { doRun(); } finally { terminationFuture.complete(executionState); } }

在它的doRun方法中会真正执行StreamTask的逻辑,StreamTask同时也是AbstractInvokable的子类。Task的doRun方法的部分代码如下:

它会初始化invokable实例并调用invokable的invoke方法。invokable实例是StreamTask类型的。

StreamTask角度

kafka源对应的StreamTask为SourceStreamTask,它的结构为:

flink源码分析之kafka consumer的执行流程

我们来看SourceStreamTask的invokable过程,它的invokable方法为org.apache.flink.streaming.runtime.tasks.StreamTask#invoke:

try { beforeInvoke(); // final check to exit early before starting to run if (canceled) { throw new CancelTaskException(); } // let the task do its work runMailboxLoop(); // if this left the run() method cleanly despite the fact that this was canceled, // make sure the "clean shutdown" is not attempted if (canceled) { throw new CancelTaskException(); } afterInvoke(); } //----------省略部分代码

在这里我们不深究StreamTask的完整的初始化流程,只关注下我们本文要关注的重点,其他内容后面再在专门的篇幅中具体分析。

beforeInvoke方法

我们来看beforeInvoke方法:

flink源码分析之kafka consumer的执行流程

我们主要关注它的两个动作:

执行SourceStreamTask的init方法。在init方法中主要执行一些和checkpoint和operator的udf相关的信息。执行operatorChain.initializeStateAndOpenOperators方法。在org.apache.flink.streaming.runtime.tasks.OperatorChain#initializeStateAndOpenOperators方法中主要执行operator的open操作,代码如下:

 protected void initializeStateAndOpenOperators(StreamTaskStateInitializer streamTaskStateInitializer) throws Exception { for (StreamOperatorWrapper<?, ?> operatorWrapper : getAllOperators(true)) { StreamOperator<?> operator = operatorWrapper.getStreamOperator(); operator.initializeState(streamTaskStateInitializer); operator.open(); } }

kafka Source对应的Operator为StreamSource类型的,是AbstractUdfStreamOperator的一个子类,它的open方法代码如下:

 @Override public void open() throws Exception { super.open(); FunctionUtils.openFunction(userFunction, new Configuration()); }

需要注意的是,这个userFunction是FlinkKafkaConsumer的一个实例。FlinkKafkaConsumer是FlinkKafkaConsumerBase类型的,openFunction方法会调用到org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#open方法,在该方法中会使用partitionDiscoverer获取到分区信息,然后尝试去state中获取,如果restoreState不为空则将partition信息与restoreState进行同步,将放入到subscribedPartitionsToStartOffsets容器中;如果restoreState为空则根据StartupMode来按照相应的模式处理partition列表中的信息。

flink源码分析之kafka consumer的执行流程

在FlinkKafkaConsumerBase的open方法中还有一点需要注意的是,对于offset的处理逻辑。我们来看下第一行代码:

 // determine the offset commit mode this.offsetCommitMode = OffsetCommitModes.fromConfiguration( getIsAutoCommitEnabled(), enableCommitOnCheckpoints, ((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());

flink源码分析之kafka consumer的执行流程

如果启用了checkpoint,直接通过是否启动了enableCommitOnCheckpoint来决定提交的模式,enableCommitOnCheckpoint默认为true,可以通过org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#setCommitOffsetsOnCheckpoints方法来修改。如果enableCommitOnCheckpoint为false则不进行offset的提交。如果禁用了checkpoint,则根据是否启动了自动提交来判断,如果没有启动则不进行offset提交。

runMailboxLoop方法

我们直接来看org.apache.flink.streaming.runtime.tasks.StreamTask#runMailboxLoop代码:

 public void runMailboxLoop() throws Exception { mailboxProcessor.runMailboxLoop(); }

我们先来看下在StreamTask的构造方法中对mailboxProcessor的定义:

 // 创建 mailboxProcessor this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);

第一个入参为MailboxDefaultAction,第二个入参为一个mailbox队列,第三人入参为线程执行器。其中MailboxDefaultAction对象为lambda表达式。

接下来我们来看org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor#runMailboxLoop方法:

 /** * Runs the mailbox processing loop. This is where the main work is done. */ public void runMailboxLoop() throws Exception { // 邮箱 final TaskMailbox localMailbox = mailbox;
Preconditions.checkState( localMailbox.isMailboxThread(), "Method must be executed by declared mailbox thread!");
assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!"; // 邮箱controller,与processor绑定 final MailboxController defaultActionContext = new MailboxController(this); // 当mailbox循环处于运行状态时,会一直消费Mailbox中的message(实际上是一个FIFO的队列) while (isMailboxLoopRunning()) { // The blocking `processMail` call will not return until default action is available. processMail(localMailbox, false); if (isMailboxLoopRunning()) { // 进行 task 的 default action,也就是调用 processInput() // 这里的defaultAction是在StreamTask的构造方法中的this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor)中的this::processInput。 mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed } } }

可以看到在该方法中会先循环执行processMail方法,然后执行mailboxDefaultAction.runDefaultAction(defaultActionContext)方法。这里我们主要关心后者,通过上文我们知道mailboxDefaultAction初始化为一个lambda表达式,在执行runDefaultAction时实际调用的是org.apache.flink.streaming.runtime.tasks.StreamTask#processInput方法。在我们本文的分析中它对应的是org.apache.flink.streaming.runtime.tasks.SourceStreamTask#processInput方法:

flink源码分析之kafka consumer的执行流程

这里会启动sourceThread线程,sourceThread线程为LegacySourceFunctionThread类型的,我们来看下它run方法中的运行逻辑:

flink源码分析之kafka consumer的执行流程

mainOperator的run方法的调用链为:

org.apache.flink.streaming.api.operators.StreamSource#run(java.lang.Object, org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer, org.apache.flink.streaming.runtime.tasks.OperatorChain) 到org.apache.flink.streaming.api.operators.StreamSource#run(java.lang.Object, org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer, org.apache.flink.streaming.api.operators.Output<org.apache.flink.streaming.runtime.streamrecord.StreamRecord >, org.apache.flink.streaming.runtime.tasks.OperatorChain),然后调用里面的userFunction.run(ctx)方法,继而调用到org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase#run方法。

FlinkKafkaConsumerBase的run方法部分代码如下:

flink源码分析之kafka consumer的执行流程

主要有两种操作:

1.创建KafkaFetcher;2.执行KafkaFetcher的loop操作,最多是增加一路partition discovery的操作。

KafkaFetcher

构造方法的部分代码如下:

flink源码分析之kafka consumer的执行流程

主要的操作是设置上下文信息、watermark信息、checkpoint信息、checkpointLock、类加载器、handover缓存、consumerThread、kafkaCollector等。这里我们来关注下对于consumerThread的初始化操作:

this.consumerThread = new KafkaConsumerThread( LOG, handover, kafkaProperties, unassignedPartitionsQueue, getFetcherName() + " for " + taskNameWithSubtasks, pollTimeout, useMetrics, consumerMetricGroup, subtaskMetricGroup);

这里我们主要关注下unassignedPartitionsQueue,它是在AbstractFetcher中初始化的,AbstractFetcher的构造方法部分代码为:

flink源码分析之kafka consumer的执行流程

也就是说在初始化Fetcher时会将所有的partition信息放到unassignedPartitionsQueue中,意思是未分配的partition队列。

我们接着来看KafkaFetcher的runFetchLoop方法:

flink源码分析之kafka consumer的执行流程

主要操作是启动consumerThread,然后执行loop操作从handover中消费数据。

拉取数据与提交offset的核心逻辑在ConsumerThread中,我们来看对应代码:

1.run方法的初始部分

 public void run() { // early exit check if (!running) { return; }
// this is the means to talk to FlinkKafkaConsumer's main thread final Handover handover = this.handover;
// This method initializes the KafkaConsumer and guarantees it is torn down properly. // This is important, because the consumer has multi-threading issues, // including concurrent 'close()' calls. try { // 获取consumer,每次来获取都是new一个新的KafkaConsumer this.consumer = getConsumer(kafkaProperties); } catch (Throwable t) { handover.reportError(t); return; } // -------------省略-------

初始化实例的handover对象;获取一个新的consumer实例(因为kafkaConsumer是线程不安全的)。

1.fetch loop

 // -------省略代码--------------- // main fetch loop while (running) {
// check if there is something to commit if (!commitInProgress) {// 默认为false // get and reset the work-to-be committed, so we don't repeatedly commit the same final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback = nextOffsetsToCommit.getAndSet(null);
if (commitOffsetsAndCallback != null) {// 如果不为空,代表上次的提交没有完成,这里会继续进行异步提交 log.debug("Sending async offset commit request to Kafka broker");
// also record that a commit is already in progress // the order here matters! first set the flag, then send the commit command. commitInProgress = true; consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1)); } } try { if (hasAssignedPartitions) { // 如果已经分配好分区,查看是否有未分配的partition,分配分区的 newPartitions = unassignedPartitionsQueue.pollBatch(); } else { // if no assigned partitions block until we get at least one // instead of hot spinning this loop. We rely on a fact that // unassignedPartitionsQueue will be closed on a shutdown, so // we don't block indefinitely newPartitions = unassignedPartitionsQueue.getBatchBlocking(); } if (newPartitions != null) { reassignPartitions(newPartitions); } } catch (AbortedReassignmentException e) { continue; } if (!hasAssignedPartitions) { // Without assigned partitions KafkaConsumer.poll will throw an exception continue; } // get the next batch of records, unless we did not manage to hand the old batch over if (records == null) { try { records = consumer.poll(pollTimeout); } catch (WakeupException we) { continue; } } try { handover.produce(records); records = null; } //---------省略代码

这个部分需要注意以下几点:

commitInProgress的状态变化:默认为false,在执行consumer.commitAsync之前会置为true,在consumer.commitAsync操作callback通知后会置为false以便进行下一次的consumer.commitAsync操作;nextOffsetsToCommit变量的变化点在org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread#setOffsetsToCommit方法中,该方法的调用链路为:

这个发生在notifyCheckpointComplete方法的调用中,也就是说在一次checkpoint完成后会执行setOffsetsToCommit方法。同时这里提一点题外话,这个过程是在processMail中执行的,也证明了flink在处理event processing、Processing-Time的定时器和checkpoint使用mailbox后的改进(题外话,可以参考之前写过的StreamTask线程模型分析的文章)。同时在执行一次consumer.commitAsync操作后会将nextOffsetsToCommit的值置为null。

我们来看下setOffsetsToCommit的一段代码:

 // record the work to be committed by the main consumer thread and make sure the consumer notices that if (nextOffsetsToCommit.getAndSet(Tuple2.of(offsetsToCommit, commitCallback)) != null) {//设置新值,返回老值,老值是否为null log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " + "Skipping commit of previous offsets because newer complete checkpoint offsets are available. " + "This does not compromise Flink's checkpoint integrity."); }

如果执行checkpoint操作时,上一次拉取的数据的offset还没有提交,这里会抛出警告并跳过本次offset的提交。这里需要注意的是consumer每次拉取数据会自己维护offset的变化,不依赖于kafka broker上当前消费者组的offset(如下图所示),但是在consumer重新初始化时会依赖这个。

newPartitions的初始化,第一次进入时hasAssignedPartitions为false,会依赖unassignedPartitionsQueue.getBatchBlocking()方法进行初始化,并进入reassignPartitions方法进行分区的分配逻辑,将hasAssignedPartitions置为true,后面loop到这段代码时会执行 unassignedPartitionsQueue.pollBatch(),将一些新加入的或者之前分配失败的分区进行分配。consumer.poll 执行kafkaConsumer的拉取数据的操作。handover.produce:将数据放入到handover中,这里的数据会被KafkaFetcher中的Loop操作消费掉。