源码解析 | 万字长文详解 Flink 中的 CopyOnWriteStateTable
声明:笔者的源码分析都是基于 flink-1.9.0 release 分支,其实阅读源码不用非常在意版本的问题,各版本的主要流程基本都是类似的。如果熟悉了某个版本的源码,之后新版本有变化,我们重点看一下变化之处即可。
-
1、hash 结构为了保证读写数据的高性能,都需要有扩容策略,CopyOnWriteStateMap 的扩容策略是一个渐进式 rehash 的策略,即:不是一下子将数据全迁移的新的 hash 表,而是慢慢去迁移数据到新的 hash 表中。 -
2、Checkpoint 时 CopyOnWriteStateMap 支持异步快照,即:Checkpoint 时可以在做快照的同时,仍然对 CopyOnWriteStateMap 中数据进行修改。问题来了:数据修改了,怎么保证快照数据的准确性呢?
1.StateTable 简介
-
CopyOnWriteStateTable 属于 Flink 自己定制化的数据结构,Checkpoint 时支持异步 Snapshot。 -
NestedMapsStateTable 直接嵌套 Java 的两层 HashMap 来存储数据,Checkpoint 时需要同步快照。
2.CopyOnWriteStateTable
-
CopyOnWriteStateMap 的扩容策略是渐进式 rehash,而不是一下子扩容完 -
为了支持异步的 Snapshot,需要将 Snapshot 时 StateMap 的快照保存下来,具体的保存策略怎么实现的? -
为了支持 CopyOnWrite 功能,所以在修改数据时,要进行一系列 copy 的操作,不能修改原始数据,否则会影响 Snapshot。 -
Snapshot 异步快照流程及 Snapshot 完成时,如何 release 掉旧版本数据?
3.CopyOnWriteStateMap 的渐进式 rehash 策略
3.1 扩容简述
3.2 选择 Table 的策略
// 选择当前元素到底使用 primaryTable 还是 incrementalRehashTable
private StateMapEntry<K, N, S>[] selectActiveTable(int hashCode) {
// 计算 hashCode 应该被分到 primaryTable 的哪个桶中
int curIndex = hashCode & (primaryTable.length - 1);
// 大于等于 rehashIndex 的桶还未迁移,应该去 primaryTable 中去查找。
// 小于 rehashIndex 的桶已经迁移完成,应该去 incrementalRehashTable 中去查找。
return curIndex >= rehashIndex ? primaryTable : incrementalRehashTable;
}
3.3 迁移过程
-
检测是否处于 rehash 中,如果正在 rehash 就会调用 incrementalRehash 迁移一波数据 -
计算 key 和 namespace 对应的 hashCode
private void incrementalRehash() {
StateMapEntry<K, N, S>[] oldMap = primaryTable;
StateMapEntry<K, N, S>[] newMap = incrementalRehashTable;
int oldCapacity = oldMap.length;
int newMask = newMap.length - 1;
int requiredVersion = highestRequiredSnapshotVersion;
int rhIdx = rehashIndex;
// 记录本次迁移了几个元素
int transferred = 0;
// 每次至少迁移 MIN_TRANSFERRED_PER_INCREMENTAL_REHASH 个元素到新桶、
// MIN_TRANSFERRED_PER_INCREMENTAL_REHASH 默认为 4
while (transferred < MIN_TRANSFERRED_PER_INCREMENTAL_REHASH) {
// 遍历 oldMap 的第 rhIdx 个桶
StateMapEntry<K, N, S> e = oldMap[rhIdx];
// 每次 e 都指向 e.next,e 不为空,表示当前桶中还有元素未遍历,需要继续遍历
// 每次迁移必须保证,整个桶被迁移完,不能是某个桶迁移到一半
while (e != null) {
// 遇到版本比 highestRequiredSnapshotVersion 小的元素,则 copy 一份
if (e.entryVersion < requiredVersion) {
e = new StateMapEntry<>(e, stateMapVersion);
}
// 保存下一个要迁移的节点节点到 n
StateMapEntry<K, N, S> n = e.next;
// 迁移当前元素 e 到新的 table 中,插入到链表头部
int pos = e.hash & newMask;
e.next = newMap[pos];
newMap[pos] = e;
// e 指向下一个要迁移的节点
e = n;
// 迁移元素数 +1
++transferred;
}
oldMap[rhIdx] = null;
// rhIdx 之前的桶已经迁移完,rhIdx == oldCapacity 就表示迁移完成了
// 做一些初始化操作
if (++rhIdx == oldCapacity) {
XXX
return;
}
}
// primaryTableSize 中减去 transferred,增加 transferred
primaryTableSize -= transferred;
incrementalRehashTableSize += transferred;
rehashIndex = rhIdx;
}
4.StateMap 的 Snapshot 策略
4.1 浅拷贝原理分析
上图中也是省略了 Entry,Entry 引用的浅拷贝与之前没有扩容的情况类似。
4.2 浅拷贝源码详解
List<CopyOnWriteStateMapSnapshot<K, N, S>> getStateMapSnapshotList() {
List<CopyOnWriteStateMapSnapshot<K, N, S>> snapshotList =
new ArrayList<>(keyGroupedStateMaps.length);
// 调用所有 CopyOnWriteStateMap 的 stateSnapshot 方法
// 生成 CopyOnWriteStateMapSnapshot 保存到 list 中
for (int i = 0; i < keyGroupedStateMaps.length; i++) {
CopyOnWriteStateMap<K, N, S> stateMap =
(CopyOnWriteStateMap<K, N, S>) keyGroupedStateMaps[i];
snapshotList.add(stateMap.stateSnapshot());
}
return snapshotList;
}
public CopyOnWriteStateMapSnapshot<K, N, S> stateSnapshot() {
return new CopyOnWriteStateMapSnapshot<>(this);
}
CopyOnWriteStateMapSnapshot(CopyOnWriteStateMap<K, N, S> owningStateMap) {
super(owningStateMap);
// 对 StateMap 的数据进行浅拷贝,生成 snapshotData
this.snapshotData = owningStateMap.snapshotMapArrays();
// 记录当前的 StateMap 版本到 snapshotVersion 中
this.snapshotVersion = owningStateMap.getStateMapVersion();
this.numberOfEntriesInSnapshotData = owningStateMap.size();
}
public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
// 当前 StateMap 的 version
private int stateMapVersion;
// 所有 正在进行中的 snapshot 的 version
private final TreeSet<Integer> snapshotVersions;
// 正在进行中的那些 snapshot 的最大版本号
private int highestRequiredSnapshotVersion;
StateMapEntry<K, N, S>[] snapshotMapArrays() {
// 1、stateMapVersion 版本 + 1,赋值给 highestRequiredSnapshotVersion,
// 并加入snapshotVersions
synchronized (snapshotVersions) {
++stateMapVersion;
highestRequiredSnapshotVersion = stateMapVersion;
snapshotVersions.add(highestRequiredSnapshotVersion);
}
// 2、 将现在 primary 和 Increment 的元素浅拷贝一份到 copy 中
// copy 策略:copy 数组长度为 primary 中剩余的桶数 + Increment 中有数据的桶数
// primary 中剩余的数据放在 copy 数组的前面,Increment 中低位数据随后,
// Increment 中高位数据放到 copy 数组的最后
StateMapEntry<K, N, S>[] table = primaryTable;
final int totalMapIndexSize = rehashIndex + table.length;
final int copiedArraySize = Math.max(totalMapIndexSize, size());
final StateMapEntry<K, N, S>[] copy = new StateMapEntry[copiedArraySize];
if (isRehashing()) {
final int localRehashIndex = rehashIndex;
final int localCopyLength = table.length - localRehashIndex;
// for the primary table, take every index >= rhIdx.
System.arraycopy(table, localRehashIndex, copy, 0, localCopyLength);
table = incrementalRehashTable;
System.arraycopy(table, 0, copy, localCopyLength, localRehashIndex);
System.arraycopy(table, table.length >>> 1, copy,
localCopyLength + localRehashIndex, localRehashIndex);
} else {
System.arraycopy(table, 0, copy, 0, table.length);
}
return copy;
}
}
-
stateMapVersion:表示当前 StateMap 的版本,每次 Snapshot 时版本号加一 -
snapshotVersions:存放所有正在进行中的 snapshot 的版本号(因为可能存在多个同时进行的 Snapshot) -
highestRequiredSnapshotVersion:表示正在进行中的那些 snapshot 的最大版本号,如果当前没有正在进行中的 Snapshot,那么赋值为 0
注:copy 数组的长度与上述原理分析不完全一致,原理分析时应该是 copiedArraySize = totalMapIndexSize;实际上 copiedArraySize = Math.max(totalMapIndexSize, size())。
源码注释写到:理论上 totalMapIndexSize 就够了,这里考虑 size 主要是为了兼容 StateMap 的 TransformedSnapshotIterator 功能。
5.CopyOnWrite 实现原理
5.1 CopyOnWrite 原理简述
5.2 CopyOnWrite 原理详解
■ 5.2.1 修改链表头部节点的场景
-
深拷贝一个 Entry a 对象为 Entry a copy -
将 Entry a copy 放到 primaryTable 的链表中,且 next 指向 Entry b -
应用层修改 Entry a copy 的 data,将 data1 修改为设定的 data2
■ 5.2.2 修改链表中间节点的场景
-
深拷贝一个 Entry b 对象为 Entry b copy -
将 Entry b copy 串在 primaryTable 的链表中,且 next 指向 Entry c -
应用层修改 Entry b copy 的 data,将 data 修改为设定的 data2
-
深拷贝 Entry a 和 b 对象为 Entry a copy 和 b copy -
将 Entry a copy 和 b copy 串在 primaryTable 的链表中,且 Entry b 的 next 指向 Entry c -
应用层修改 Entry b copy 的 data,将 data 修改为设定的 data2
■ 5.2.3 插入新数据的场景
注:这里必须是插入新数据的场景,对于 Map 类型,插入旧数据对应的可能是修改操作
■ 5.2.4 链表头部有新节点再修改链表中间节点的场景
-
深拷贝 Entry a 和 b 对象为 Entry a copy 和 b copy -
将 Entry a copy 和 b copy 串在 Entry d 的链表中,且 Entry b 的 next 指向 Entry c -
应用层修改 Entry b copy 的 data,将 data 修改为设定的 data2
■ 5.2.5 get 链表中间节点的场景
■ 5.2.6 remove 数据的场景
■ 5.2.7 COW 原理小结
-
插入新的 Entry 使用头插法插入到链表中 -
假设要修改 Entry b,那么要将 Entry b 以及链表中 Entry b 之前的 Entry 必须全部 copy 一份(新插入的数据不需要拷贝),Entry b 之后的 Entry 可以共享 -
访问 Entry b 的场景与修改 Entry b 的场景类似 -
假如修改或访问的数据是 copy 后的数据,那么实际上不需要再 copy 了,因为 copy 后的数据已经保证是 primaryTable 独占的数据,不与 Snapshot 共享 -
remove 数据的场景,分为两种 case: -
如果 remove 的 Entry 是链表头节点,将桶直接指向头结点的 next 节点即可。 -
如果 remove 的 Entry 不是链表头节点,需要将目标 Entry 之前的所有 Entry 拷贝一份,且目标 Entry 前一个节点的副本直接指向目标 Entry 的下一个节点。当然如果前继节点已经是新版本了,则不需要拷贝,直接修改前继 Entry 的 next 指针即可。
5.3 CopyOnWriteStateMap 各种操作源码详解
■ 5.3.1 CopyOnWriteStateMap 介绍
public class CopyOnWriteStateMap<K, N, S> extends StateMap<K, N, S> {
// 默认容量 128,即:hash 表中桶的个数默认 128
public static final int DEFAULT_CAPACITY = 128;
// hash 扩容迁移数据时,每次最少要迁移 4 条数据
private static final int MIN_TRANSFERRED_PER_INCREMENTAL_REHASH = 4;
// State 的序列化器
protected final TypeSerializer<S> stateSerializer;
// 空表:提前创建好
private static final StateMapEntry<?, ?, ?>[] EMPTY_TABLE =
new StateMapEntry[MINIMUM_CAPACITY >>> 1];
// 当前 StateMap 的 version,每次创建一个 Snapshot 时,StateMap 的版本号加一
private int stateMapVersion;
// 所有 正在进行中的 snapshot 的 version
// 每次创建出一个 Snapshot 时,都需要将 Snapshot 的 version 保存到该 Set 中
private final TreeSet<Integer> snapshotVersions;
// 正在进行中的那些 snapshot 的最大版本号
// 这里保存的就是 TreeSet<Integer> snapshotVersions 中最大的版本号
private int highestRequiredSnapshotVersion;
// 主表:用于存储数据的 table
private StateMapEntry<K, N, S>[] primaryTable;
// 扩容时的新表,扩容期间数组长度为 primaryTable 的 2 倍。
// 非扩容期间为 空表
private StateMapEntry<K, N, S>[] incrementalRehashTable;
// primaryTable 中元素个数
private int primaryTableSize;
// incrementalRehashTable 中元素个数
private int incrementalRehashTableSize;
// primary table 中增量 rehash 要迁移的下一个 index
// 即:primaryTable 中 rehashIndex 之前的数据全部搬移完成
private int rehashIndex;
// 扩容阈值,与 HashMap 类似,当元素个数大于 threshold 时,就会开始扩容。
// 默认 threshold 为 StateMap 容量 * 0.75
private int threshold;
// 用于记录元素修改的次数,遍历迭代过程中,发现 modCount 修改了,则抛异常
private int modCount;
}
■ 5.3.2 StateMapEntry
static class Node<K,V> implements Map.Entry<K,V> {
// 当前 key 对应的 hash 值
final int hash;
final K key;
V value;
// next 指向当前桶中下一个 Node
Node<K,V> next;
}
-
根据 key 计算 hash 值,定位到具体的桶 -
遍历当前桶的一个个 Entry,先比较 hash 值是否相同,在比较 key 是否相同(使用 equals 判断 key 是否相同) -
如果 hash 值和 key 的 equals 方法都能匹配,表示找到了对应的 Entry,返回 Entry 中的 value 即可
protected static class StateMapEntry<K, N, S> implements StateEntry<K, N, S> {
final K key;
final N namespace;
S state;
final int hash;
StateMapEntry<K, N, S> next;
// new entry 时的版本号
int entryVersion;
// state (数据)更新时的 版本号
int stateVersion;
}
-
namespace:namespace 是 Flink 中的概念,用于区分不同的 Window,在 StateMapEntry 中 key 和 namespace 组合起来作为共同的主键,state 作为 value -
entryVersion:表示创建 entry 时的版本号 -
stateVersion:表示当前 StateMapEntry 中 state (数据)更新时的版本号
■ 5.3.3 插入新数据源码流程
public void put(K key, N namespace, S value) {
// putEntry 用于找到对应的 Entry,
// 包括了修改数据或插入新数据的场景
final StateMapEntry<K, N, S> e = putEntry(key, namespace);
// 将 value set 到 Entry 中
e.state = value;
// state 更新了,所以要更新 stateVersion
e.stateVersion = stateMapVersion;
}
private StateMapEntry<K, N, S> putEntry(K key, N namespace) {
// 计算当前对应的 hash 值,选择 primaryTable 或 incrementalRehashTable
final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
int index = hash & (tab.length - 1);
// 遍历当前桶中链表的一个个 Entry
for (StateMapEntry<K, N, S> e = tab[index]; e != null; e = e.next) {
// 如果根据 key 和 namespace 找到了对应的 Entry,则认为是修改数据
// 普通的 HashMap 结构有一个 Key ,而这里 key 和 namespace 的组合当做 key
if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
// 修改数据逻辑(暂时忽略)
if (e.entryVersion < highestRequiredSnapshotVersion) {
e = handleChainedEntryCopyOnWrite(tab, index, e);
}
// 修改数据,直接返回对应的 Entry
return e;
}
}
// 代码走到这里,说明原始的链表中没找到对应 Entry,即:插入新数据的逻辑
++modCount;
if (size() > threshold) {
doubleCapacity();
}
// 链中没有找到 key 和 namespace 的数据
return addNewStateMapEntry(tab, key, namespace, hash);
}
这里注意,普通的 HashMap 结构有一个 Key 一个 value。而这里 key 和 namespace 的组合当做 Map 的 key,value 仍然是原来的 value。
■ 5.3.4 修改数据源码流程
// 如果根据 key 和 namespace 找到了对应的 Entry,则认为是修改数据
// 普通的 HashMap 结构有一个 Key ,而这里 key 和 namespace 的组合当做 key
if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
// entryVersion 表示 entry 创建时的版本号
// highestRequiredSnapshotVersion 表示 正在进行中的那些 snapshot 的最大版本号
// entryVersion 小于 highestRequiredSnapshotVersion,说明 Entry 的版本小于当前某些 Snapshot 的版本号,
// 即:当前 Entry 是旧版本的数据,当前 Entry 被其他 snapshot 持有。
// 为了保证 Snapshot 的数据正确性,这里必须为 e 创建新的副本,且 e 之前的某些元素也需要 copy 副本
// handleChainedEntryCopyOnWrite 方法将会进行相应的 copy 操作,并返回 e 的新副本
// 然后将返回 handleChainedEntryCopyOnWrite 方法返回的 e 的副本返回给上层,进行数据的修改操作。
if (e.entryVersion < highestRequiredSnapshotVersion) {
e = handleChainedEntryCopyOnWrite(tab, index, e);
}
// 反之,entryVersion >= highestRequiredSnapshotVersion
// 说明当前 Entry 创建时的版本比所有 Snapshot 的版本高
// 即:当前 Entry 是新版本的数据,不被任何 Snapshot 持有
// 注:Snapshot 不可能引用高版本的数据
// 此时,e 是新的 Entry,不存在共享问题,所以直接修改当前 Entry 即可,所以返回当前 e
return e;
}
-
entryVersion 小于 highestRequiredSnapshotVersion,说明 Entry 创建时的版本小于当前某些 Snapshot 的版本号,即:当前 Entry 是旧版本的数据,当前 Entry 被其他 Snapshot 持有。为了保证 Snapshot 的数据正确性,这里必须为 e 创建新的副本,且 e 之前的某些元素也需要 copy 副本,handleChainedEntryCopyOnWrite 方法将会进行相应的 copy 操作,并返回 e 的新副本。最后将 e 的副本返回给上层,进行数据的修改操作。 -
反之,entryVersion >= highestRequiredSnapshotVersion,说明当前 Entry 创建时的版本比所有 Snapshot 的版本高。Snapshot 不可能引用高版本的数据,所以当前 Entry 是新版本的数据不被任何 Snapshot 持有。此时 e 是新的 Entry,不存在共享问题,所以直接修改当前 Entry 即可,所以返回当前 e。
private StateMapEntry<K, N, S> handleChainedEntryCopyOnWrite(
StateMapEntry<K, N, S>[] tab,
int mapIdx,
StateMapEntry<K, N, S> untilEntry) {
// current 指向当前桶的头结点
StateMapEntry<K, N, S> current = tab[mapIdx];
StateMapEntry<K, N, S> copy;
// 判断头结点创建时的版本是否低于 highestRequiredSnapshotVersion
// 如果低于,则 current 节点被 Snapshot 引用,所以需要 new 一个新的 Entry
if (current.entryVersion < highestRequiredSnapshotVersion) {
copy = new StateMapEntry<>(current, stateMapVersion);
tab[mapIdx] = copy;
} else {
copy = current;
}
// 依次遍历当前桶的元素,直到遍历到 untilEntry 节点,也就是我们要修改的 Entry 节点
while (current != untilEntry) {
current = current.next;
// current 版本小于 highestRequiredSnapshotVersion,则需要拷贝,
// 否则不用拷贝
if (current.entryVersion < highestRequiredSnapshotVersion) {
// entryVersion 表示创建 Entry 时的 version,
// 所以新创建的 Entry 对应的 entryVersion 要更新为当前 StateMap 的 version
copy.next = new StateMapEntry<>(current, stateMapVersion);
copy = copy.next;
} else {
copy = current;
}
}
return copy;
}
-
如果低于则 current 节点被 Snapshot 引用,所以需要 new 一个新的 Entry,也就是所谓的拷贝一个副本。 -
否则不用拷贝。
■ 5.3.5 访问数据源码流程
if ((e.hash == hash && key.equals(eKey) && namespace.equals(eNamespace))) {
// 一旦 get 当前数据,为了防止应用层修改数据内部的属性值,
// 所以必须保证这是一个最新的 Entry,并更新其 stateVersion
// 首先检查当前的 State,也就是 value 值是否是旧版本数据,
// 如果 value 是旧版本,则必须深拷贝一个 value
// 否则 value 是新版本,直接返回给应用层
if (e.stateVersion < requiredVersion) {
// 此时还有两种情况,
// 1、如果当前 Entry 是旧版本的,则 Entry 也需要拷贝一份,
// 按照之前分析过的 handleChainedEntryCopyOnWrite 策略拷贝即可
// 2、当前 Entry 是新版本数据,则不需要拷贝,直接修改其 State 即可
if (e.entryVersion < requiredVersion) {
e = handleChainedEntryCopyOnWrite(tab, hash & (tab.length - 1), e);
}
// 更新其 stateVersion
e.stateVersion = stateMapVersion;
// 通过序列化器,深拷贝一个数据
e.state = getStateSerializer().copy(e.state);
}
return e.state;
}
-
如果 value 是旧版本,则必须深拷贝一个 value -
否则 value 是新版本,直接返回给应用层
-
1、如果当前 Entry 是旧版本的,则 Entry 也需要拷贝一份,按照之前分析过的 handleChainedEntryCopyOnWrite 策略拷贝即可 -
2、当前 Entry 是新版本数据,则不需要拷贝,直接修改其 State 即可
■ 5.3.6 remove 数据源码流程
private StateMapEntry<K, N, S> removeEntry(K key, N namespace) {
final int hash = computeHashForOperationAndDoIncrementalRehash(key, namespace);
final StateMapEntry<K, N, S>[] tab = selectActiveTable(hash);
int index = hash & (tab.length - 1);
for (StateMapEntry<K, N, S> e = tab[index], prev = null;
e != null; prev = e, e = e.next) {
if (e.hash == hash && key.equals(e.key) && namespace.equals(e.namespace)) {
// 如果要删除的 Entry 不存在前继节点,说明要删除的 Entry 是头结点,
// 直接将桶直接指向头结点的 next 节点即可。
if (prev == null) {
tab[index] = e.next;
} else {
// 如果 remove 的 Entry 不是链表头节点,需要将目标 Entry 之前的所有 Entry 拷贝一份,
// 且目标 Entry 前一个节点的副本直接指向目标 Entry 的下一个节点。
// 当然如果前继节点已经是新版本了,则不需要拷贝,直接修改前继 Entry 的 next 指针即可。
// copy-on-write check for entry
if (prev.entryVersion < highestRequiredSnapshotVersion) {
prev = handleChainedEntryCopyOnWrite(tab, index, prev);
}
prev.next = e.next;
}
// 修改一些计数器
++modCount;
if (tab == primaryTable) {
--primaryTableSize;
} else {
--incrementalRehashTableSize;
}
return e;
}
}
return null;
}
-
如果 remove 的 Entry 是链表头节点,将桶直接指向头结点的 next 节点即可。 -
如果 remove 的 Entry 不是链表头节点,需要将目标 Entry 之前的所有 Entry 拷贝一份,且目标 Entry 前一个节点的副本直接指向目标 Entry 的下一个节点。当然如果前继节点已经是新版本了,则不需要拷贝,直接修改前继 Entry 的 next 指针即可。
6.Snapshot 流程及完成后的 release 操作
public void writeStateInKeyGroup(@Nonnull DataOutputView dov, int keyGroupId) {
// 获取 KeyGroupId 对应的 CopyOnWriteStateMapSnapshot
StateMapSnapshot<K, N, S, ? extends StateMap<K, N, S>> stateMapSnapshot =
getStateMapSnapshotForKeyGroup(keyGroupId);
// 将 stateMapSnapshot 中的 State 数据进行序列化输出
stateMapSnapshot.writeState(localKeySerializer, localNamespaceSerializer,
localStateSerializer, dov, stateSnapshotTransformer);
// stateMapSnapshot 对应的数据已经遍历完了,所以可以释放该快照
stateMapSnapshot.release();
}
void releaseSnapshot(int snapshotVersion) {
synchronized (snapshotVersions) {
// 将 相应的 snapshotVersion 从 snapshotVersions 中 remove
snapshotVersions.remove(snapshotVersion);
// 将 snapshotVersions 的最大值更新到 highestRequiredSnapshotVersion,
// 如果snapshotVersions 为空,则 highestRequiredSnapshotVersion 更新为 0
highestRequiredSnapshotVersion = snapshotVersions.isEmpty() ?
0 : snapshotVersions.last();
}
}
7.总结
本文涉及的 github 仓库,都在 feature/source-code-read-1-9-0 分支,之后也会持续更新:
https://github.com/1996fanrui/flink/tree/feature/source-code-read-1-9-0注释