Flink CDC 系列 - Flink MongoDB CDC 在 XTransfer 的生产实践
-
Flink CDC -
MongoDB 复制机制 -
Flink MongoDB CDC -
生产实践 -
后续规划
前言
一、Flink CDC
在 Flink 内部,changelog 记录由 RowData 表示,RowData 包括 4 种类型:+I (INSERT), -U (UPDATE_BEFORE),+U (UPDATE_AFTER), -D (DELETE)。根据 changelog 产生记录类型的不同,又可以分为 3 种 changelog mode。
-
INSERT_ONLY: 只包含 +I,适用于批处理和事件流。
-
ALL: 包含 +I, -U, +U, -D 全部的 RowKind,如 MySQL binlog。
-
UPSERT: 只包含 +I, +U, -D 三种类型的 RowKind,不包含 -U,但必须按唯一键的幂等更新 , 如 MongoDB Change Streams。
二、MongoDB 复制机制
2.1 副本集和分片集群
2.2 Replica Set Oplog
{
"ts" : Timestamp(1640190995, 3),
"t" : NumberLong(434),
"h" : NumberLong(3953156019015894279),
"v" : 2,
"op" : "u",
"ns" : "db.firm",
"ui" : UUID("19c72da0-2fa0-40a4-b000-83e038cd2c01"),
"o2" : {
"_id" : ObjectId("61c35441418152715fc3fcbc")
},
"wall" : ISODate("2021-12-22T16:36:35.165Z"),
"o" : {
"$v" : 1,
"$set" : {
"address" : "Shanghai China"
}
}
}
字段 |
是否可空 |
描述 |
ts | N | 操作时间,BsonTimestamp |
t | Y | 对应raft协议里面的term,每次发生节点down掉,新节点加入,主从切换,term都会自增。 |
h | Y | 操作的全局唯一id的hash结果 |
v | N | oplog版本 |
op | N | 操作类型:"i" insert, "u" update, "d" delete, "c" db cmd, "n" no op |
ns | N | 命名空间,表示操作对应的集合全称 |
ui | N | session id |
o2 | Y | 在更新操作中记录_id和sharding key |
wall |
N | 操作时间,精确到毫秒 |
o | N | 变更数据描述 |
2.3 Change Streams
■ 2.3.1 使用条件
-
WiredTiger 存储引擎
-
副本集 (测试环境下,也可以使用单节点的副本集) 或分片集群部署
-
副本集协议版本:pv1 (默认)
-
4.0 版本之前允许 Majority Read Concern: replication.enableMajorityReadConcern = true (默认允许)
-
MongoDB 用户拥有 find 和 changeStream 权限
■ 2.3.2 Change Events
{
_id : { <BSON Object> },
"operationType" : "<operation>",
"fullDocument" : { <document> },
"ns" : {
"db" : "<database>",
"coll" : "<collection>"
},
"to" : {
"db" : "<database>",
"coll" : "<collection>"
},
"documentKey" : { "_id" : <value> },
"updateDescription" : {
"updatedFields" : { <document> },
"removedFields" : [ "<field>", ... ],
"truncatedArrays" : [
{ "field" : <field>, "newSize" : <integer> },
...
]
},
"clusterTime" : <Timestamp>,
"txnNumber" : <NumberLong>,
"lsid" : {
"id" : <UUID>,
"uid" : <BinData>
}
}
字段 | 类型 | 描述 |
_id | document | 表示resumeToken |
operationType | string | 操作类型,包括:insert, delete, replace, update, drop, rename, dropDatabase, invalidate |
fullDocument | document | 完整文档记录,insert, replace默认包含,update需要开启updateLookup,delete和其他操作类型不包含 |
ns | document | 操作记录对应集合的完全名称 |
to | document | 当操作类型为rename时,to表示重命名后的完全名称 |
documentKey | document | 包含变更文档的主键 _id,如果该集合是一个分片集合,documentKey中也会包含分片建 |
updateDescription | document | 当操作类型为update时,描述有变更的字段和值 |
clusterTime | Timestamp | 操作时间 |
txnNumber | NumberLong | 事务号 |
lsid | Document | session id |
■ 2.3.3 Update Lookup
三、Flink MongoDB CDC
支持特性
-
支持 Exactly-Once 语义
-
支持全量、增量订阅
-
支持 Snapshot 数据过滤
-
支持从检查点、保存点恢复
-
支持元数据提取
四、生产实践
4.1 使用 RocksDB State Backend
4.2 合适的 oplog 容量和过期时间
db.adminCommand(
{
replSetResizeOplog: 1, // 固定值1
size: 20480, // 单位为MB,范围在990MB到1PB
minRetentionHours: 168 // 可选项,单位为小时
}
)
4.3 变更慢的表开启心跳事件
WITH (
'connector' = 'mongodb-cdc',
'heartbeat.interval.ms' = '60000'
)
4.4 自定义 MongoDB 连接参数
WITH (
'connector' = 'mongodb-cdc',
'connection.options' = 'authSource=authDB&maxPoolSize=3'
)
4.5 Change Stream 参数调优
可以在 Flink DDL 中通过 poll.await.time.ms 和 poll.max.batch.size 精细化配置变更事件的拉取。
poll.await.time.ms
变更事件拉取时间间隔,默认为 1500ms。对于变更频繁的集合,可以适当调小拉取间隔,提升处理时效;对于变更缓慢的集合,可以适当调大拉取时间间隔,减轻数据库压力。
poll.max.batch.size
每一批次拉取变更事件的最大条数,默认为 1000 条。调大改参数会加快从 Cursor 中拉取变更事件的速度,但会提升内存的开销。
4.6 订阅整库、集群变更
MongoDBSource.<String>builder()
.hosts("127.0.0.1:27017")
.database("")
.collection("")
.pipeline("[{'$match': {'ns.db': {'$regex': '/^(sandbox|firewall)$/'}}}]")
.deserializer(new JsonDebeziumDeserializationSchema())
.build();
4.7 权限控制
MongoDB 支持对用户、角色、权限进行细粒度的管控,开启 Change Stream 的用户需要拥有 find 和 changeStream 两个权限。
单集合
{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
单库
{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
集群
{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
use admin;
// 创建用户
db.createUser(
{
user: "flink",
pwd: "flinkpw",
roles: []
}
);
// 创建角色
db.createRole(
{
role: "flink_role",
privileges: [
{ resource: { db: "inventory", collection: "products" }, actions: [ "find", "changeStream" ] }
],
roles: []
}
);
// 给用户授予角色
db.grantRolesToUser(
"flink",
[
// 注意:这里的db指角色创建时的db,在admin下创建的角色可以包含不同database的访问权限
{ role: "flink_role", db: "admin" }
]
);
// 给角色追加权限
db.grantPrivilegesToRole(
"flink_role",
[
{ resource: { db: "inventory", collection: "orders" }, actions: [ "find", "changeStream" ] }
]
);
use admin;
db.createUser({
user: "flink",
pwd: "flinkpw",
roles: [
{ role: "read", db: "admin" },
{ role: "readAnyDatabase", db: "admin" }
]
});
五、后续规划
-
支持增量 Snapshot
目前,MongoDB CDC Connector 还不支持增量 Snapshot,对于数据量较大的表还不能很好发挥 Flink 并行计算的优势。后续将实现 MongoDB 的增量 Snapshot 功能,使其支持 Snapshot 阶段的 checkpoint,和并发度设置。
-
支持从指定时间进行变更订阅 目前,MongoDB CDC Connector 仅支持从当前时间开始 Change Stream 的订阅,后续将提供从指定时间点的 Change Stream 订阅。
-
支持库和集合的筛选 目前,MongoDB CDC Connector 支持集群、整库的变更订阅和筛选,但对于是否需要进行 Snapshot 的集合的筛选还不支持,后续将完善这个功能。
2022 年 1 月 8-9 日,FFA 2021 重磅开启,全球 40+ 多行业一线厂商,80+ 干货议题,带来专属于开发者的技术盛宴。
戳我,预约 FFA 2021~