[源码分析] 从实例和源码入手看 Flink 之广播 Broadcast
[源码分析] 从实例和源码入手看 Flink 之广播 Broadcast
0x00 摘要
本文将通过源码分析和实例讲解,带领大家熟悉Flink的广播变量机制。
0x01 业务需求
1. 场景需求
对黑名单中的IP进行检测过滤。IP黑名单的内容会随时增减,因此是可以随时动态配置的。
该黑名单假设存在mysql中,Flink作业启动时候会把这个黑名单从mysql载入,作为一个变量由Flink算子使用。
2. 问题
我们不想重启作业以便重新获取这个变量。所以就需要一个能够动态修改算子里变量的方法。
3. 解决方案
使用广播的方式去解决。去做配置的动态更新。
广播和普通的流数据不同的是:广播流的1条流数据能够被算子的所有分区所处理,而数据流的1条流数据只能够被算子的某一分区处理。因此广播流的特点也决定适合做配置的动态更新。
0x02 概述
广播这部分有三个难点:使用步骤;如何自定义函数;如何存取状态。下面就先为大家概述下。
1. broadcast的使用步骤
-
建立MapStateDescriptor -
通过DataStream.broadcast方法返回广播数据流BroadcastStream -
通过DataStream.connect方法,把业务数据流和BroadcastStream进行连接,返回BroadcastConnectedStream -
通过BroadcastConnectedStream.process方法分别进行processElement及processBroadcastElement处理
2. 用户自定义处理函数
-
BroadcastConnectedStream.process接收两种类型的function:KeyedBroadcastProcessFunction 和 BroadcastProcessFunction -
两种类型的function都定义了processElement、processBroadcastElement抽象方法,只是KeyedBroadcastProcessFunction多定义了一个onTimer方法,默认是空操作,允许子类重写 -
processElement处理业务数据流 -
processBroadcastElement处理广播数据流
3. Broadcast State
-
Broadcast State始终表示为MapState,即map format。 这是Flink提供的最通用的状态原语。是托管状态的一种,托管状态是由Flink框架管理的状态,如ValueState, ListState, MapState等。 -
用户必须创建一个 MapStateDescriptor
,才能得到对应的状态句柄。这保存了状态名称, 状态所持有值的类型,并且可能包含用户指定的函数 -
checkpoint的时候也会checkpoint broadcast state -
Broadcast State只在内存有,没有RocksDB state backend -
Flink 会将state广播到每个task,注意该state并不会跨task传播,对其修改仅仅是作用在其所在的task -
downstream tasks接收到broadcast event的顺序可能不一样,所以依赖其到达顺序来处理element的时候要小心
0x03. 示例代码
1. 示例代码
我们直接从Flink源码入手可以找到理想的示例。以下代码直接摘录 Flink 源码 StatefulJobWBroadcastStateMigrationITCase,我会在里面加上注释说明。
@Test
def testRestoreSavepointWithBroadcast(): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
// 以下两个变量是为了确定广播流发出的数据类型,广播流可以同时发出多种类型的数据
lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
"broadcast-state-1",
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
lazy val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
"broadcast-state-2",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
env.setStateBackend(new MemoryStateBackend)
env.enableCheckpointing(500)
env.setParallelism(4)
env.setMaxParallelism(4)
// 数据流,这里数据流和广播流的Source都是同一种CheckpointedSource。数据流这里做了一系列算子操作,比如flatMap
val stream = env
.addSource(
new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedSource")
.keyBy(
new KeySelector[(Long, Long), Long] {
override def getKey(value: (Long, Long)): Long = value._1
}
)
.flatMap(new StatefulFlatMapper)
.keyBy(
new KeySelector[(Long, Long), Long] {
override def getKey(value: (Long, Long)): Long = value._1
}
)
// 广播流
val broadcastStream = env
.addSource(
new CheckpointedSource(4)).setMaxParallelism(1).uid("checkpointedBroadcastSource")
.broadcast(firstBroadcastStateDesc, secondBroadcastStateDesc)
// 把数据流和广播流结合起来
stream
.connect(broadcastStream)
.process(new VerifyingBroadcastProcessFunction(expectedFirstState, expectedSecondState))
.addSink(new AccumulatorCountingSink)
}
}
// 用户自定义的处理函数
class TestBroadcastProcessFunction
extends KeyedBroadcastProcessFunction
[Long, (Long, Long), (Long, Long), (Long, Long)] {
// 重点说明,这里的 firstBroadcastStateDesc,secondBroadcastStateDesc 其实和之前广播流的那两个MapStateDescriptor无关。
// 这里两个MapStateDescriptor是为了存取BroadcastState,这样在 processBroadcastElement和processElement之间就可以传递变量了。我们完全可以定义新的MapStateDescriptor,只要processBroadcastElement和processElement之间认可就行。
// 这里参数 "broadcast-state-1" 是name, flink就是用这个 name 来从Flink运行时系统中存取MapStateDescriptor
lazy val firstBroadcastStateDesc = new MapStateDescriptor[Long, Long](
"broadcast-state-1",
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]],
BasicTypeInfo.LONG_TYPE_INFO.asInstanceOf[TypeInformation[Long]])
val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
"broadcast-state-2",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
override def processElement(
value: (Long, Long),
ctx: KeyedBroadcastProcessFunction
[Long, (Long, Long), (Long, Long), (Long, Long)]#ReadOnlyContext,
out: Collector[(Long, Long)]): Unit = {
// 这里Flink源码中是直接把接受到的业务变量直接再次转发出去
out.collect(value)
}
override def processBroadcastElement(
value: (Long, Long),
ctx: KeyedBroadcastProcessFunction
[Long, (Long, Long), (Long, Long), (Long, Long)]#Context,
out: Collector[(Long, Long)]): Unit = {
// 这里是把最新传来的广播变量存储起来,processElement中可以取出再次使用. 具体是通过firstBroadcastStateDesc 的 name 来获取 BroadcastState
ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)
ctx.getBroadcastState(secondBroadcastStateDesc).put(value._1.toString, value._2.toString)
}
}
// 广播流和数据流的Source
private class CheckpointedSource(val numElements: Int)
extends SourceFunction[(Long, Long)] with CheckpointedFunction {
private var isRunning = true
private var state: ListState[CustomCaseClass] = _
// 就是简单的定期发送
override def run(ctx: SourceFunction.SourceContext[(Long, Long)]) {
ctx.emitWatermark(new Watermark(0))
ctx.getCheckpointLock synchronized {
var i = 0
while (i < numElements) {
ctx.collect(i, i)
i += 1
}
}
// don't emit a final watermark so that we don't trigger the registered event-time
// timers
while (isRunning) Thread.sleep(20)
}
}
2. 技术难点
MapStateDescriptor
首先要说明一些概念:
-
Flink中包含两种基础的状态:Keyed State和Operator State。 -
Keyed State和Operator State又可以 以两种形式存在:原始状态和托管状态。 -
托管状态是由Flink框架管理的状态,如ValueState, ListState, MapState等。 -
raw state即原始状态,由用户自行管理状态具体的数据结构,框架在做checkpoint的时候,使用byte[]来读写状态内容,对其内部数据结构一无所知。 -
MapState是托管状态的一种:即状态值为一个map。用户通过 put
或putAll
方法添加元素。
回到我们的例子,广播变量就是OperatorState的一部分,是以托管状态的MapState形式保存的。具体getBroadcastState函数就是DefaultOperatorStateBackend中的实现
所以我们需要用MapStateDescriptor描述broadcast state,这里MapStateDescriptor的使用比较灵活,因为是key,value类似使用,所以个人觉得value直接使用类,这样更方便,尤其是对于从其他语言转到scala的同学。
processBroadcastElement
// 因为主要起到控制作用,所以这个函数的处理相对简单
override def processBroadcastElement(): Unit = {
// 这里可以把最新传来的广播变量存储起来,processElement中可以取出再次使用,比如
ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)
}
processElement
// 这个函数需要和processBroadcastElement配合起来使用
override def processElement(): Unit = {
// 可以取出processBroadcastElement之前存储的广播变量,然后用此来处理业务变量,比如
val secondBroadcastStateDesc = new MapStateDescriptor[String, String](
"broadcast-state-2",
BasicTypeInfo.STRING_TYPE_INFO,
BasicTypeInfo.STRING_TYPE_INFO)
var actualSecondState = Map[String, String]()
for (entry <- ctx.getBroadcastState(secondBroadcastStateDesc).immutableEntries()) {
val v = secondExpectedBroadcastState.get(entry.getKey).get
actualSecondState += (entry.getKey -> entry.getValue)
}
// 甚至这里只要和processBroadcastElement一起关联好,可以存储任意类型的变量。不必须要和广播变量的类型一致。重点是声明新的对应的MapStateDescriptor
// MapStateDescriptor继承了StateDescriptor,其中state为MapState类型,value为Map类型
}
结合起来使用
因为某些限制,所以下面只能从网上找一个例子给大家讲讲。
// 模式始终存储在MapState中,并将null作为键。broadcast state始终表示为MapState,这是Flink提供的最通用的状态原语。
MapStateDescriptor<Void, Pattern> bcStateDescriptor =
new MapStateDescriptor<>("patterns", Types.VOID, Types.POJO(Pattern.class));
// 能看到的是,在处理广播变量时候,存储广播变量到BroadcastState
public void processBroadcastElement(Pattern pattern, Context ctx,
Collector<Tuple2<Long, Pattern>> out) throws Exception {
// store the new pattern by updating the broadcast state
BroadcastState<Void, Pattern> bcState = ctx.getBroadcastState(patternDesc);
// storing in MapState with null as VOID default value
bcState.put(null, pattern);
}
// 能看到的是,在处理业务变量时候,从BroadcastState取出广播变量,存取时候实际都是用"patterns"这个name字符串来作为key。
public void processElement(Action action, ReadOnlyContext ctx,
Collector<Tuple2<Long, Pattern>> out) throws Exception {
// get current pattern from broadcast state
Pattern pattern = ctx.getBroadcastState(this.patternDesc)
// access MapState with null as VOID default value
.get(null);
// get previous action of current user from keyed state
String prevAction = prevActionState.value();
if (pattern != null && prevAction != null) {
// user had an action before, check if pattern matches
if (pattern.firstAction.equals(prevAction) &&
pattern.secondAction.equals(action.action)) {
// MATCH
out.collect(new Tuple2<>(ctx.getCurrentKey(), pattern));
}
}
// update keyed state and remember action for next pattern evaluation
prevActionState.update(action.action);
}
0x04. Flink 源码解析
1. 广播的逻辑流程
* The life cycle of the Broadcast:
* {@code
* -- 初始化逻辑 -> 用一个BroadcastConnectedStream把数据流和广播流结合起来进行拓扑转换
* |
* +----> businessStream = DataStream.filter.map....
* | // 处理业务逻辑的数据流,businessStream 是普通DataStream
* +----> broadcastStream = DataStream.broadcast(broadcastStateDesc)
* | // 处理配置逻辑的广播数据流,broadcastStream是BroadcastStream类型
* +----> businessStream.connect(broadcastStream)
* | .process(new processFunction(broadcastStateDesc))
* | // 把业务流,广播流 结合起来,生成一个BroadcastConnectedStream,然后进行 process
* +----------> process @ BroadcastConnectedStream
* | TwoInputStreamOperator<IN1, IN2, OUT> operator =
* | new CoBroadcastWithNonKeyedOperator<>(clean(function),
* | broadcastStateDescriptors);
* | return transform(outTypeInfo, operator);
* | // 生成一个类型是TwoInputStreamOperator 的 operator,进行 transform
* +----------------> transform @ BroadcastConnectedStream
* | transform = new TwoInputTransformation<>(
* | inputStream1.getTransformation(), // 业务流
* | inputStream2.getTransformation(), // 广播流
* | ifunctionName, // 用户的UDF
* | operator, // 算子 CoBroadcastWithNonKeyedOperator
* | outTypeInfo); // 输出类型
* | returnStream = new SingleOutputStreamOperator(transform);
* | getExecutionEnvironment().addOperator(transform)
* | // 将业务流,广播流与拓扑联合起来形成一个转换,加到 Env 中,这就完成了拓扑转换
* | // 最后返回结果是一个SingleOutputStreamOperator。
* }
* 数据结构:
* -- BroadcastStream.
* 就是简单封装一个DataStream,然后记录这个广播流对应的StateDescriptors
public class BroadcastStream<T> {
private final StreamExecutionEnvironment environment;
private final DataStream<T> inputStream;
private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
}
* 数据结构:
* -- BroadcastConnectedStream.
* 把业务流,广播流 结合起来,然后会生成算子和拓扑
public class BroadcastConnectedStream<IN1, IN2> {
private final StreamExecutionEnvironment environment;
private final DataStream<IN1> inputStream1;
private final BroadcastStream<IN2> inputStream2;
private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
}
* 真实计算:
* -- CoBroadcastWithNonKeyedOperator -> 真正对BroadcastProcessFunction的执行,是在这里完成的
public class CoBroadcastWithNonKeyedOperator<IN1, IN2, OUT>
extends AbstractUdfStreamOperator<OUT, BroadcastProcessFunction<IN1, IN2, OUT>>
implements TwoInputStreamOperator<IN1, IN2, OUT> {
private final List<MapStateDescriptor<?, ?>> broadcastStateDescriptors;
private transient TimestampedCollector<OUT> collector;
private transient Map<MapStateDescriptor<?, ?>, BroadcastState<?, ?>> broadcastStates;
private transient ReadWriteContextImpl rwContext;
private transient ReadOnlyContextImpl rContext;
@Override
public void processElement1(StreamRecord<IN1> element) throws Exception {
collector.setTimestamp(element);
rContext.setElement(element);
// 当上游有最新业务数据来的时候,调用用户自定义的processElement
// 在这可以把之前存储的广播配置信息取出,然后对业务数据流进行处理
userFunction.processElement(element.getValue(), rContext, collector);
rContext.setElement(null);
}
@Override
public void processElement2(StreamRecord<IN2> element) throws Exception {
collector.setTimestamp(element);
rwContext.setElement(element);
// 当上游有数据来的时候,调用用户自定义的processBroadcastElement
// 在这可以把最新传送的广播配置信息存起来
userFunction.processBroadcastElement(element.getValue(), rwContext, collector);
rwContext.setElement(null);
}
}
2. DataStream的关键函数
// 就是connect,broadcast,分别生成对应的数据流
public class DataStream<T> {
protected final StreamExecutionEnvironment environment;
protected final Transformation<T> transformation;
@PublicEvolving
public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
return new BroadcastConnectedStream<>(
environment,
this,
Preconditions.checkNotNull(broadcastStream),
broadcastStream.getBroadcastStateDescriptor());
}
@PublicEvolving
public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
}
}
3. 关键数据结构MapStateDescriptor
主要是用来声明各种元数据信息。后续可以看出来,系统是通过MapStateDescriptor的name,即第一个参数来存储 / 获取MapStateDescriptor对应的State。
public class MapStateDescriptor<UK, UV> extends StateDescriptor<MapState<UK, UV>, Map<UK, UV>> {
/**
* Create a new {@code MapStateDescriptor} with the given name and the given type serializers.
*
* @param name The name of the {@code MapStateDescriptor}.
* @param keySerializer The type serializer for the keys in the state.
* @param valueSerializer The type serializer for the values in the state.
*/
public MapStateDescriptor(String name, TypeSerializer<UK> keySerializer, TypeSerializer<UV> valueSerializer) {
super(name, new MapSerializer<>(keySerializer, valueSerializer), null);
}
/**
* Create a new {@code MapStateDescriptor} with the given name and the given type information.
*
* @param name The name of the {@code MapStateDescriptor}.
* @param keyTypeInfo The type information for the keys in the state.
* @param valueTypeInfo The type information for the values in the state.
*/
public MapStateDescriptor(String name, TypeInformation<UK> keyTypeInfo, TypeInformation<UV> valueTypeInfo) {
super(name, new MapTypeInfo<>(keyTypeInfo, valueTypeInfo), null);
}
/**
* Create a new {@code MapStateDescriptor} with the given name and the given type information.
*
* <p>If this constructor fails (because it is not possible to describe the type via a class),
* consider using the {@link #MapStateDescriptor(String, TypeInformation, TypeInformation)} constructor.
*
* @param name The name of the {@code MapStateDescriptor}.
* @param keyClass The class of the type of keys in the state.
* @param valueClass The class of the type of values in the state.
*/
public MapStateDescriptor(String name, Class<UK> keyClass, Class<UV> valueClass) {
super(name, new MapTypeInfo<>(keyClass, valueClass), null);
}
}
4. 状态存取
在processBroadcastElement和processElement之间传递的状态,是通过MapStateDescriptor的name为key,来存储在Flink中。即类似调用如下ctx.getBroadcastState(firstBroadcastStateDesc).put(value._1, value._2)
。所以我们接下来需要介绍下Flink的State概念。
State vs checkpoint
首先区分一下两个概念,state一般指一个具体的task/operator的状态。而checkpoint则表示了一个Flink Job,在一个特定时刻的一份全局状态快照,即包含了所有task/operator的状态。Flink通过定期地做checkpoint来实现容错和恢复。
Flink中包含两种基础的状态:Keyed State和Operator State。
Keyed State
顾名思义,就是基于KeyedStream上的状态。这个状态是跟特定的key绑定的,对KeyedStream流上的每一个key,可能都对应一个state。
Operator State
与Keyed State不同,Operator State跟一个特定operator的一个并发实例绑定,整个operator只对应一个state。相比较而言,在一个operator上,可能会有很多个key,从而对应多个keyed state。
举例来说,Flink中的Kafka Connector,就使用了operator state。它会在每个connector实例中,保存该实例中消费topic的所有(partition, offset)映射。
原始状态和Flink托管状态 (Raw and Managed State)
这又是另外一个维度。
Keyed State 和 Operator State 分别有两种存在形式:managed and raw, 即原始状态和托管状态。
托管状态是由 Flink框架运行时 管理的状态,比如内部的 hash table 或者 RocksDB。比如 “ValueState”, “ListState” 等。Flink runtime 会对这些状态进行编码并写入 checkpoint。
比如managed keyed state 接口提供不同类型状态的访问接口,这些状态都作用于当前输入数据的 key 下。换句话说,这些状态仅可在 KeyedStream
上使用,可以通过 stream.keyBy(...)
得到 KeyedStream
。而我们可以通过实现 CheckpointedFunction
或 ListCheckpointed
接口来使用 managed operator state。
Raw state即原始状态,由用户自行管理状态具体的数据结构,保存在算子自己的数据结构中。checkpoint 的时候,Flink 并不知晓具体的内容,仅仅写入一串字节序列到 checkpoint。
通常在DataStream上的状态推荐使用托管的状态,当实现一个用户自定义的operator时,会使用到原始状态。
回到我们的例子,广播变量就是OperatorState的一部分,是以托管状态的MapState形式保存的。具体getBroadcastState函数就是DefaultOperatorStateBackend中的实现。
StateDescriptor
你必须创建一个 StateDescriptor
,才能得到对应的状态句柄。这保存了状态名称(你可以创建多个状态,并且它们必须具有唯一的名称以便可以引用它们), 状态所持有值的类型,并且可能包含用户指定的函数,例如ReduceFunction
。根据不同的状态类型,可以创建ValueStateDescriptor
,ListStateDescriptor
, ReducingStateDescriptor
,FoldingStateDescriptor
或 MapStateDescriptor
。
状态通过 RuntimeContext
进行访问,因此只能在 rich functions 中使用。
OperatorStateBackEnd
OperatorStateBackEnd 主要管理OperatorState. 目前只有一种实现: DefaultOperatorStateBackend。
DefaultOperatorStateBackend
DefaultOperatorStateBackend状态是以Map方式来存储的。其构造出一个 PartitionableListState (属于ListState)。OperatorState都保存在内存中。
public class DefaultOperatorStateBackend implements OperatorStateBackend {
/**
* Map for all registered operator states. Maps state name -> state
*/
private final Map<String, PartitionableListState<?>> registeredOperatorStates;
/**
* Map for all registered operator broadcast states. Maps state name -> state
*/
private final Map<String, BackendWritableBroadcastState<?, ?>> registeredBroadcastStates;
/**
* Cache of already accessed states.
*
* <p>In contrast to {@link #registeredOperatorStates} which may be repopulated
* with restored state, this map is always empty at the beginning.
*
* <p>TODO this map should be moved to a base class once we have proper hierarchy for the operator state backends.
*/
private final Map<String, PartitionableListState<?>> accessedStatesByName;
private final Map<String, BackendWritableBroadcastState<?, ?>> accessedBroadcastStatesByName; // 这里用来缓存广播变量
// 这里就是前文中所说的,存取广播变量的API
public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K, V> stateDescriptor) throws StateMigrationException {
String name = Preconditions.checkNotNull(stateDescriptor.getName());
// 如果之前有,就取出来
BackendWritableBroadcastState<K, V> previous =
(BackendWritableBroadcastState<K, V>) accessedBroadcastStatesByName.get(
name);
if (previous != null) {
return previous;
}
stateDescriptor.initializeSerializerUnlessSet(getExecutionConfig());
TypeSerializer<K> broadcastStateKeySerializer = Preconditions.checkNotNull(stateDescriptor.getKeySerializer());
TypeSerializer<V> broadcastStateValueSerializer = Preconditions.checkNotNull(stateDescriptor.getValueSerializer());
BackendWritableBroadcastState<K, V> broadcastState =
(BackendWritableBroadcastState<K, V>) registeredBroadcastStates.get(name);
if (broadcastState == null) {
broadcastState = new HeapBroadcastState<>(
new RegisteredBroadcastStateBackendMetaInfo<>(
name,
OperatorStateHandle.Mode.BROADCAST,
broadcastStateKeySerializer,
broadcastStateValueSerializer));
registeredBroadcastStates.put(name, broadcastState);
} else {
// has restored state; check compatibility of new state access
RegisteredBroadcastStateBackendMetaInfo<K, V> restoredBroadcastStateMetaInfo = broadcastState.getStateMetaInfo();
// check whether new serializers are incompatible
TypeSerializerSchemaCompatibility<K> keyCompatibility =
restoredBroadcastStateMetaInfo.updateKeySerializer(broadcastStateKeySerializer);
TypeSerializerSchemaCompatibility<V> valueCompatibility =
restoredBroadcastStateMetaInfo.updateValueSerializer(broadcastStateValueSerializer);
broadcastState.setStateMetaInfo(restoredBroadcastStateMetaInfo);
}
accessedBroadcastStatesByName.put(name, broadcastState); // 如果之前没有,就存入
return broadcastState;
}
}
0x05. 参考
Flink原理与实现:详解Flink中的状态管理 https://yq.aliyun.com/articles/225623
Flink使用广播实现配置动态更新 https://www.jianshu.com/p/c8c99f613f10
Flink Broadcast State实用指南 https://blog.csdn.net/u010942041/article/details/93901918
聊聊flink的Broadcast State https://www.jianshu.com/p/d6576ae67eae
Working with State https://ci.apache.org/projects/flink/flink-docs-stable/zh/dev/stream/state/state.html