深度集成 Flink: Apache Iceberg 0.11.0 最新功能解读
实现了 Flink Streaming Reader,意味着我们可以通过 Flink 流作业增量地去拉取 Apache Iceberg 中新增数据。对 Apache Iceberg 这样流批统一的存储层来说,Apache Flink 是真正意义上第一个实现了流批读写 Iceberg 的计算引擎,这也标志着 Apache Flink 和 Apache Iceberg 在共同打造流批统一的数据湖架构上开启了新的篇章。
实现了 Flink Streaming/Batch Reader 的 limit pushdown 和 filter pushdown。
实现了 CDC 和 Upsert 事件通过 flink 计算引擎写入 Apache Iceberg,并在中等数据规模上完成了正确性验证。
在 Flink Iceberg Sink 中支持 write.distribution-mode=hash 的方式写入数据,这可以从生产源头上大量减少小文件。
MERGE INTO
DELETE FROM
ALTER TABLE ... ADD/DROP PARTITION
ALTER TABLE ... WRITE ORDERED BY
通过 Call <procedure> 方式来执行更多的数据管理操作,例如合并小文件、清理过期文件等。
引入 AWS module,完成和 AWS S3[2] 以及 Glue Catalog[3] 等云服务的集成;
集成流行的开源 catalog 服务 nessie[4]。
Apache Flink流式读取
流作业写入 Apache Iceberg 表;
批作业写入 Apache Iceberg 表;
批作业读取 Apache Iceberg 表;
-- Submit the flink job in streaming mode for current session.
SET execution.type = streaming ;
-- Enable this switch because streaming read SQL will provide few job options in flink SQL hint options.
SET table.dynamic-table-options.enabled=true;
-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
Flink Source 的
Limit Pushdown 和 Filter Pushdown
SELECT * FROM sample LIMIT 10;
SELECT * FROM sample WHERE data = 'a';
SELECT * FROM sample WHERE data != 'a';
SELECT * FROM sample WHERE data >= 'a';
SELECT * FROM sample WHERE data <= 'a';
SELECT * FROM sample WHERE data < 'a';
SELECT * FROM sample WHERE data > 'a';
SELECT * FROM sample WHERE data = 'a' AND id = 1;
SELECT * FROM sample WHERE data = 'a' OR id = 1;
SELECT * FROM sample WHERE data IS NULL;
SELECT * FROM sample WHERE NOT (id = 1);
SELECT * FROM sample WHERE data LIKE 'aaa%';
对 CDC(例如 MySQL Binlog)
和 Upsert 事件的支持
-
用户希望把来自关系型数据库的 binlog 导入到 Apache Iceberg 数据湖中,提供近实时的数据分析能力。 -
希望把 Flink 流作业 AGG 产生的 upsert stream 导入到 Apache Iceberg 数据湖中,从而借助 Apache Iceberg 的存储能力和 Apache Flink 的分析能力,提供近实时的数据报表。
第一阶段,是指 Flink 可以顺利地把 CDC 和 Upsert 的数据成功写入到 Apache Iceberg,并能读取到一个正确的结果;
第二阶段,是指 Flink+Iceberg 能顺利通过较大数据量的稳定性测试和性能测试,保证整条链路的稳定性和性能,从而达到可以上生产的水准。
支持 write.distribution-mode=hash
方式写入 Apache Iceberg
CREATE TABLE sample (
id BIGINT,
data STRING
) PARTITIONED BY (data) WITH (
'write.distribution-mode'='hash'
);
在 Flink 1.11 版本暂时不支持通过 SQL 的方式创建 bucket,但我们可以通过 Java API 的方式将上述按照 data 字段 partition 之后的表添加 bucket。调用方式如下:
table.updateSpec()
.addField(Expressions.bucket("id", 32))
.commit();
总结
腾讯内部每天都有大量的日志数据通过 Flink 清洗处理后导入到 Iceberg,最大的表日新增几十 TB;
Netflix 则将公司内几乎所有的用户行为数据通过 Flink 流计算导入到 Iceberg,最终存储在 AWS S3 之上,相比 HDFS 的方式, Flink+Iceberg 帮助他们公司节省大量的存储成本;
同程艺龙也在 Flink+Iceberg 之上做了大量探索,之前几乎所有的分析数据都存储在 Hive 上,鉴于 Hive 在 ACID 和历史回溯等方面能力不足,他们调研了 Iceberg,发现 Iceberg 非常适合替换他们的 Hive 存储格式,又由于上层计算生态的良好对接,几乎所有的历史计算作业都不需要做改动,就能方便地切换 Hive 表到 Iceberg 之上。到目前为止同程艺龙已经完成了几十张 Hive 表到 Iceberg 表的迁移;
汽车之家也是成功在生产环境大量替换 Hive 表为 Iceberg 表的公司之一,同时他们也是最早采用社区版 Iceberg 做 CDC 和 Upsert 数据分析 PoC 的公司,也非常期待未来 0.12.0 对 CDC 和 Upsert 场景的更多优化。
-
Netflix、腾讯和 Apple 几家公司的贡献者主力推动 Spark+Iceberg 的集成,腾讯、Netflix 和 Apple 在 Apache Spark 社区有着多位 Spark PMC 和 Spark Committer,在 Spark 社区和 Iceberg 社区的影响力有目共睹。我个人乐观地判断,Apache Iceberg 和 Spark 的集成体验,未来有望比肩 Databricks delta 的商业版体验,大家可以期待下。
-
阿里巴巴 Flink 团队、Netflix 以及国内外庞大的 Flink 用户群在不断地推动 Flink+Iceberg 的集成,不再赘述;
-
AWS Presto 团队以及 Trino 团队则在不断推动着 Presto 和 Iceberg 的集成,AWS Presto 团队已经明确将 Iceberg 选型为他们的数据湖 table format。同时,也可以非常明显地看到,AWS 团队在 Iceberg 和 S3 以及 Glue 生态打通方面做的大量工作,Apache Iceberg 已经成为 AWS 数据湖生态中相当重要的一环。
-
Cloudera 已经明确地选型 Apache Iceberg 来构建他们的商业版数据湖。使用过 Hadoop 的同学一定不会对这家公司陌生,没错,这家公司就是 Hadoop 商业发行版做的最为出色的公司之一。未来,他们将基于 Apache Iceberg 推出公有云服务,将给用户带来完善的 Flink、Spark、Hive、Impala 数据湖集成体验。这里重点说一下 Apache Impala,Cloudera 在交互式分析场景下非常倚重自家开源的 Apache Impala(事实上,在大数据基准测试下 Impala 的性能表现的确要比 Presto 更好),Apache Iceberg 对存储层较为完美的抽象和对多样化计算引擎的包容,是成功打动 Cloudera 选型 Apache Iceberg 最核心的理由之一。
▼ 关注「Flink 中文社区」,获取更多技术干货 ▼