基于Flink1.14 + Iceberg0.13构建实时数据湖实战
Hi,我是王知无,一个大数据领域的原创作者。
放心关注我,获取更多行业的一手消息。
目录
-
Flink SQL Client配置Iceberg
-
Java/Scala pom.xml配置
-
Catalog
3.1 Hive Catalog
3.2 HDFS Catalog -
数据库和表相关DDL命令
4.1 创建数据库
4.2 创建表(不支持primary key等)
4.3 修改表
4.4 删除表 -
插入数据到表
5.1 insert into
5.2 insert overwrite(只有Batch模式支持,且overwrite粒度为partition) -
查询数据
暂时还不支持通过Flink SQL读取Iceberg表的元数据,可以通过Java API读取
1. Flink SQL Client配置Iceberg
Flink集群需要使用Scala 2.12版本的
-
将Iceberg的依赖包下载放到Flink集群所有服务器的lib目录下,然后重启Flink
[root@flink1 ~]# wget -P /root/flink-1.14.3/lib https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.0/iceberg-flink-runtime-1.14-0.13.0.jar
[root@flink1 ~]#
[root@flink1 ~]# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink2:/root/flink-1.14.3/lib
iceberg-flink-runtime-1.14-0.13.0.jar 100% 23MB 42.9MB/s 00:00
[root@flink1 ~]# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink3:/root/flink-1.14.3/lib
iceberg-flink-runtime-1.14-0.13.0.jar 100% 23MB 35.4MB/s 00:00
[root@flink1 ~]#
Iceberg默认支持Hadoop Catalog。如果需要使用Hive Catalog,需要将flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar放到Flink集群所有服务器的lib目录下,然后重启Flink
-
然后启动SQL Client就可以了
2. Java/Scala pom.xml配置
添加如下依赖
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink</artifactId>
<version>0.13.0</version>
<scope>provided</scope>
</dependency>
3. Catalog
3.1 Hive Catalog
注意:测试的时候,从Hive中查询表数据,查询不到。但是从Trino查询可以查询到数据
使用Hive的metastore保存元数据,HDFS保存数据库表的数据
Flink SQL> create catalog hive_catalog with(
> 'type'='iceberg',
> 'catalog-type'='hive',
> 'property-version'='1',
> 'cache-enabled'='true',
> 'uri'='thrift://hive1:9083',
> 'client'='5',
> 'warehouse'='hdfs://nnha/user/hive/warehouse',
> 'hive-conf-dir'='/root/flink-1.14.3/hive_conf'
> );
[INFO] Execute statement succeed.
Flink SQL>
-
property-version: 为了向后兼容,以防property格式改变。当前设置为1即可
-
cache-enabled: 是否开启catalog缓存,默认开启
-
clients: 在hive metastore中,hive_catalog供客户端访问的连接池大小,默认是2
-
warehouse: 是Flink集群所在的HDFS路径, hive_catalog下的数据库表存放数据的位置
-
hive-conf-dir: hive集群的配置目录。只能是Flink集群的本地路径,从hive-site.xml解析出来的HDFS路径,是Flink集群所在HDFS路径
-
warehouse的优先级比hive-conf-dir的优先级高
3.2 HDFS Catalog
用HDFS保存元数据和数据库表的数据。warehouse是Flink集群所在的HDFS路径
Flink SQL> create catalog hadoop_catalog with (
> 'type'='iceberg',
> 'catalog-type'='hadoop',
> 'property-version'='1',
> 'cache-enabled'='true',
> 'warehouse'='hdfs://nnha/user/iceberg/warehouse'
> );
[INFO] Execute statement succeed.
Flink SQL>
通过配置conf/sql-cli-defaults.yaml实现永久catalog。但测试的时候并未生效
[root@flink1 ~]# cat /root/flink-1.14.3/conf/sql-cli-defaults.yaml
catalogs:
- name: hadoop_catalog
type: iceberg
catalog-type: hadoop
property-version: 1
cache-enabled: true
warehouse: hdfs://nnha/user/iceberg/warehouse
[root@flink1 ~]#
[root@flink1 ~]# chown 501:games /root/flink-1.14.3/conf/sql-cli-defaults.yaml
下面我们重点以Hadoop Catalog为例,进行测试讲解
4. 数据库和表相关DDL命令
4.1 创建数据库
Catalog下面默认都有一个default数据库
Flink SQL> create database hadoop_catalog.iceberg_db;
[INFO] Execute statement succeed.
Flink SQL> use hadoop_catalog.iceberg_db;
[INFO] Execute statement succeed.
Flink SQL>
-
会在HDFS目录上创建iceberg_db子目录
-
如果删除数据库,会删除HDFS上的iceberg_db子目录
4.2 创建表(不支持primary key等)
Flink SQL> create table hadoop_catalog.iceberg_db.my_user (
> user_id bigint comment '用户ID',
> user_name string,
> birthday date,
> country string
> ) comment '用户表'
> partitioned by (birthday, country) with (
> 'write.format.default'='parquet',
> 'write.parquet.compression-codec'='gzip'
> );
[INFO] Execute statement succeed.
Flink SQL>
-
目前表不支持计算列、primay key, Watermark -
不支持计算分区。但是iceberg支持计算分区 -
创建表生成的文件信息如下:
[root@flink1 ~]#
[root@flink1 ~]# hadoop fs -ls hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata
Found 2 items
-rw-r--r-- 1 root supergroup 2115 2022-02-13 22:01 hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v1.metadata.json
-rw-r--r-- 1 root supergroup 1 2022-02-13 22:01 hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/version-hint.text
[root@flink1 ~]#
查看v1.metadata.json,可以看到"current-snapshot-id" : -1
Flink SQL> create table hadoop_catalog.iceberg_db.my_user_copy
> like hadoop_catalog.iceberg_db.my_user;
[INFO] Execute statement succeed.
Flink SQL>
-
复制的表拥有相同的表结构、分区、表属性
4.3 修改表
修改表属性
Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy
> set(
> 'write.format.default'='avro',
> 'write.avro.compression-codec'='gzip'
> );
[INFO] Execute statement succeed.
Flink SQL>
-
目前Flink只支持修改iceberg的表属性
重命名表
Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy
> rename to hadoop_catalog.iceberg_db.my_user_copy_new;
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Cannot rename Hadoop tables
Flink SQL>
-
Hadoop Catalog中的表不支持重命名表
4.4 删除表
Flink SQL> drop table hadoop_catalog.iceberg_db.my_user_copy;
[INFO] Execute statement succeed.
Flink SQL>
-
会删除HDFS上的my_user_copy子目录
5. 插入数据到表
5.1 insert into
-
insert into … values … -
insert into … select …
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
> user_id, user_name, birthday, country
> ) values(1, 'zhang_san', date '2022-02-01', 'china'),
> (2, 'li_si', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: f1aa8bee0be5bda8b166cc361e113268
Flink SQL>
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user select (user_id + 1), user_name, birthday, country from hadoop_catalog.iceberg_db.my_user;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c408e324ca3861b39176c6bd15770aca
Flink SQL>
HDFS目录结果如下
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00001.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00001.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00002.parquet
5.2 insert overwrite(只有Batch模式支持,且overwrite粒度为partition)
只支持Flink Batch模式,不支持Streaming模式
insert overwrite替换多个整个分区,而不是一行数据。如果不是分区表,则替换的是整个表,如下所示:
Flink SQL> set 'execution.runtime-mode' = 'batch';
[INFO] Session property has been set.
Flink SQL>
Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user values (4, 'wang_wu', date '2022-02-02', 'japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 63cf6c27060ec9ebdce75b785cc3fa3a
Flink SQL> set 'sql-client.execution.result-mode' = 'tableau';
[INFO] Session property has been set.
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name | birthday | country |
+---------+-----------+------------+---------+
| 1 | zhang_san | 2022-02-01 | china |
| 4 | wang_wu | 2022-02-02 | japan |
| 2 | zhang_san | 2022-02-01 | china |
+---------+-----------+------------+---------+
3 rows in set
birthday=2022-02-02/country=japan分区下的数据如下,insert overwrite也是新增一个文件
birthday=2022-02-02/country=japan/00000-0-1d0ff907-60a7-4062-93a3-9b443626e383-00001.parquet
birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00002.parquet
insert ovewrite … partition替换指定分区
Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user partition (birthday = '2022-02-02', country = 'japan') select 5, 'zhao_liu';
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 97e9ba4131028c53461e739b34108ae0
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name | birthday | country |
+---------+-----------+------------+---------+
| 1 | zhang_san | 2022-02-01 | china |
| 5 | zhao_liu | 2022-02-02 | japan |
| 2 | zhang_san | 2022-02-01 | china |
+---------+-----------+------------+---------+
3 rows in set
Flink SQL>
6. 查询数据
Batch模式
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name | birthday | country |
+---------+-----------+------------+---------+
| 1 | zhang_san | 2022-02-01 | china |
| 5 | zhao_liu | 2022-02-02 | japan |
| 2 | zhang_san | 2022-02-01 | china |
+---------+-----------+------------+---------+
3 rows in set
Flink SQL>
streaming模式
查看最新的snapshot-id
[root@flink1 conf]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/version-hint.text
5
我们前面创建表 + 两次insert + 两次insert overwrite,所以最新的版本号为5。然后我们查看该版本号对于的metadata json文件
[root@flink1 ~]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v5.metadata.json
{
"format-version" : 1,
"table-uuid" : "84a5e90d-7ae9-4dfd-aeab-c74f07447513",
"location" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user",
"last-updated-ms" : 1644761481488,
"last-column-id" : 4,
"schema" : {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "user_id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "user_name",
"required" : false,
"type" : "string"
}, {
"id" : 3,
"name" : "birthday",
"required" : false,
"type" : "date"
}, {
"id" : 4,
"name" : "country",
"required" : false,
"type" : "string"
} ]
},
"current-schema-id" : 0,
"schemas" : [ {
"type" : "struct",
"schema-id" : 0,
"fields" : [ {
"id" : 1,
"name" : "user_id",
"required" : false,
"type" : "long"
}, {
"id" : 2,
"name" : "user_name",
"required" : false,
"type" : "string"
}, {
"id" : 3,
"name" : "birthday",
"required" : false,
"type" : "date"
}, {
"id" : 4,
"name" : "country",
"required" : false,
"type" : "string"
} ]
} ],
"partition-spec" : [ {
"name" : "birthday",
"transform" : "identity",
"source-id" : 3,
"field-id" : 1000
}, {
"name" : "country",
"transform" : "identity",
"source-id" : 4,
"field-id" : 1001
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "birthday",
"transform" : "identity",
"source-id" : 3,
"field-id" : 1000
}, {
"name" : "country",
"transform" : "identity",
"source-id" : 4,
"field-id" : 1001
} ]
} ],
"last-partition-id" : 1001,
"default-sort-order-id" : 0,
"sort-orders" : [ {
"order-id" : 0,
"fields" : [ ]
} ],
"properties" : {
"write.format.default" : "parquet",
"write.parquet.compression-codec" : "gzip"
},
"current-snapshot-id" : 138573494821828246,
"snapshots" : [ {
"snapshot-id" : 8012517928892530314,
"timestamp-ms" : 1644761130111,
"summary" : {
"operation" : "append",
"flink.job-id" : "8f228ae49d34aafb4b2887db3149e3f6",
"flink.max-committed-checkpoint-id" : "9223372036854775807",
"added-data-files" : "2",
"added-records" : "2",
"added-files-size" : "2487",
"changed-partition-count" : "2",
"total-records" : "2",
"total-files-size" : "2487",
"total-data-files" : "2",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-8012517928892530314-1-5c33451b-48ab-4ce5-be7a-2c2d2dc9e11d.avro",
"schema-id" : 0
}, {
"snapshot-id" : 453371561664052237,
"parent-snapshot-id" : 8012517928892530314,
"timestamp-ms" : 1644761150082,
"summary" : {
"operation" : "append",
"flink.job-id" : "813b7a17c21ddd003e1a210b1366e0c5",
"flink.max-committed-checkpoint-id" : "9223372036854775807",
"added-data-files" : "2",
"added-records" : "2",
"added-files-size" : "2487",
"changed-partition-count" : "2",
"total-records" : "4",
"total-files-size" : "4974",
"total-data-files" : "4",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-453371561664052237-1-bc0e56ec-9f78-4956-8412-4d8ca70ccc19.avro",
"schema-id" : 0
}, {
"snapshot-id" : 6410282459040239217,
"parent-snapshot-id" : 453371561664052237,
"timestamp-ms" : 1644761403566,
"summary" : {
"operation" : "overwrite",
"replace-partitions" : "true",
"flink.job-id" : "f7085f68e5ff73c1c8aa1f4f59996068",
"flink.max-committed-checkpoint-id" : "9223372036854775807",
"added-data-files" : "1",
"deleted-data-files" : "2",
"added-records" : "1",
"deleted-records" : "2",
"added-files-size" : "1244",
"removed-files-size" : "2459",
"changed-partition-count" : "1",
"total-records" : "3",
"total-files-size" : "3759",
"total-data-files" : "3",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-6410282459040239217-1-2b20c57e-5428-4483-9f7b-928b980dd50d.avro",
"schema-id" : 0
}, {
"snapshot-id" : 138573494821828246,
"parent-snapshot-id" : 6410282459040239217,
"timestamp-ms" : 1644761481488,
"summary" : {
"operation" : "overwrite",
"replace-partitions" : "true",
"flink.job-id" : "d434d6d4f658d61732d7e9a0a85279fc",
"flink.max-committed-checkpoint-id" : "9223372036854775807",
"added-data-files" : "1",
"deleted-data-files" : "1",
"added-records" : "1",
"deleted-records" : "1",
"added-files-size" : "1251",
"removed-files-size" : "1244",
"changed-partition-count" : "1",
"total-records" : "3",
"total-files-size" : "3766",
"total-data-files" : "3",
"total-delete-files" : "0",
"total-position-deletes" : "0",
"total-equality-deletes" : "0"
},
"manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-138573494821828246-1-b243b39e-7122-4571-b6fa-c902241e36a8.avro",
"schema-id" : 0
} ],
"snapshot-log" : [ {
"timestamp-ms" : 1644761130111,
"snapshot-id" : 8012517928892530314
}, {
"timestamp-ms" : 1644761150082,
"snapshot-id" : 453371561664052237
}, {
"timestamp-ms" : 1644761403566,
"snapshot-id" : 6410282459040239217
}, {
"timestamp-ms" : 1644761481488,
"snapshot-id" : 138573494821828246
} ],
"metadata-log" : [ {
"timestamp-ms" : 1644760911017,
"metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v1.metadata.json"
}, {
"timestamp-ms" : 1644761130111,
"metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v2.metadata.json"
}, {
"timestamp-ms" : 1644761150082,
"metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v3.metadata.json"
}, {
"timestamp-ms" : 1644761403566,
"metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v4.metadata.json"
} ]
}[root@flink1 ~]#
可以看到 "current-snapshot-id" : 138573494821828246,,表示当前的snapshot-id
Flink SQL> set 'execution.runtime-mode' = 'streaming';
[INFO] Session property has been set.
Flink SQL>
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
> /*+ options(
> 'streaming'='true',
> 'monitor-interval'='5s'
> )*/ ;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op | user_id | user_name | birthday | country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I | 5 | zhao_liu | 2022-02-02 | japan |
| +I | 2 | zhang_san | 2022-02-01 | china |
| +I | 1 | zhang_san | 2022-02-01 | china |
可以看到最新snapshot对应的数据
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
> /*+ options(
> 'streaming'='true',
> 'monitor-interval'='5s',
> 'start-snapshot-id'='138573494821828246'
> )*/ ;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op | user_id | user_name | birthday | country |
+----+----------------------+--------------------------------+------------+--------------------------------+
这里只能指定最后一个insert overwrite操作的snapshot id,及其后面的snapshot id,否则后台会报异常,且程序一直处于restarting的状态:
java.lang.UnsupportedOperationException: Found overwrite operation, cannot support incremental data in snapshots (8012517928892530314, 138573494821828246]
在本示例中snapshot id: 138573494821828246,是最后一个snapshot id,同时也是最后一个insert overwrite操作的snapshot id。如果再insert两条数据,则只能看到增量的数据
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
> user_id, user_name, birthday, country
> ) values(6, 'zhang_san', date '2022-02-01', 'china');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8eb279e61aed66304d78ad027eaf8d30
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
> user_id, user_name, birthday, country
> ) values(7, 'zhang_san', date '2022-02-01', 'china');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 70a050e455d188d0d3f3adc2ba367fb6
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user
> /*+ options(
> 'streaming'='true',
> 'monitor-interval'='30s',
> 'start-snapshot-id'='138573494821828246'
> )*/ ;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op | user_id | user_name | birthday | country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I | 6 | zhang_san | 2022-02-01 | china |
| +I | 7 | zhang_san | 2022-02-01 | china |
-
streaming模式支持读取增量snapshot数据
-
如果不指定start-snapshot-id,则先读取当前snapshot全量数据,再读取增量数据。如果指定start-snapshot-id,读取该snapshot-id之后的增量数据,即不读取该snapshot-id的数据
-
monitor-interval:表示监控新提交的数据文件的时间间隔,默认1s
如果这个文章对你有帮助,不要忘记 「在看」 「点赞」 「收藏」 三连啊喂!