Flink 最佳实践之 通过 TiCDC 将 TiDB 数据流入 Flink
背景介绍
本文将介绍如何将 TiDB 中的数据,通过 TiCDC 导入到 Kafka 中,继而被 Flink 消费的案例。
为了能够快速的验证整套流程的功能性,所有的组件都以单机的形式部署。如果需要在生产环境中部署,建议将每一个组件替换成高可用的集群部署方案。
其中,我们单独创建了一套 Zookeeper 单节点环境,Flink、Kafka、等组件共用这个 Zookeeper 环境。
针对于所有需要 JRE 的组件,如 Flink,Kafka,Zookeeper,考虑到升级 JRE 可能会影响到其他的应用,我们选择每个组件独立使用自己的 JRE 环境。
本文分为两个部分,其中,前五小节主要介绍基础环境的搭建,最后一个小节介绍了数据是如何在各个组件中流通的。
应用场景介绍
TiDB + Flink 的结构,支持开发与运行多种不同种类的应用程序。
目前主要的特性主要包括:
-
批流一体化 -
精密的状态管理 -
事件时间支持 -
精确的一次状态一致性保障
Flink 可以运行在包括 YARN、Mesos、Kubernetes 在内的多种资源管理框架上,还支持裸机集群上独立部署。TiDB 可以部署 AWS、Kubernetes、GCP GKE 上,同时也支持使用 TiUP 在裸机集群上独立部署。
TiDB + Flink 结构常见的几类应用如下:
-
事件驱动型应用 -
反欺诈 -
异常检测 -
基于规则的报警 -
业务流程监控 -
数据分析应用 -
网络质量监控 -
产品更新及试验评估分析 -
事实数据即席分析 -
大规模图分析 -
数据管道应用 -
电商实时查询索引构建 -
电商持续 ETL
操作系统环境
[root@r20 topology]# cat /etc/redhat-release
CentOS Stream release 8
软件环境
机器分配
部署 TiDB Cluster
与传统的单机数据库相比,TiDB 具有以下优势:
-
纯分布式架构,拥有良好的扩展性,支持弹性的扩缩容 -
支持 SQL,对外暴露 MySQL 的网络协议,并兼容大多数 MySQL 的语法,在大多数场景下可以直接替换 MySQL -
默认支持高可用,在少数副本失效的情况下,数据库本身能够自动进行数据修复和故障转移,对业务透明 -
支持 ACID 事务,对于一些有强一致需求的场景友好,例如:银行转账 -
具有丰富的工具链生态,覆盖数据迁移、同步、备份等多种场景
在内核设计上,TiDB 分布式数据库将整体架构拆分成了多个模块,各模块之间互相通信,组成完整的 TiDB 系统。对应的架构图如下:
在本文中,我们只做最简单的功能测试,所以部署了一套单节点但副本的 TiDB,涉及到了以下的三个模块:
-
TiDB Server:SQL 层,对外暴露 MySQL 协议的连接 endpoint,负责接受客户端的连接,执行 SQL 解析和优化,最终生成分布式执行计划。 -
PD (Placement Driver) Server:整个 TiDB 集群的元信息管理模块,负责存储每个 TiKV 节点实时的数据分布情况和集群的整体拓扑结构,提供 TiDB Dashboard 管控界面,并为分布式事务分配事务 ID。 -
TiKV Server:负责存储数据,从外部看 TiKV 是一个分布式的提供事务的 Key-Value 存储引擎。
TiUP 部署模板文件
# # Global variables are applied to all deployments and used as the default value of
# # the deployments if a specific deployment value is missing.
global:
user: "tidb"
ssh_port: 22
deploy_dir: "/opt/tidb-c1/"
data_dir: "/opt/tidb-c1/data/"
# # Monitored variables are applied to all the machines.
#monitored:
# node_exporter_port: 19100
# blackbox_exporter_port: 39115
# deploy_dir: "/opt/tidb-c3/monitored"
# data_dir: "/opt/tidb-c3/data/monitored"
# log_dir: "/opt/tidb-c3/log/monitored"
# # Server configs are used to specify the runtime configuration of TiDB components.
# # All configuration items can be found in TiDB docs:
# # - TiDB: https://pingcap.com/docs/stable/reference/configuration/tidb-server/configuration-file/
# # - TiKV: https://pingcap.com/docs/stable/reference/configuration/tikv-server/configuration-file/
# # - PD: https://pingcap.com/docs/stable/reference/configuration/pd-server/configuration-file/
# # All configuration items use points to represent the hierarchy, e.g:
# # readpool.storage.use-unified-pool
# #
# # You can overwrite this configuration via the instance-level `config` field.
server_configs:
tidb:
log.slow-threshold: 300
binlog.enable: false
binlog.ignore-error: false
tikv-client.copr-cache.enable: true
tikv:
server.grpc-concurrency: 4
raftstore.apply-pool-size: 2
raftstore.store-pool-size: 2
rocksdb.max-sub-compactions: 1
storage.block-cache.capacity: "16GB"
readpool.unified.max-thread-count: 12
readpool.storage.use-unified-pool: false
readpool.coprocessor.use-unified-pool: true
raftdb.rate-bytes-per-sec: 0
pd:
schedule.leader-schedule-limit: 4
schedule.region-schedule-limit: 2048
schedule.replica-schedule-limit: 64
pd_servers:
- host: 192.168.12.21
ssh_port: 22
name: "pd-2"
client_port: 12379
peer_port: 12380
deploy_dir: "/opt/tidb-c1/pd-12379"
data_dir: "/opt/tidb-c1/data/pd-12379"
log_dir: "/opt/tidb-c1/log/pd-12379"
numa_node: "0"
# # The following configs are used to overwrite the `server_configs.pd` values.
config:
schedule.max-merge-region-size: 20
schedule.max-merge-region-keys: 200000
tidb_servers:
- host: 192.168.12.21
ssh_port: 22
port: 14000
status_port: 12080
deploy_dir: "/opt/tidb-c1/tidb-14000"
log_dir: "/opt/tidb-c1/log/tidb-14000"
numa_node: "0"
# # The following configs are used to overwrite the `server_configs.tidb` values.
config:
log.slow-query-file: tidb-slow-overwrited.log
tikv-client.copr-cache.enable: true
tikv_servers:
- host: 192.168.12.21
ssh_port: 22
port: 12160
status_port: 12180
deploy_dir: "/opt/tidb-c1/tikv-12160"
data_dir: "/opt/tidb-c1/data/tikv-12160"
log_dir: "/opt/tidb-c1/log/tikv-12160"
numa_node: "0"
# # The following configs are used to overwrite the `server_configs.tikv` values.
config:
server.grpc-concurrency: 4
#server.labels: { zone: "zone1", dc: "dc1", host: "host1" }
#monitoring_servers:
# - host: 192.168.12.21
# ssh_port: 22
# port: 19090
# deploy_dir: "/opt/tidb-c1/prometheus-19090"
# data_dir: "/opt/tidb-c1/data/prometheus-19090"
# log_dir: "/opt/tidb-c1/log/prometheus-19090"
#grafana_servers:
# - host: 192.168.12.21
# port: 13000
# deploy_dir: "/opt/tidb-c1/grafana-13000"
#alertmanager_servers:
# - host: 192.168.12.21
# ssh_port: 22
# web_port: 19093
# cluster_port: 19094
# deploy_dir: "/opt/tidb-c1/alertmanager-19093"
# data_dir: "/opt/tidb-c1/data/alertmanager-19093"
# log_dir: "/opt/tidb-c1/log/alertmanager-19093"
TiDB Cluster 环境
本文重点并非部署 TiDB Cluster,作为快速实验环境,只在一台机器上部署单副本的 TiDB Cluster 集群。不需要部署监控环境。
[root@r20 topology]# tiup cluster display tidb-c1-v409
Starting component `cluster`: /root/.tiup/components/cluster/v1.3.2/tiup-cluster display tidb-c1-v409
Cluster type: tidb
Cluster name: tidb-c1-v409
Cluster version: v4.0.9
SSH type: builtin
Dashboard URL: http://192.168.12.21:12379/dashboard
ID Role Host Ports OS/Arch Status Data Dir Deploy Dir
-- ---- ---- ----- ------- ------ -------- ----------
192.168.12.21:12379 pd 192.168.12.21 12379/12380 linux/x86_64 Up|L|UI /opt/tidb-c1/data/pd-12379 /opt/tidb-c1/pd-12379
192.168.12.21:14000 tidb 192.168.12.21 14000/12080 linux/x86_64 Up - /opt/tidb-c1/tidb-14000
192.168.12.21:12160 tikv 192.168.12.21 12160/12180 linux/x86_64 Up /opt/tidb-c1/data/tikv-12160 /opt/tidb-c1/tikv-12160
Total nodes: 4
创建用于测试的表
mysql> show create table t1;
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| Table | Create Table |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
| t1 | CREATE TABLE `t1` (
`id` int(11) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin |
+-------+-------------------------------------------------------------------------------------------------------------------------------+
1 row in set (0.00 sec)
部署 Zookeeper 环境
在本实验中单独配置 Zookeeper 环境,为 Kafka 和 Flink 环境提供服务。
作为实验演示方案,只部署单机环境。
解压 Zookeeper 包
[root@r24 soft]# tar vxzf apache-zookeeper-3.6.2-bin.tar.gz
[root@r24 soft]# mv apache-zookeeper-3.6.2-bin /opt/zookeeper
部署用于 Zookeeper 的 jre
[root@r24 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r24 soft]# mv jre1.8.0_281 /opt/zookeeper/jre
修改 /opt/zookeeper/bin/zkEnv.sh 文件,增加 JAVA_HOME 环境变量
## add bellowing env var in the head of zkEnv.sh
JAVA_HOME=/opt/zookeeper/jre
创建 Zookeeper 的配置文件
[root@r24 conf]# cat zoo.cfg | grep -v "#"
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper/data
clientPort=2181
启动 Zookeeper
[root@r24 bin]# /opt/zookeeper/bin/zkServer.sh start
检查 Zookeeper 的状态
## check zk status
[root@r24 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: standalone
## check OS port status
[root@r24 bin]# netstat -ntlp
Active Internet connections (only servers)
Proto Recv-Q Send-Q Local Address Foreign Address State PID/Program name
tcp 0 0 0.0.0.0:22 0.0.0.0:* LISTEN 942/sshd
tcp6 0 0 :::2181 :::* LISTEN 15062/java
tcp6 0 0 :::8080 :::* LISTEN 15062/java
tcp6 0 0 :::22 :::* LISTEN 942/sshd
tcp6 0 0 :::44505 :::* LISTEN 15062/java
## use zkCli tool to check zk connection
[root@r24 bin]# ./zkCli.sh -server 192.168.12.24:2181
关于 Zookeeper 的建议
我个人有一个关于 Zookeeper 的不成熟的小建议:
Zookeeper 集群版本一定要开启网络监控。
特别是要关注 system metrics 里面的 network bandwidth。
部署 Kafka
Kafka 是一个分布式流处理平台,主要应用于两大类的应用中:
-
构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。(相当于message queue) -
构建实时流式应用程序,对这些流数据进行转换或者影响。(就是流处理,通过kafka stream topic和topic之间内部进行变化)
Kafka 有四个核心的 API:
-
The Producer API 允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。 -
The Consumer API 允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。 -
The Streams API 允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。 -
The Connector API 允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。
在本实验中只做功能性验证,只搭建一个单机版的 Kafka 环境。
下载并解压 Kafka
[root@r22 soft]# tar vxzf kafka_2.13-2.7.0.tgz
[root@r22 soft]# mv kafka_2.13-2.7.0 /opt/kafka
部署用于 Kafka 的 jre
[root@r22 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r22 soft]# mv jre1.8.0_281 /opt/kafka/jre
修改 Kafka 的 jre 环境变量
[root@r22 bin]# vim /opt/kafka/bin/kafka-run-class.sh
## add bellowing line in the head of kafka-run-class.sh
JAVA_HOME=/opt/kafka/jre
修改 Kafka 配置文件
修改 Kafka 配置文件 /opt/kafka/config/server.properties
## change bellowing variable in /opt/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://192.168.12.22:9092
log.dirs=/opt/kafka/logs
zookeeper.connect=i192.168.12.24:2181
启动 Kafka
[root@r22 bin]# /opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
查看 Kafka 的版本信息
Kafka 并没有提供 --version 的 optional 来查看 Kafka 的版本信息。
[root@r22 ~]# ll /opt/kafka/libs/ | grep kafka
-rw-r--r-- 1 root root 4929521 Dec 16 09:02 kafka_2.13-2.7.0.jar
-rw-r--r-- 1 root root 821 Dec 16 09:03 kafka_2.13-2.7.0.jar.asc
-rw-r--r-- 1 root root 41793 Dec 16 09:02 kafka_2.13-2.7.0-javadoc.jar
-rw-r--r-- 1 root root 821 Dec 16 09:03 kafka_2.13-2.7.0-javadoc.jar.asc
-rw-r--r-- 1 root root 892036 Dec 16 09:02 kafka_2.13-2.7.0-sources.jar
-rw-r--r-- 1 root root 821 Dec 16 09:03 kafka_2.13-2.7.0-sources.jar.asc
... ...
其中 2.13 是 scale 的版本信息,2.7.0 是 Kafka 的版本信息。
部署 Flink
Apache Flink 是一个框架和分布式处理引擎,用于在无边界和有边界数据流上进行有状态的计算。Flink 能在所有常见集群环境中运行,并能以内存速度和任意规模进行计算。
支持高吞吐、低延迟、高性能的分布式处理框架Apache Flink是一个框架和分布式处理引擎,用于对无界和有界数据流进行有状态计算。Flink被设计在所有常见的集群环境中运行,以内存执行速度和任意规模来执行计算。
本实验只做功能性测试,仅部署单机 Flink 环境。
下载并分发 Flink
[root@r23 soft]# tar vxzf flink-1.12.1-bin-scala_2.11.tgz
[root@r23 soft]# mv flink-1.12.1 /opt/flink
部署 Flink 的 jre
[root@r23 soft]# tar vxzf jre1.8.0_281.tar.gz
[root@r23 soft]# mv jre1.8.0_281 /opt/flink/jre
添加 Flink 需要的 lib
Flink 消费 Kafka 数据,需要 flink-sql-connector-kafka 包
Flink 链接 MySQL/TiDB,需要 flink-connector-jdbc 包
[root@r23 soft]# mv flink-sql-connector-kafka_2.12-1.12.0.jar /opt/flink/lib/
[root@r23 soft]# mv flink-connector-jdbc_2.12-1.12.0.jar /opt/flink/lib/
修改 Flink 配置文件
## add or modify bellowing lines in /opt/flink/conf/flink-conf.yaml
jobmanager.rpc.address: 192.168.12.23
env.java.home: /opt/flink/jre
启动 Flink
[root@r23 ~]# /opt/flink/bin/start-cluster.sh
Starting cluster.
Starting standalonesession daemon on host r23.
Starting taskexecutor daemon on host r23.
查看 Flink GUI
配置数据流向
ticdc -> Kafka 通路
TiCDC 运行时是一种无状态节点,通过 PD 内部的 etcd 实现高可用。TiCDC 集群支持创建多个同步任务,向多个不同的下游进行数据同步。
TiCDC 的系统架构如下图所示:
TiCDC 的系统角色:
-
TiKV CDC 组件:只输出 key-value (KV) change log。
-
内部逻辑拼装 KV change log。 -
提供输出 KV change log 的接口,发送数据包括实时 change log 和增量扫的 change log。 -
Capture:TiCDC 运行进程,多个 capture 组成一个 TiCDC 集群,负责 KV change log 的同步。 -
每个 capture 负责拉取一部分 KV change log。
-
对拉取的一个或多个 KV change log 进行排序。 -
向下游还原事务或按照 TiCDC Open Protocol 进行输出。
创建一个 Kafka Topic
创建 Kafka Topic ticdc-test
[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --create \
> --zookeeper 192.168.12.24:2181 \
> --config max.message.bytes=12800000 \
> --config flush.messages=1 \
> --replication-factor 1 \
> --partitions 1 \
> --topic ticdc-test
Created topic ticdc-test.
查看 Kafka 中所有的 Topic
[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.12.24:2181
ticdc-test
查看 Kafka 中 Topic ticdc-test 的信息
[root@r22 ~]# /opt/kafka/bin/kafka-topics.sh --describe --zookeeper 192.168.12.24:2181 --topic ticdc-test
Topic: ticdc-test PartitionCount: 1 ReplicationFactor: 1 Configs: max.message.bytes=12800000,flush.messages=1
Topic: ticdc-test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
在 TiCDC 中创建 Kafka 的changefeed
创建 changefeed 配置文件,打开 enable-old-value:
## create a changefeed configuration file
[root@r21 ~]# cat /opt/tidb-c1/cdc-18300/conf/cdc-changefeed-old-value-enabled.conf
enable-old-value=true
创建 Kafka 的 changefeed:
## create a changefeed for kafka
[root@r21 ~]# /opt/tidb-c1/cdc-18300/bin/cdc cli changefeed create \
> --pd=http://192.168.12.21:12379 \
> --sink-uri="kafka://192.168.12.22:9092/ticdc-test?kafka-version=2.7.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&enable-old-value=true&protocol=canal-json" \
> --changefeed-id="ticdc-kafka" \
> --config=/opt/tidb-c1/cdc-18300/conf/cdc-changefeed-old-value-enabled.conf
Create changefeed successfully!
ID: ticdc-kafka
Info: {"sink-uri":"kafka://192.168.12.22:9092/ticdc-test?kafka-version=2.7.0\u0026artition-num=1\u0026max-message-bytes=67108864\u0026replication-factor=1\u0026enable-old-value=true\u0026protocol=canal-json","opts":{"max-message-bytes":"67108864"},"create-time":"2021-02-22T00:08:50.185073755-05:00","start-ts":423092690661933057,"target-ts":0,"admin-job-type":0,"sort-engine":"memory","sort-dir":".","config":{"case-sensitive":true,"enable-old-value":true,"force-replicate":false,"check-gc-safe-point":true,"filter":{"rules":["*.*"],"ignore-txn-start-ts":null,"ddl-allow-list":null},"mounter":{"worker-num":16},"sink":{"dispatchers":null,"protocol":"canal-json"},"cyclic-replication":{"enable":false,"replica-id":0,"filter-replica-ids":null,"id-buckets":0,"sync-ddl":false},"scheduler":{"type":"table-number","polling-time":-1}},"state":"normal","history":null,"error":null,"sync-point-enabled":false,"sync-point-interval":600000000000}
[root@r21 ~]# cat /opt/tidb-c1/cdc-18300/conf/cdc-changefeed-old-value-enabled.conf
其中 Kafka 的 sink url 参数配置如下:
查看已经创建的 changefeed:
[root@r21 ~]# /opt/tidb-c1/cdc-18300/bin/cdc cli changefeed --pd=http://192.168.12.21:12379 list
[
{
"id": "ticdc-kafka",
"summary": {
"state": "normal",
"tso": 423092789699936258,
"checkpoint": "2021-02-22 00:15:07.974",
"error": null
}
}
]
查看 ticdc-kafka changefeed 的信息:
[root@r21 ~]# /opt/tidb-c1/cdc-18300/bin/cdc cli changefeed --pd=http://192.168.12.21:12379 query -c ticdc-kafka
{
"info": {
"sink-uri": "kafka://192.168.12.22:9092/ticdc-test?kafka-version=2.7.0\u0026artition-num=1\u0026max-message-bytes=67108864\u0026replication-factor=1\u0026enable-old-value=true\u0026protocol=canal -json",
"opts": {
"max-message-bytes": "67108864"
},
"create-time": "2021-02-22T00:08:50.185073755-05:00",
"start-ts": 423092690661933057,
"target-ts": 0,
"admin-job-type": 0,
"sort-engine": "memory",
"sort-dir": ".",
"config": {
"case-sensitive": true,
"enable-old-value": true,
"force-replicate": false,
"check-gc-safe-point": true,
"filter": {
"rules": [
"*.*"
],
"ignore-txn-start-ts": null,
"ddl-allow-list": null
},
"mounter": {
"worker-num": 16
},
"sink": {
"dispatchers": null,
"protocol": "canal-json"
},
"cyclic-replication": {
"enable": false,
"replica-id": 0,
"filter-replica-ids": null,
"id-buckets": 0,
"sync-ddl": false
},
"scheduler": {
"type": "table-number",
"polling-time": -1
}
},
"state": "normal",
"history": null,
"error": null,
"sync-point-enabled": false,
"sync-point-interval": 600000000000
},
"status": {
"resolved-ts": 423093295690285057,
"checkpoint-ts": 423093295428403201,
"admin-job-type": 0
},
"count": 0,
"task-status": []
}
查看 Kafka 中 consumer 信息
在 TiCDC 中创建 Kafka 的 changefeed,将数据流向 Kafka 中的 ticdc-test topic 后,TiCDC -> Kafka 的通道就已经建立了。
插入一条数据用以测试:
mysql> insert into t1 values(1);
Query OK, 1 row affected (0.00 sec)
可以看到 TiCDC 的日志输出中有以下信息:
[2021/02/22 01:14:02.816 -05:00] [INFO] [statistics.go:118] ["sink replication status"] [name=MQ] [changefeed=ticdc-kafka] [captureaddr=192.168.12.21:18300] [count=1] [qps=0]
此时查看 Kafka 的 consumer 信息,可以看到数据已经过来了:
[root@r22 bin]# /opt/kafka/bin/kafka-console-consumer.sh --bootstrap-server 192.168.12.22:9092 --topic ticdc-test --from-beginning
{"id":0,"database":"test","table":"t1","pkNames":["id"],"isDdl":false,"type":"INSERT","es":1613974420325,"ts":0,"sql":"","sqlType":{"id":-5},"mysqlType":{"id":"int"},"data":[{"id":"1"}],"old":[null]}
Kafka -> Flink 通路
在 Flink 的 sql-client 中,创建 t1 表,connector 使用 kafka 类型:
[root@r23 ~]# /opt/flink/bin/sql-client.sh embedded
## create a test table t1 in
Flink SQL> create table t1(id int)
> WITH (
> 'connector' = 'kafka',
> 'topic' = 'ticdc-test',
> 'properties.bootstrap.servers' = '192.168.12.22:9092',
> 'properties.group.id' = 'cdc-test-consumer-group',
> 'format' = 'canal-json',
> 'scan.startup.mode' = 'latest-offset'
> );
Flink SQL> select * from t1;
在 TiDB 中插入数据,从 Flink 中进行查询:
## insert a test row in TiDB
mysql> insert into test.t1 values(4);
Query OK, 1 row affected (0.00 sec)
## check the result from Flink
SQL Query Result (Table)
Refresh: 1 s Page: Last of 1 Updated: 03:02:28.838
id
4
end
系列文章
点个赞+在看,少个 bug 👇