Flink CDC 系列 - Flink MongoDB CDC 在 XTransfer 的生产实践
-
Flink CDC -
MongoDB 复制机制 -
Flink MongoDB CDC -
生产实践 -
后续规划
前言
一、Flink CDC
在 Flink 内部,changelog 记录由 RowData 表示,RowData 包括 4 种类型:+I (INSERT), -U (UPDATE_BEFORE),+U (UPDATE_AFTER), -D (DELETE)。根据 changelog 产生记录类型的不同,又可以分为 3 种 changelog mode。
-
INSERT_ONLY: 只包含 +I,适用于批处理和事件流。
-
ALL: 包含 +I, -U, +U, -D 全部的 RowKind,如 MySQL binlog。
-
UPSERT: 只包含 +I, +U, -D 三种类型的 RowKind,不包含 -U,但必须按唯一键的幂等更新 , 如 MongoDB Change Streams。
二、MongoDB 复制机制
2.1 副本集和分片集群
2.2 Replica Set Oplog
{"ts" : Timestamp(1640190995, 3),"t" : NumberLong(434),"h" : NumberLong(3953156019015894279),"v" : 2,"op" : "u","ns" : "db.firm","ui" : UUID("19c72da0-2fa0-40a4-b000-83e038cd2c01"),"o2" : {"_id" : ObjectId("61c35441418152715fc3fcbc")},"wall" : ISODate("2021-12-22T16:36:35.165Z"),"o" : {"$v" : 1,"$set" : {"address" : "Shanghai China"}}}
| 字段 |
是否可空 |
描述 |
| ts | N | 操作时间,BsonTimestamp |
| t | Y | 对应raft协议里面的term,每次发生节点down掉,新节点加入,主从切换,term都会自增。 |
| h | Y | 操作的全局唯一id的hash结果 |
| v | N | oplog版本 |
| op | N | 操作类型:"i" insert, "u" update, "d" delete, "c" db cmd, "n" no op |
| ns | N | 命名空间,表示操作对应的集合全称 |
| ui | N | session id |
| o2 | Y | 在更新操作中记录_id和sharding key |
wall |
N | 操作时间,精确到毫秒 |
| o | N | 变更数据描述 |
2.3 Change Streams
■ 2.3.1 使用条件
-
WiredTiger 存储引擎
-
副本集 (测试环境下,也可以使用单节点的副本集) 或分片集群部署
-
副本集协议版本:pv1 (默认)
-
4.0 版本之前允许 Majority Read Concern: replication.enableMajorityReadConcern = true (默认允许)
-
MongoDB 用户拥有 find 和 changeStream 权限
■ 2.3.2 Change Events
{_id : { <BSON Object> },"operationType" : "<operation>","fullDocument" : { <document> },"ns" : {"db" : "<database>","coll" : "<collection>"},"to" : {"db" : "<database>","coll" : "<collection>"},"documentKey" : { "_id" : <value> },"updateDescription" : {"updatedFields" : { <document> },"removedFields" : [ "<field>", ... ],"truncatedArrays" : [{ "field" : <field>, "newSize" : <integer> },...]},"clusterTime" : <Timestamp>,"txnNumber" : <NumberLong>,"lsid" : {"id" : <UUID>,"uid" : <BinData>}}
| 字段 | 类型 | 描述 |
| _id | document | 表示resumeToken |
| operationType | string | 操作类型,包括:insert, delete, replace, update, drop, rename, dropDatabase, invalidate |
| fullDocument | document | 完整文档记录,insert, replace默认包含,update需要开启updateLookup,delete和其他操作类型不包含 |
| ns | document | 操作记录对应集合的完全名称 |
| to | document | 当操作类型为rename时,to表示重命名后的完全名称 |
| documentKey | document | 包含变更文档的主键 _id,如果该集合是一个分片集合,documentKey中也会包含分片建 |
| updateDescription | document | 当操作类型为update时,描述有变更的字段和值 |
| clusterTime | Timestamp | 操作时间 |
| txnNumber | NumberLong | 事务号 |
| lsid | Document | session id |
■ 2.3.3 Update Lookup
三、Flink MongoDB CDC
支持特性
-
支持 Exactly-Once 语义
-
支持全量、增量订阅
-
支持 Snapshot 数据过滤
-
支持从检查点、保存点恢复
-
支持元数据提取
四、生产实践
4.1 使用 RocksDB State Backend
4.2 合适的 oplog 容量和过期时间
db.adminCommand({replSetResizeOplog: 1, // 固定值1size: 20480, // 单位为MB,范围在990MB到1PBminRetentionHours: 168 // 可选项,单位为小时})
4.3 变更慢的表开启心跳事件
WITH ('connector' = 'mongodb-cdc','heartbeat.interval.ms' = '60000')
4.4 自定义 MongoDB 连接参数
WITH ('connector' = 'mongodb-cdc','connection.options' = 'authSource=authDB&maxPoolSize=3')
4.5 Change Stream 参数调优
可以在 Flink DDL 中通过 poll.await.time.ms 和 poll.max.batch.size 精细化配置变更事件的拉取。
poll.await.time.ms
变更事件拉取时间间隔,默认为 1500ms。对于变更频繁的集合,可以适当调小拉取间隔,提升处理时效;对于变更缓慢的集合,可以适当调大拉取时间间隔,减轻数据库压力。
poll.max.batch.size
每一批次拉取变更事件的最大条数,默认为 1000 条。调大改参数会加快从 Cursor 中拉取变更事件的速度,但会提升内存的开销。
4.6 订阅整库、集群变更
MongoDBSource.<String>builder().hosts("127.0.0.1:27017").database("").collection("").pipeline("[{'$match': {'ns.db': {'$regex': '/^(sandbox|firewall)$/'}}}]").deserializer(new JsonDebeziumDeserializationSchema()).build();
4.7 权限控制
MongoDB 支持对用户、角色、权限进行细粒度的管控,开启 Change Stream 的用户需要拥有 find 和 changeStream 两个权限。
单集合
{ resource: { db: <dbname>, collection: <collection> }, actions: [ "find", "changeStream" ] }
单库
{ resource: { db: <dbname>, collection: "" }, actions: [ "find", "changeStream" ] }
集群
{ resource: { db: "", collection: "" }, actions: [ "find", "changeStream" ] }
use admin;// 创建用户db.createUser({user: "flink",pwd: "flinkpw",roles: []});// 创建角色db.createRole({role: "flink_role",privileges: [{ resource: { db: "inventory", collection: "products" }, actions: [ "find", "changeStream" ] }],roles: []});// 给用户授予角色db.grantRolesToUser("flink",[// 注意:这里的db指角色创建时的db,在admin下创建的角色可以包含不同database的访问权限{ role: "flink_role", db: "admin" }]);// 给角色追加权限db.grantPrivilegesToRole("flink_role",[{ resource: { db: "inventory", collection: "orders" }, actions: [ "find", "changeStream" ] }]);
use admin;db.createUser({user: "flink",pwd: "flinkpw",roles: [{ role: "read", db: "admin" },{ role: "readAnyDatabase", db: "admin" }]});
五、后续规划
-
支持增量 Snapshot
目前,MongoDB CDC Connector 还不支持增量 Snapshot,对于数据量较大的表还不能很好发挥 Flink 并行计算的优势。后续将实现 MongoDB 的增量 Snapshot 功能,使其支持 Snapshot 阶段的 checkpoint,和并发度设置。
-
支持从指定时间进行变更订阅 目前,MongoDB CDC Connector 仅支持从当前时间开始 Change Stream 的订阅,后续将提供从指定时间点的 Change Stream 订阅。
-
支持库和集合的筛选 目前,MongoDB CDC Connector 支持集群、整库的变更订阅和筛选,但对于是否需要进行 Snapshot 的集合的筛选还不支持,后续将完善这个功能。
2022 年 1 月 8-9 日,FFA 2021 重磅开启,全球 40+ 多行业一线厂商,80+ 干货议题,带来专属于开发者的技术盛宴。
戳我,预约 FFA 2021~
