
[Source Code Analysis] Flink Broadcast, Through Examples and Source Code


0x00 Summary

This article uses source code analysis and examples to walk you through Flink's broadcast mechanism.

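0x01 Business Requirements

1. Scenario

Detect and filter out IPs that are on a blacklist. Entries can be added to or removed from the IP blacklist at any time, so the blacklist must be configurable dynamically.

Assume the blacklist is stored in MySQL. When the Flink job starts, it loads the blacklist from MySQL and makes it available to Flink operators as a variable.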

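2. Problem

We do not want to restart the job just to reload this variable, so we need a way to change a variable inside an operator dynamically.

3. Solution

Use broadcast to push configuration updates dynamically.

A broadcast stream differs from an ordinary data stream in one key way. Each record of a broadcast stream is processed by every partition (parallel instance) of the operator, whereas each record of an ordinary data stream is processed by only one partition. This property is what makes broadcast streams suitable for dynamic configuration updates.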
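The one-partition versus all-partitions difference can be sketched without Flink. The plain-Java model below is only an illustration. `RoutingSketch`, `broadcastTargets` and `keyedTarget` are hypothetical names. The hash-modulo rule is also a simplification, because Flink actually maps keys to key groups first.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model (not Flink's partitioner classes): which parallel instances
// (subtasks) of a downstream operator receive a given record.
final class RoutingSketch {
    private RoutingSketch() {}

    // Broadcast routing: every subtask receives a copy of the record.
    static List<Integer> broadcastTargets(int parallelism) {
        List<Integer> targets = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            targets.add(i);
        }
        return targets;
    }

    // Simplified keyed/hash routing: exactly one subtask receives the record.
    // Math.floorMod keeps the index non-negative even for negative hash codes.
    static int keyedTarget(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }
}
```

With four subtasks, a broadcast record reaches subtasks 0 through 3. A keyed record with a given key always lands on the same single subtask.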

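0x02 Overview

Broadcast has three tricky parts:

  • the usage steps;
  • how to write the user-defined function;
  • how to read and write the state.

Here is a brief overview of each.

1. Steps for using broadcast

  • Create a MapStateDescriptor
  • Call DataStream.broadcast with the descriptor(s) to obtain a broadcast stream, BroadcastStream
  • Call DataStream.connect to connect the business data stream with the BroadcastStream, which returns a BroadcastConnectedStream
  • Call BroadcastConnectedStream.process; the function passed in handles business elements in processElement and broadcast elements in processBroadcastElement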

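2. User-defined processing functions

  • BroadcastConnectedStream.process accepts two kinds of function: KeyedBroadcastProcessFunction (used when the business stream is a KeyedStream) and BroadcastProcessFunction
  • Both define the abstract methods processElement and processBroadcastElement. KeyedBroadcastProcessFunction additionally defines an onTimer method, which is a no-op by default and can be overridden by subclasses
  • processElement handles the business data stream
  • processBroadcastElement handles the broadcast stream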
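The two callbacks split the work as follows, sketched here for the IP blacklist scenario from 0x01. This is a Flink-free model under stated assumptions:

  • `IpBlacklistFilter` and `BlacklistUpdate` are hypothetical;
  • a plain `Map` stands in for the broadcast state;
  • `Collections.unmodifiableMap` stands in for the read-only access Flink gives processElement through its ReadOnlyContext.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical blacklist update message (not a Flink type).
final class BlacklistUpdate {
    final String ip;
    final boolean blocked; // true = add to the blacklist, false = remove from it

    BlacklistUpdate(String ip, boolean blocked) {
        this.ip = ip;
        this.blocked = blocked;
    }
}

// Flink-free sketch of the two callbacks of a broadcast process function.
// The Map argument plays the role of the broadcast state.
final class IpBlacklistFilter {
    // Broadcast side: allowed to modify the state.
    void processBroadcastElement(BlacklistUpdate update, Map<String, Boolean> state) {
        if (update.blocked) {
            state.put(update.ip, Boolean.TRUE);
        } else {
            state.remove(update.ip);
        }
    }

    // Business side: only reads the state and forwards IPs that are not blacklisted.
    void processElement(String ip, Map<String, Boolean> readOnlyState, List<String> out) {
        if (!readOnlyState.containsKey(ip)) {
            out.add(ip);
        }
    }

    // Mimics the operator handing the business side an unmodifiable view.
    static Map<String, Boolean> readOnlyView(Map<String, Boolean> state) {
        return Collections.unmodifiableMap(state);
    }
}
```

In a real job, processBroadcastElement would receive blacklist changes (for example, read from the MySQL table), and processElement would filter the business stream. The read-only view mirrors the rule that processElement cannot modify broadcast state.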

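3. Broadcast State

  • Broadcast state is always represented as a MapState (map format), the most general state primitive Flink provides. It is a form of managed state, i.e. state managed by the Flink framework, like ValueState, ListState and MapState
  • The user must create a MapStateDescriptor to obtain the corresponding state handle. The descriptor holds the state name and the key and value types of the state, and may contain user-specified functions
  • Broadcast state is included in checkpoints (each task checkpoints its own copy)
  • Broadcast state is kept only in memory at runtime; there is no RocksDB state backend for it
  • Each task holds its own copy of the broadcast state. Flink broadcasts the elements to every task, but the state itself is not propagated across tasks, so a modification only affects the task in which it is made
  • Downstream tasks may receive broadcast events in different orders, so be careful when processing logic depends on the order in which elements arrive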
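The last two points (per-task copies and order sensitivity) can be illustrated with a small Java model. `BroadcastStateCopies` is hypothetical. Each list entry plays the role of one parallel task's in-memory copy of the broadcast state, and each event is applied with a simple last-write-wins `put`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: each parallel task owns a separate in-memory copy of the
// broadcast state; nothing is synchronized between the copies.
final class BroadcastStateCopies {
    private final List<Map<String, String>> perTaskState = new ArrayList<>();

    BroadcastStateCopies(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            perTaskState.add(new HashMap<>());
        }
    }

    // One task applies one broadcast event to its own copy (last write wins).
    // The other tasks' copies are not touched.
    void apply(int task, String key, String value) {
        perTaskState.get(task).put(key, value);
    }

    // Applies events to one task in the order in which that task received them.
    void applyInOrder(int task, List<String[]> events) {
        for (String[] event : events) {
            apply(task, event[0], event[1]);
        }
    }

    String read(int task, String key) {
        return perTaskState.get(task).get(key);
    }
}
```

If two tasks see the same two updates to `mode` in opposite orders, they end up with different values. This is why the update logic in processBroadcastElement should not depend on arrival order when all copies are meant to stay identical.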

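0x03. Sample Code

1. Sample code

A good example can be found directly in the Flink source. The code below is taken from the Flink test StatefulJobWBroadcastStateMigrationITCase, with my comments added. Helper classes used by the test, such as StatefulFlatMapper, VerifyingBroadcastProcessFunction and AccumulatorCountingSink, are not shown here.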

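    import org.apache.flink.api.common.state.{ListState, ListStateDescriptor, MapStateDescriptor}
    import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
    import org.apache.flink.api.java.functions.KeySelector
    import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
    import org.apache.flink.runtime.state.memory.MemoryStateBackend
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction
    import org.apache.flink.streaming.api.functions.source.SourceFunction
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.util.Collector
    import org.junit.Test

    // Test method (inside the test class)
    @Test
    def testRestoreSavepointWithBroadcast(): Unit = {
      val env = StreamExecutionEnvironment.getExecutionEnvironment
      env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

      // These two descriptors declare the broadcast states attached to the broadcast stream;
      // a single broadcast stream can carry several broadcast states at once
      lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
        "broadcast-state-1",
        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])

      lazy val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
        "broadcast-state-2",
        BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO)

      env.setStateBackend(new MemoryStateBackend)
      env.enableCheckpointing(500)
      env.setParallelism(4)
      env.setMaxParallelism(4)

      // Data stream. The data stream and the broadcast stream use the same kind of source,
      // CheckpointedSource; the data stream then goes through operators such as flatMap
      val stream = env
        .addSource(new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
        .keyBy(new KeySelector[(Long, Long), Long] {
          override def getKey(value: (Long, Long)): Long = value._1
        })
        .flatMap(new StatefulFlatMapper)
        .keyBy(new KeySelector[(Long, Long), Long] {
          override def getKey(value: (Long, Long)): Long = value._1
        })

      // Broadcast stream
      val broadcastStream = env
        .addSource(new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource")
        .broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc)

      // Expected broadcast state contents: the source emits (0,0)..(3,3), and the broadcast
      // function stores value._1 -> value._2 (and its String form)
      val expectedFirstState: Map[Long, Long] = Map(0L -> 0L, 1L -> 1L, 2L -> 2L, 3L -> 3L)
      val expectedSecondState: Map[String, String] = Map("0" -> "0", "1" -> "1", "2" -> "2", "3" -> "3")

      // Connect the data stream with the broadcast stream
      stream
        .connect(broadcastStream)
        .process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState))
        .addSink(new AccumulatorCountingSink)
    }

    // User-defined processing function
    class TestBroadcastProcessFunction
      extends KeyedBroadcastProcessFunction[Long, (Long, Long), (Long, Long), (Long, Long)] {

      // Important: these are new descriptor objects, but they must have the same names (and types)
      // as the descriptors passed to broadcast(...). The operator creates broadcast states only for
      // those registered descriptors and looks them up by descriptor, and descriptor equality is
      // based on the name (e.g. "broadcast-state-1"). An unregistered name makes getBroadcastState
      // throw an IllegalArgumentException. Through this shared state, processBroadcastElement can
      // pass values to processElement.
      lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
        "broadcast-state-1",
        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
        BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])

      val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
        "broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

      override def processElement(
          value: (Long, Long),
          ctx: KeyedBroadcastProcessFunction[Long, (Long, Long), (Long, Long), (Long, Long)]#ReadOnlyContext,
          out: Collector[(Long, Long)]): Unit = {
        // In the Flink source, the incoming business record is simply forwarded
        out.collect(value)
      }

      override def processBroadcastElement(
          value: (Long, Long),
          ctx: KeyedBroadcastProcessFunction[Long, (Long, Long), (Long, Long), (Long, Long)]#Context,
          out: Collector[(Long, Long)]): Unit = {
        // Store the latest broadcast value so that processElement can read it later.
        // The BroadcastState is obtained through the descriptor, i.e. by its name
        ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)
        ctx.getBroadcastState(secondBroadcastStateDesc).put(value._1.toString, value._2.toString)
      }
    }

    // Source used by both the data stream and the broadcast stream
    private class CheckpointedSource(val numElements: Int)
      extends SourceFunction[(Long, Long)] with CheckpointedFunction {

      @volatile private var isRunning = true
      private var state: ListState[CustomCaseClass] = _ // CustomCaseClass is defined in the original test

      // Simply emits (0,0), (1,1), ... numElements times, then idles until cancelled
      override def run(ctx: SourceFunction.SourceContext[(Long, Long)]): Unit = {
        ctx.emitWatermark(new Watermark(0))
        ctx.getCheckpointLock synchronized {
          var i = 0L
          while (i < numElements) {
            ctx.collect((i, i))
            i += 1
          }
        }
        // don't emit a final watermark so that we don't trigger the registered event-time
        // timers
        while (isRunning) Thread.sleep(20)
      }

      override def cancel(): Unit = {
        isRunning = false
      }

      override def initializeState(context: FunctionInitializationContext): Unit = {
        state = context.getOperatorStateStore.getListState(
          new ListStateDescriptor[CustomCaseClass]("sourceState", classOf[CustomCaseClass]))
      }

      override def snapshotState(context: FunctionSnapshotContext): Unit = {
        // nothing else needs to be snapshotted for this excerpt
      }
    }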

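2. Technical difficulties

MapStateDescriptor

First, some concepts:

  • Flink has two basic kinds of state: Keyed State and Operator State.
  • Both Keyed State and Operator State can exist in two forms: raw state and managed state.
  • Managed state is state managed by the Flink framework, such as ValueState, ListState and MapState.
  • Raw state is state whose data structures are managed by the user. When taking a checkpoint, the framework reads and writes its contents as byte[] and knows nothing about its internal structure.
  • MapState is one kind of managed state: the state value is a map. Users add elements with the put and putAll methods.

Back to our example: broadcast state is part of operator state and is stored as managed state in MapState form. getBroadcastState is implemented in DefaultOperatorStateBackend.

So we describe the broadcast state with a MapStateDescriptor. MapStateDescriptor is fairly flexible because the state is accessed as key/value pairs. I personally find it convenient to use a class directly as the value type, especially for those coming to Scala from other languages.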

processBroadcastElement

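    // This function mainly plays a control role, so its logic is relatively simple
    override def processBroadcastElement(
        value: (Long, Long),
        ctx: KeyedBroadcastProcessFunction[Long, (Long, Long), (Long, Long), (Long, Long)]#Context,
        out: Collector[(Long, Long)]): Unit = {
      // Store the newly arrived broadcast value so that processElement can read it later, e.g.
      ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)
    }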

processElement

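    import scala.collection.JavaConverters._

    // This function works together with processBroadcastElement
    override def processElement(
        value: (Long, Long),
        ctx: KeyedBroadcastProcessFunction[Long, (Long, Long), (Long, Long), (Long, Long)]#ReadOnlyContext,
        out: Collector[(Long, Long)]): Unit = {
      // Read the broadcast values previously stored by processBroadcastElement and use them
      // to process the business record, e.g.
      val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
        "broadcast-state-2", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO)

      var actualSecondState = Map[String, String]()
      for (entry <- ctx.getBroadcastState(secondBroadcastStateDesc).immutableEntries().asScala) {
        actualSecondState += (entry.getKey -> entry.getValue)
      }

      // The state may hold values of any type; they need not match the type of the broadcast
      // elements, as long as processBroadcastElement and processElement agree on it. What matters
      // is the descriptor: it must carry the same name as one of the descriptors passed to
      // broadcast(...).
      // MapStateDescriptor extends StateDescriptor; its state type is MapState and its value type is Map
    }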

Putting them together

Due to certain constraints, the walk-through below uses an example found online.

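    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
    import org.apache.flink.util.Collector;

    // Action (a user action) and Pattern (two consecutive actions) are POJOs defined in the original example
    public class PatternEvaluator
            extends KeyedBroadcastProcessFunction<Long, Action, Pattern, Tuple2<Long, Pattern>> {

        // keyed state: the previous action of the current user
        private ValueState<String> prevActionState;
        // The pattern is always stored in the MapState with null as the key. Broadcast state is
        // always represented as a MapState, the most general state primitive Flink provides.
        private MapStateDescriptor<Void, Pattern> patternDesc;

        @Override
        public void open(Configuration conf) {
            prevActionState = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("lastAction", Types.STRING));
            patternDesc = new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
        }

        // When a broadcast element arrives, it is stored in the BroadcastState
        @Override
        public void processBroadcastElement(Pattern pattern, Context ctx,
                Collector<Tuple2<Long, Pattern>> out) throws Exception {
            // store the new pattern by updating the broadcast state
            BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
            // storing in MapState with null as VOID default value
            bcState.put(null, pattern);
        }

        // When a business element arrives, the pattern is read back from the BroadcastState.
        // In both methods the state is actually located via the descriptor name "patterns".
        @Override
        public void processElement(Action action, ReadOnlyContext ctx,
                Collector<Tuple2<Long, Pattern>> out) throws Exception {
            // get current pattern from broadcast state
            Pattern pattern = ctx.getBroadcastState(this.patternDesc)
                    // access MapState with null as VOID default value
                    .get(null);
            // get previous action of current user from keyed state
            String prevAction = prevActionState.value();
            if (pattern != null && prevAction != null) {
                // user had an action before, check if pattern matches
                if (pattern.firstAction.equals(prevAction)
                        && pattern.secondAction.equals(action.action)) {
                    // MATCH
                    out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
                }
            }
            // update keyed state and remember action for next pattern evaluation
            prevActionState.update(action.action);
        }
    }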
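To see the evaluation logic in isolation, here is a Flink-free re-enactment of the example above. It is only a sketch with these stand-ins:

  • `PatternEvaluatorSketch` is hypothetical;
  • a `HashMap<Long, String>` replaces the keyed `prevActionState`;
  • a `HashMap<Void, Pattern>`, used with the single key `null` like the broadcast state above, replaces the broadcast state;
  • matches are recorded as strings instead of `Tuple2` values.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free re-enactment of the PatternEvaluator logic.
final class PatternEvaluatorSketch {
    static final class Pattern {
        final String firstAction;
        final String secondAction;

        Pattern(String firstAction, String secondAction) {
            this.firstAction = firstAction;
            this.secondAction = secondAction;
        }
    }

    private final Map<Long, String> prevActionByUser = new HashMap<>(); // stands in for keyed state
    private final Map<Void, Pattern> broadcastState = new HashMap<>();  // HashMap permits the null key
    final List<String> matches = new ArrayList<>();

    void processBroadcastElement(Pattern pattern) {
        broadcastState.put(null, pattern); // replace the current pattern
    }

    void processElement(long userId, String action) {
        Pattern pattern = broadcastState.get(null);
        String prevAction = prevActionByUser.get(userId);
        if (pattern != null && prevAction != null
                && pattern.firstAction.equals(prevAction)
                && pattern.secondAction.equals(action)) {
            matches.add(userId + ":" + prevAction + "->" + action);
        }
        prevActionByUser.put(userId, action); // remember for the next evaluation
    }
}
```

Take a user whose "login" arrives before any pattern has been broadcast. That "login" still counts as the previous action once a login/logout pattern arrives. This is because the keyed state is updated on every element, whether or not a pattern exists yet.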

0x04. Flink Source Code Analysis

1. Logical flow of broadcast

    The life cycle of the Broadcast:

    -- Initialization -> a BroadcastConnectedStream combines the data stream and the broadcast stream, and the topology is transformed
    |
    +----> businessStream = DataStream.filter.map....
    |        // data stream for the business logic; businessStream is an ordinary DataStream
    +----> broadcastStream = DataStream.broadcast(broadcastStateDesc)
    |        // broadcast stream for the configuration logic; broadcastStream is a BroadcastStream
    +----> businessStream.connect(broadcastStream)
    |                    .process(new processFunction(broadcastStateDesc))
    |        // combine the business stream and the broadcast stream into a BroadcastConnectedStream, then call process
    +----------> process @ BroadcastConnectedStream
    |              TwoInputStreamOperator<IN1, IN2, OUT> operator =
    |                  new CoBroadcastWithNonKeyedOperator<>(clean(function), broadcastStateDescriptors);
    |              return transform(outTypeInfo, operator);
    |              // create an operator of type TwoInputStreamOperator and call transform
    +----------------> transform @ BroadcastConnectedStream
                         transform = new TwoInputTransformation<>(
                             inputStream1.getTransformation(),  // business stream
                             inputStream2.getTransformation(),  // broadcast stream
                             functionName,                      // operator name
                             operator,                          // the operator, CoBroadcastWithNonKeyedOperator
                             outTypeInfo);                      // output type
                         returnStream = new SingleOutputStreamOperator(transform);
                         getExecutionEnvironment().addOperator(transform)
                         // turn the business stream, the broadcast stream and the operator into one
                         // transformation and add it to the Env, which completes the topology transformation;
                         // the final result is a SingleOutputStreamOperator

    // Data structure: BroadcastStream
    // Simply wraps a DataStream and records the StateDescriptors of this broadcast stream
    public class BroadcastStream<T> {
        private final StreamExecutionEnvironment environment;
        private final DataStream<T> inputStream;
        private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
    }

    // Data structure: BroadcastConnectedStream
    // Combines the business stream and the broadcast stream; process(...) then creates the operator and the topology
    public class BroadcastConnectedStream<IN1, IN2> {
        private final StreamExecutionEnvironment environment;
        private final DataStream<IN1> inputStream1;
        private final BroadcastStream<IN2> inputStream2;
        private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
    }

    // Actual computation: CoBroadcastWithNonKeyedOperator
    // This is where the BroadcastProcessFunction is actually executed
    // (for a keyed business stream, CoBroadcastWithKeyedOperator plays the same role for KeyedBroadcastProcessFunction)
    public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
            extends AbstractUdfStreamOperator<OUT, BroadcastProcessFunction<IN1, IN2, OUT>>
            implements TwoInputStreamOperator<IN1, IN2, OUT> {

        private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
        private transient TimestampedCollector<OUT> collector;
        private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;
        private transient ReadWriteContextImpl rwContext;
        private transient ReadOnlyContextImpl rContext;

        @Override
        public void processElement1(StreamRecord<IN1> element) throws Exception {
            collector.setTimestamp(element);
            rContext.setElement(element);
            // When a new business record arrives from upstream, call the user's processElement.
            // Here the previously stored broadcast configuration can be read and applied to the record.
            userFunction.processElement(element.getValue(), rContext, collector);
            rContext.setElement(null);
        }

        @Override
        public void processElement2(StreamRecord<IN2> element) throws Exception {
            collector.setTimestamp(element);
            rwContext.setElement(element);
            // When a new broadcast element arrives from upstream, call the user's processBroadcastElement.
            // Here the latest broadcast configuration can be stored.
            userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
            rwContext.setElement(null);
        }
    }
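The dispatch performed by processElement1/processElement2 can be modeled in plain Java. In this sketch, all names are hypothetical:

  • input 1 is handed a read-only view of the state and input 2 a writable one;
  • states exist only for names registered up front;
  • lookup is by name, since Flink's descriptor equality is name-based.

The registration check mirrors Flink's contexts, which throw an IllegalArgumentException for a descriptor that was not passed to broadcast(...).

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free model of a two-input broadcast operator.
final class TwoInputBroadcastOperatorSketch {
    // Gives the user function access to a broadcast state by name.
    interface StateAccess {
        Map<String, String> getBroadcastState(String name);
    }

    // Stand-in for BroadcastProcessFunction.
    interface UserFunction {
        void processElement(String value, StateAccess readOnlyCtx, List<String> out);
        void processBroadcastElement(String value, StateAccess ctx, List<String> out);
    }

    private final UserFunction userFunction;
    private final Map<String, Map<String, String>> broadcastStates = new HashMap<>();

    // Like broadcast(descriptors...): only these names get a state.
    TwoInputBroadcastOperatorSketch(UserFunction userFunction, List<String> registeredNames) {
        this.userFunction = userFunction;
        for (String name : registeredNames) {
            broadcastStates.put(name, new HashMap<>());
        }
    }

    private Map<String, String> lookup(String name) {
        Map<String, String> state = broadcastStates.get(name);
        if (state == null) {
            throw new IllegalArgumentException("The requested state does not exist: " + name);
        }
        return state;
    }

    // Input 1 (business stream): the user function only gets a read-only view.
    void processElement1(String value, List<String> out) {
        userFunction.processElement(value, name -> Collections.unmodifiableMap(lookup(name)), out);
    }

    // Input 2 (broadcast stream): the user function gets the writable state.
    void processElement2(String value, List<String> out) {
        userFunction.processBroadcastElement(value, this::lookup, out);
    }
}
```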

2. Key methods of DataStream

    // Namely connect and broadcast, which create the corresponding streams
    public class DataStream<T> {
        protected final StreamExecutionEnvironment environment;
        protected final Transformation<T> transformation;

        @PublicEvolving
        public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
            return new BroadcastConnectedStream<>(
                    environment,
                    this,
                    Preconditions.checkNotNull(broadcastStream),
                    broadcastStream.getBroadcastStateDescriptor());
        }

        @PublicEvolving
        public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
            // A BroadcastPartitioner sends every record to all downstream parallel instances
            final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
            return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
        }
    }

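3. Key data structure: MapStateDescriptor

It mainly declares metadata. As shown later, the system stores and retrieves the state corresponding to a MapStateDescriptor by the descriptor's name, i.e. its first constructor argument.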

    public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {

        /**
         * Create a new {@code MapStateDescriptor} with the given name and the given type serializers.
         *
         * @param name The name of the {@code MapStateDescriptor}.
         * @param keySerializer The type serializer for the keys in the state.
         * @param valueSerializer The type serializer for the values in the state.
         */
        public MapStateDescriptor(String name, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {
            super(name, new MapSerializer<>(keySerializer, valueSerializer), null);
        }

        /**
         * Create a new {@code MapStateDescriptor} with the given name and the given type information.
         *
         * @param name The name of the {@code MapStateDescriptor}.
         * @param keyTypeInfo The type information for the keys in the state.
         * @param valueTypeInfo The type information for the values in the state.
         */
        public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo) {
            super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), null);
        }

        /**
         * Create a new {@code MapStateDescriptor} with the given name and the given type information.
         *
         * <p>If this constructor fails (because it is not possible to describe the type via a class),
         * consider using the {@link #MapStateDescriptor(String, TypeInformation, TypeInformation)} constructor.
         *
         * @param name The name of the {@code MapStateDescriptor}.
         * @param keyClass The class of the type of keys in the state.
         * @param valueClass The class of the type of values in the state.
         */
        public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) {
            super(name, new MapTypeInfo<>(keyClass, valueClass), null);
        }
    }

4. Accessing state

The state passed between processBroadcastElement and processElement is stored in Flink under the MapStateDescriptor's name, as in a call like ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2). So next we need to introduce Flink's state concepts.

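State vs checkpoint

First, let us distinguish two concepts. State usually refers to the state of one specific task/operator. A checkpoint, in contrast, is a global snapshot of a Flink job at a particular moment, containing the state of all tasks/operators. Flink achieves fault tolerance and recovery by taking checkpoints periodically.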
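The relationship between per-operator state and a checkpoint as a global snapshot can be pictured with the following Java sketch. It is a deliberate simplification: `CheckpointSketch` is hypothetical and copies all state in one synchronous call. Real Flink checkpoints are coordinated across running tasks with checkpoint barriers rather than by pausing everything to copy it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: each operator owns its state; a checkpoint is a copy of
// all operators' states taken at one moment, used for recovery.
final class CheckpointSketch {
    // operator id -> that operator's state (here simply key -> value)
    private final Map<String, Map<String, Long>> liveState = new HashMap<>();

    void update(String operatorId, String key, long value) {
        liveState.computeIfAbsent(operatorId, id -> new HashMap<>()).put(key, value);
    }

    Long read(String operatorId, String key) {
        Map<String, Long> state = liveState.get(operatorId);
        return state == null ? null : state.get(key);
    }

    // Copies every operator's state: a global snapshot at this point in time.
    Map<String, Map<String, Long>> checkpoint() {
        Map<String, Map<String, Long>> snapshot = new HashMap<>();
        liveState.forEach((id, state) -> snapshot.put(id, new HashMap<>(state)));
        return snapshot;
    }

    // Recovery: discard the live state and reload every operator from the snapshot.
    void restore(Map<String, Map<String, Long>> snapshot) {
        liveState.clear();
        snapshot.forEach((id, state) -> liveState.put(id, new HashMap<>(state)));
    }
}
```

Suppose a source's offset advances after the checkpoint. Restoring rolls it back to the checkpointed value, together with every other operator's state.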

Flink has two basic kinds of state: Keyed State and Operator State.

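Keyed State

As the name suggests, this is state on a KeyedStream. It is bound to a specific key: each key on a KeyedStream may have its own state.

Operator State

Unlike Keyed State, Operator State is bound to one parallel instance of a particular operator, and each such instance has only one operator state. By comparison, a single operator instance may see many keys and therefore hold many keyed states.

For example, Flink's Kafka connector uses operator state: each connector instance stores the (partition, offset) mapping for all the topic partitions it consumes.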
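The difference in granularity can be shown with a Flink-free sketch of a single parallel instance. `StateKindsSketch` is hypothetical. A per-key count stands in for keyed state, and a partition-to-offset map stands in for that instance's single operator state, modeled on the Kafka connector example above.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, Flink-free contrast of the two kinds of state inside ONE
// parallel instance of an operator.
final class StateKindsSketch {
    // Keyed state: one value per key seen by this instance (e.g. a count per user).
    private final Map<String, Long> countPerKey = new HashMap<>();

    // Operator state: a single state object for the whole instance, independent of keys.
    // Modeled on the Kafka example: partition -> offset for the partitions this instance reads.
    private final Map<Integer, Long> offsetPerPartition = new HashMap<>();

    void onRecord(String key, int partition, long offset) {
        countPerKey.merge(key, 1L, Long::sum);
        offsetPerPartition.put(partition, offset);
    }

    int keyedStateEntries() {
        return countPerKey.size();
    }

    Map<Integer, Long> operatorState() {
        return offsetPerPartition;
    }
}
```

Consider three records from two users across two partitions. They leave two keyed-state entries (one per user) but only one operator-state object, which holds the two partition offsets.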

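Raw State and Managed State

This is yet another dimension.

Both Keyed State and Operator State come in two forms: managed and raw.

Managed state is managed by the Flink runtime in internal data structures such as hash tables or RocksDB, for example "ValueState" and "ListState". The Flink runtime encodes these states and writes them into checkpoints.

For example, the managed keyed state interfaces give access to different types of state, all scoped to the key of the current input element. In other words, these states can only be used on a KeyedStream, which is obtained with stream.keyBy(...). Managed operator state, on the other hand, is used by implementing the CheckpointedFunction or ListCheckpointed interface.

Raw state is state whose data structures are managed by the user and kept in the operator's own data structures. At checkpoint time, Flink does not know its contents and simply writes a sequence of bytes into the checkpoint.

Managed state is generally recommended for state on a DataStream; raw state is used when implementing a custom operator.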

Back to our example: broadcast state belongs to operator state and is stored as managed state in MapState form, and getBroadcastState is implemented in DefaultOperatorStateBackend.

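StateDescriptor

You must create a StateDescriptor to obtain the corresponding state handle. It holds:

  • the state name (you can create several states, and they must have unique names so that you can refer to them);
  • the type of the values the state holds;
  • possibly a user-specified function, such as a ReduceFunction.

Depending on the state type, you create a ValueStateDescriptor, ListStateDescriptor, ReducingStateDescriptor, FoldingStateDescriptor or MapStateDescriptor.

Keyed state is accessed through the RuntimeContext, so it can only be used in rich functions. Broadcast state, by contrast, is obtained from the Context / ReadOnlyContext passed to processBroadcastElement / processElement.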

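OperatorStateBackend

OperatorStateBackend manages operator state. At the time of writing it has only one implementation: DefaultOperatorStateBackend.

DefaultOperatorStateBackend

DefaultOperatorStateBackend stores state in maps keyed by state name. Regular operator state is held as PartitionableListState (a kind of ListState), while broadcast state is held as HeapBroadcastState. All operator state is kept in memory.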

    public class DefaultOperatorStateBackend implements OperatorStateBackend {

        /**
         * Map for all registered operator states. Maps state name -> state
         */
        private final Map<String, PartitionableListState<?>> registeredOperatorStates;

        /**
         * Map for all registered operator broadcast states. Maps state name -> state
         */
        private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;

        /**
         * Cache of already accessed states.
         *
         * <p>In contrast to {@link #registeredOperatorStates} which may be repopulated
         * with restored state, this map is always empty at the beginning.
         *
         * <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends.
         */
        private final Map<String, PartitionableListState<?>> accessedStatesByName;

        // Cache of broadcast states that have already been accessed
        private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName;

        // The API for reading and writing broadcast state mentioned earlier
        public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor)
                throws StateMigrationException {

            String name = Preconditions.checkNotNull(stateDescriptor.getName());

            // If this state has been accessed before, return the cached instance
            BackendWritableBroadcastState<K, V> previous =
                    (BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(name);

            if (previous != null) {
                return previous;
            }

            stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
            TypeSerializer<K> broadcastStateKeySerializer =
                    Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
            TypeSerializer<V> broadcastStateValueSerializer =
                    Preconditions.checkNotNull(stateDescriptor.getValueSerializer());

            BackendWritableBroadcastState<K, V> broadcastState =
                    (BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);

            if (broadcastState == null) {
                // Not registered and nothing restored: create a new heap-based broadcast state
                broadcastState = new HeapBroadcastState<>(
                        new RegisteredBroadcastStateBackendMetaInfo<>(
                                name,
                                OperatorStateHandle.Mode.BROADCAST,
                                broadcastStateKeySerializer,
                                broadcastStateValueSerializer));
                registeredBroadcastStates.put(name, broadcastState);
            } else {
                // has restored state; check compatibility of new state access

                RegisteredBroadcastStateBackendMetaInfo<K, V> restoredBroadcastStateMetaInfo =
                        broadcastState.getStateMetaInfo();

                // check whether new serializers are incompatible
                TypeSerializerSchemaCompatibility<K> keyCompatibility =
                        restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer);

                TypeSerializerSchemaCompatibility<V> valueCompatibility =
                        restoredBroadcastStateMetaInfo.updateValueSerializer(broadcastStateValueSerializer);

                // (excerpt: the checks on keyCompatibility / valueCompatibility, which throw a
                // StateMigrationException for incompatible serializers, are omitted here)

                broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo);
            }

            // Cache the state so that subsequent calls return it directly
            accessedBroadcastStatesByName.put(name, broadcastState);
            return broadcastState;
        }
    }
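The control flow of getBroadcastState can be summarized in a simplified Java model. It covers three steps: cache lookup, create-and-register on first use, and a compatibility check for restored state. Everything here is hypothetical:

  • descriptors are reduced to a name plus key/value type names;
  • "compatible" is reduced to "same type names" (Flink actually checks serializer compatibility);
  • `IllegalStateException` stands in for Flink's `StateMigrationException`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simplification of DefaultOperatorStateBackend#getBroadcastState.
final class BroadcastStateRegistrySketch {
    static final class Descriptor {
        final String name;
        final String keyType;
        final String valueType;

        Descriptor(String name, String keyType, String valueType) {
            this.name = name;
            this.keyType = keyType;
            this.valueType = valueType;
        }
    }

    static final class State {
        final String keyType;
        final String valueType;
        final Map<Object, Object> entries = new HashMap<>();

        State(String keyType, String valueType) {
            this.keyType = keyType;
            this.valueType = valueType;
        }
    }

    // name -> state; may be pre-filled from a restored checkpoint
    private final Map<String, State> registered = new HashMap<>();
    // name -> state already handed out by this backend (starts empty)
    private final Map<String, State> accessed = new HashMap<>();

    void restore(String name, State restored) {
        registered.put(name, restored);
    }

    State getBroadcastState(Descriptor d) {
        State previous = accessed.get(d.name);
        if (previous != null) {
            return previous; // 1. already accessed: return the cached state
        }
        State state = registered.get(d.name);
        if (state == null) {
            // 2. first use, nothing restored: create and register a new state
            state = new State(d.keyType, d.valueType);
            registered.put(d.name, state);
        } else if (!state.keyType.equals(d.keyType) || !state.valueType.equals(d.valueType)) {
            // 3. restored state: the new types must be compatible with the restored ones
            throw new IllegalStateException("Incompatible types for broadcast state " + d.name);
        }
        accessed.put(d.name, state);
        return state;
    }
}
```

A second call with a new descriptor object of the same name returns the cached state. In this model, that is why a freshly constructed descriptor finds existing state as long as the name matches.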

0x05. References

Flink principles and implementation: state management in Flink explained https://yq.aliyun.com/articles/225623

Dynamic configuration updates in Flink using broadcast https://www.jianshu.com/p/c8c99f613f10

A practical guide to Flink Broadcast State https://blog.csdn.net/u010942041/article/details/93901918

On Flink's Broadcast State https://www.jianshu.com/p/d6576ae67eae

Working with State https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/stream/state/state.html