vlambda博客
学习文章列表

基于Flink CDC打通数据实时入湖

作者 | 数据社        责编 | 欧阳姝黎

在构建实时数仓的过程中,如何快速、正确的同步业务数据是最先面临的问题,本文主要讨论一下如何使用实时处理引擎 Flink 和数据湖 Apache Iceberg 两种技术,来解决业务数据实时入湖相关的问题。

基于Flink CDC打通数据实时入湖


基于Flink CDC打通数据实时入湖

Flink CDC介绍


CDC 全称是 Change Data Capture,捕获变更数据,是一个比较广泛的概念,只要是能够捕获所有数据的变化,比如数据库捕获完整的变更日志记录增、删、改等,都可以称为 CDC。该功能被广泛应用于数据同步、更新缓存、微服务间同步数据等场景,本文主要介绍基于 Flink CDC 在数据实时同步场景下的应用。

Flink 在 1.11 版本开始引入了 Flink CDC 功能,并且同时支持 Table & SQL 两种形式。Flink SQL CDC 是以 SQL 的形式编写实时任务,并对 CDC 数据进行实时解析同步。相比于传统的数据同步方案,该方案在实时性、易用性等方面有了极大的改善。下图是基于 Flink SQL CDC 的数据同步方案的示意图。

Oracle 的变更日志的采集有多种方案,这里采用的 Debezium 实时同步工具作为示例,该工具能够解析 Oracle 的 changlog 数据,并实时同步数据到下游 Kafka。Flink SQL 通过创建 Kafka 映射表并指定 format 格式为 debezium-json,然后通过 Flink 进行解析后直接插入到其他外部数据存储系统,例如图中外部数据源以 Apache Iceberg 为例。

基于Flink CDC打通数据实时入湖

下面详细解析一下数据同步过程。首先了解一下 Debezium 抽取的 Oracle 的 change log 的格式,以 update 为例,变更日志上记录了更新之前的数据和更新以后的数据,在 Kafka 下游的 Flink 接受到这样的数据以后,一条 update 操作记录就转变为了先 delete、后 insert 两条记录。日志格式如下所示,该 update 操作的内容的 name 字段从 tom 更新为了 jerry。

{
  "before": {          --更新之前的数据
    "id": 001,
    "name""tom"
  },
  "after": {           --更新之后的数据
    "id": 001,
    "name""jerry"
  },
  "source": {...},
  "op""u",
  "ts_ms": 1589362330904,
  "transaction": null
}

其次再来看一下 Flink SQL 内部是如何处理 update 记录的。Flink 在 1.11 版本支持了完整的 changelog 机制,对于每条数据本身只要是携带了相应增、删、改的标志,Flink 就能识别这些数据,并对结果表做出相应的增、删、改的动作,如下图所示 changlog 数据流经过 Flink 解析,同步到下游 Sink Database。

基于Flink CDC打通数据实时入湖

通过以上分析,基于Flink SQL CDC的数据同步有如下优点:

  • 业务解耦:无需入侵业务,和业务完全解耦,也就是业务端无感知数据同步的存在。

  • 性能消耗:业务数据库性能消耗小,数据同步延迟低。

  • 同步易用:使用SQL方式执行CDC同步任务,极大的降低使用维护门槛。

  • 数据完整:完整的数据库变更记录,不会丢失任何记录,Flink 自身支持 Exactly Once。


基于Flink CDC打通数据实时入湖

Apache Iceberg介绍

通常认为数据湖是一种支持存储多种原始数据格式、多种计算引擎、高效的元数据统一管理和海量统一数据存储。其中以 Apache Iceberg 为代表的表格式和 Flink 计算引擎组成的数据湖解决方案尤为亮眼。Flink 社区方面也主动拥抱数据湖技术,当前 Flink 和 Iceberg 在数据入湖方面的集成度最高。

那么 Apache Iceberg 是什么呢?引用官网的定义是:Apache Iceberg is an open table format for huge analytic datasets。也就是 Apache Iceberg 是一个大规模数据分析的开放表格式。

基于Flink CDC打通数据实时入湖

Iceberg 的表格式设计具有如下特点:

  • ACID: 不会读到不完整的 commit 数据,基于乐观锁实现,支持并发 commit,支持 Row-level delete,支持 upsert 操作。
  • 增量快照: Commit 后的数据即可见,在 Flink 实时入湖场景下,数据可见根据 checkpoint 的时间间隔来确定的,增量形式也可回溯历史快照。
  • 开放的表格式: 对于一个真正的开放表格式,支持多种数据存储格式,如:parquet、orc、avro等,支持多种计算引擎,如:Spark、Flink、Hive、Trino/Presto。
  • 流批接口支持: 支持流式写入、批量写入,支持流式读取、批量读取。下文的测试中,主要测试了流式写入和批量读取的功能。


基于Flink CDC打通数据实时入湖

Flink CDC打通数据实时导入Iceberg实践

当前使用 Flink 最新版本1.12,支持 CDC 功能和更好的流批一体。Apache Iceberg 最新版本 0.11 已经支持 Flink API 方式 upsert,如果使用编写框架代码的方式使用该功能,无异于镜花水月,可望而不可及。本着 SQL 就是生产力的初衷,该测试使用最新 Iceberg 的 master 分支代码编译尝鲜,并对源码稍做修改,达到支持使用 Flink SQL 方式 upsert。

先来了解一下什么是 Row-Level Delete?该功能是指根据一个条件从一个数据集里面删除指定行。那么为什么这个功能那么重要呢?众所周知,大数据中的行级删除不同于传统数据库的更新和删除功能,在基于 HDFS 架构的文件系统上数据存储只支持数据的追加,为了在该构架下支持更新删除功能,删除操作演变成了一种标记删除,更新操作则是转变为先标记删除、后插入一条新数据。具体实现方式可以分为Copy on Write(COW)模式和Merge on Read(MOR)模式,其中 Copy on Write 模式可以保证下游的数据读具有最大的性能,而 Merge on Read 模式保证上游数据插入、更新、和删除的性能,减少传统 Copy on Write 模式下写放大问题。

在 Apache Iceberg 中目前实现的是基于 Merge on Read 模式实现的 Row-Level Delete。在 Iceberg 中 MOR 相关的功能是在 Iceberg Table Spec Version 2: Row-level Deletes 中进行实现的,V1 是没有相关实现的。虽然当前 Apache Iceberg 0.11 版本不支持 Flink SQL 方式进行 Row-Level Delete,但为了方便测试,通过对源码的修改支持 Flink SQL 方式。在不远的未来,Apache Iceberg 0.12 版本将会对 Row-Level Delete 进行性能和稳定性的加强。

Flink SQL CDC 和 Apache Iceberg 的架构设计和整合如何巧妙,不能局限于纸上谈兵,下面就实际操作一下,体验其功能的强大和带来的便捷。并且顺便体验一番流批一体,下面的离线查询和实时 upsert 入湖等均使用 Flink SQL 完成。

1,数据入湖环境准备

以 Flink SQL CDC 方式将实时数据导入数据湖的环境准备非常简单直观,因为 Flink 支持流批一体功能,所以实时导入数据湖的数据,也可以使用 Flink SQL离线或实时进行查询。如下测试是使用Flink提供的 sql-client 完成的:

第一步,新建Kafka映射表,用于实时接收 Topic 中的 changlog 数据:

  id STRING,
name STRING
) WITH (
'connector' = 'kafka',
'topic' = 'topic_name',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'debezium-json'
);

第二步,新建 iceberg 结果表,用于存储实时入湖的数据:

CREATE TABLE iceberg_catalog.default.IcebergTable ( id STRING, name STRING );

注:

a)其中省略了配置 catalog 过程

b)当前 iceberg 0.11 默认创建表格式版本 V1,通过代码更改版本为 V2,以支持 upsert 方式导入数据湖

第三步,启动 upsert 方式实时入湖的 Flink 任务:

SET table.dynamic-table-options.enabled=true;

INSERT INTO IcebergTable /*+OPTIONS('equality-field-columns'='id')*/ SELECT * FROM KafkaTable;

注:当前iceberg 0.11不支持Flink SQL形式upsert,通过修改源码达到支持配置指定字段更新功能。

第四步,离线或者实时查询统计IcebergTable表中的数据行数:

a)离线方式

SET execution.type=batch;
SELECT COUNT(*) FROM IcebergTable;

b)实时方式

SET execution.type=streaming;
SELECT COUNT(*) FROM IcebergTable;

2,数据入湖速度测试

数据入湖速度测试会根据环境配置、参数配置、数据格式等不同有所不同,下面是列出主要配置和测试出的数据作为参考。

a)资源配置情况

TaskManager 内存4G,slot:1
Checkpoint 1分钟
测试数据列数 10列
测试数据行数 1000万
iceberg存储格式 parquet

b)测试数据情况

数据入湖分为 append 和 upsert 两种方式。append 方式只能新增数据,不能对结果数据进行更新操作;upsert 方式即能够对结果数据更新。

append 方式使用场景是导入数据之前已经明确该表数据不需要更新,如离线数据导入数据湖的场景,append 方式下导入数据速度如下:

INSERT INTO IcebergTable SELECT * FROM KafkaTable;

并行度1 12.2万/秒
并行度2 19.6万/秒
并行度4 28.3万/秒

update 方式使用场景是既有插入的数据又有对之前插入数据的更新的场景,如数据库实时同步,upsert 方式下导入数据速度,该方式需要指定在更新时以那个字段查找,类似于 update 语句中的 where 条件,一般设置为表的主键即可,如下:

INSERT INTO IcebergTable /*+OPTIONS('equality-field-columns'='id')*/ SELECT * FROM KafkaTable;

导入的数据 只有数据插入 只有数据更新
并行度1 3.2万/秒 2.9万/秒
并行度2 4.9万/秒 4.2万/秒
并行度4 6.1万/秒 5.1万/秒

c)结论

  • append 方式导入速度远大于 upsert 导入数据速度。在使用的时候,如没有更新数据的场景时,则不需要 upsert 方式导入数据。
  • 导入速度随着并行度的增加而增加。
  • upsert 方式数据的插入和更新速度相差不大,主要得益于 MOR 原因。

3,数据入湖任务运维

在实际使用过程中,默认配置下是不能够长期稳定的运行的,一个实时数据导入 iceberg 表的任务,需要通过至少下述四点进行维护,才能使Iceberg表的入湖和查询性能保持稳定。

a)压缩小文件
Flink从Kafka 消费的数据以 checkpoint 方式提交到 Iceberg 表,数据文件使用的是 parquet 格式,这种格式无法追加,而流式数据又不能等候太长时间,所以会不断 commit 提交数据产生小文件。目前 Iceberg 提供了一个批任务 action 来压缩小文件,需要定期周期性调用进行小文件的压缩功能。示例代码如下:
Table table = ... 
Actions.forTable(table)
.rewriteDataFiles()
    .targetSizeInBytes(100 * 1024 * 1024) // 100 MB
    .execute();
b)快照过期处理
iceberg 本身的架构设计决定了,对于实时入湖场景,会产生大量的 snapshot 文件,快照过期策略是通过额外的定时任务周期执行,过期 snapshot 文件和过期数据文件均会被删除。如果实际使用场景不需要 time travel 功能,则可以保留较少的 snapshot 文件。
Table table = ... 
Actions.forTable(table)
    .expireSnapshots()
.expireOlderThan(System.currentTimeMillis())
.retainLast(5)
    .execute();
c)清理orphan文件
orphan 文件的产生是由于正常或者异常的数据写入但是未提交导致的,长时间积累会产生大量脱离元数据的孤立数据文件,所以也需要类似 JVM 的垃圾回收一样,周期性清理这些文件。该功能不需要频繁运行,设置为3-5天运行一次即可。
Table table = ...
Actions.forTable(table)
    .removeOrphanFiles()
    .execute();
d)删除元数据文件
  • 每次提交 snapshot 均会自动产生一个新的 metadata 文件,实时数据入库会频繁的产生大量 metadata 文件,需要通过如下配置达到自动删除 metadata 文件的效果。
Property Description
write.metadata.delete-after-commit.enabled Whether to delete old metadata files after each table commit
write.metadata.previous-versions-max The number of old metadata files to keep

4,数据入湖问题讨论

这里主要讨论数据一致性和顺序性问题。

Q1: 程序 BUG 或者任务重启等导致数据传输中断,如何保证数据的一致性呢?

Answer:数据一致保证通过两个方面实现,借助 Flink 实现的 exactly once 语义和故障恢复能力,实现数据严格一致性。借助 Iceberg ACID 能力来隔离写入对分析任务的不利影响。

Q2:数据入湖否可保证全局顺序性插入和更新?

Answer:不可以全局保证数据生产和数据消费的顺序性,但是可以保证同一条数据的插入和更新的顺序性。首先数据抽取的时候是单线程的,然后分发到 Kafka 的各个 partition 中,此时同一个 key 的变更数据打入到同一个 Kafka 的分区里面,Flink 读取的时候也能保证顺序性消费每个分区中的数据,进而保证同一个key的插入和更新的顺序性。


基于Flink CDC打通数据实时入湖

未来规划


新的技术最终是要落地才能发挥其内在价值的,针对在实践应用中面临的纷繁复杂的数据,结合流计算技术 Flink、表格式 Iceberg,未来落地规划主要集中在两个方面,一是Iceberg集成到本行的实时计算平台中,解决易用性的问题;二是基于Iceberg,构建准实时数仓进行探索和落地。

1,整合Iceberg到实时计算平台

目前,我所负责的实时计算平台是一个基于 SQL 的高性能实时大数据处理平台,该平台彻底规避繁重的底层流计算处理逻辑、繁琐的提交过程等,为用户打造一个只需关注实时计算逻辑的平台,助力企业向实时化、智能化大数据转型。

基于Flink CDC打通数据实时入湖

实时计算平台未来将会整合 Apache Iceberg 数据源,用户可以在界面配置 Flink SQL 任务,该任务以 upsert 方式实时解析 changlog 并导入到数据湖中。并增加小文件监控、定时任务压缩小文件、清理过期数据等功能。

2,准实时数仓探索

本文对数据实时入湖从原理和实战做了比较多的阐述,在完成实时数据入湖SQL化的功能以后,入湖后的数据有哪些场景的使用呢?下一个目标当然是入湖的数据分析实时化。比较多的讨论是关于实时数据湖的探索,结合所在企业数据特点探索适合落地的实时数据分析场景成为当务之急。

基于Flink CDC打通数据实时入湖

随着数据量的持续增大,和业务对时效性的严苛要求,基于 Apache Flink 和Apache Iceberg 构建准实时数仓愈发重要和迫切,作为实时数仓的两大核心组件,可以缩短数据导入、方便数据行级变更、支持数据流式读取等。

基于Flink CDC打通数据实时入湖

基于Flink CDC打通数据实时入湖