vlambda博客
学习文章列表

基于Flink1.14 + Iceberg0.13构建实时数据湖实战

点击上方蓝色字体,选择“设为星标”
回复"面试"获取更多惊喜
基于Flink1.14 + Iceberg0.13构建实时数据湖实战

Hi,我是王知无,一个大数据领域的原创作者。
放心关注我,获取更多行业的一手消息。

目录

  1. Flink SQL Client配置Iceberg

  2. Java/Scala pom.xml配置

  3. Catalog
    3.1 Hive Catalog
    3.2 HDFS Catalog

  4. 数据库和表相关DDL命令
    4.1 创建数据库
    4.2 创建表(不支持primary key等)
    4.3 修改表
    4.4 删除表

  5. 插入数据到表
    5.1 insert into
    5.2 insert overwrite(只有Batch模式支持,且overwrite粒度为partition)

  6. 查询数据
    暂时还不支持通过Flink SQL读取Iceberg表的元数据,可以通过Java API读取

1. Flink SQL Client配置Iceberg

Flink集群需要使用Scala 2.12版本的

  1. 将Iceberg的依赖包下载放到Flink集群所有服务器的lib目录下,然后重启Flink
[root@flink1 ~]# wget -P /root/flink-1.14.3/lib https://repo.maven.apache.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/0.13.0/iceberg-flink-runtime-1.14-0.13.0.jar
[root@flink1 ~]#
[root@flink1 ~]# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink2:/root/flink-1.14.3/lib

iceberg-flink-runtime-1.14-0.13.0.jar                                                                                                    100%   23MB  42.9MB/s   00:00    
[root@flink1 ~]# scp /root/flink-1.14.3/lib/iceberg-flink-runtime-1.14-0.13.0.jar root@flink3:/root/flink-1.14.3/lib
iceberg-flink-runtime-1.14-0.13.0.jar                                                                                                    100%   23MB  35.4MB/s   00:00    
[root@flink1 ~]

Iceberg默认支持Hadoop Catalog。如果需要使用Hive Catalog,需要将flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar放到Flink集群所有服务器的lib目录下,然后重启Flink

  1. 然后启动SQL Client就可以了

2. Java/Scala pom.xml配置

添加如下依赖

<dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink</artifactId>
            <version>0.13.0</version>
            <scope>provided</scope>
        </dependency>

3. Catalog

3.1 Hive Catalog

注意:测试的时候,从Hive中查询表数据,查询不到。但是从Trino查询可以查询到数据

使用Hive的metastore保存元数据,HDFS保存数据库表的数据

Flink SQL> create catalog hive_catalog with(
'type'='iceberg',
'catalog-type'='hive',
'property-version'='1',
'cache-enabled'='true',
'uri'='thrift://hive1:9083',
'client'='5',
'warehouse'='hdfs://nnha/user/hive/warehouse',
'hive-conf-dir'='/root/flink-1.14.3/hive_conf'
> );
[INFO] Execute statement succeed.

Flink SQL>
  • property-version: 为了向后兼容,以防property格式改变。当前设置为1即可

  • cache-enabled: 是否开启catalog缓存,默认开启

  • clients: 在hive metastore中,hive_catalog供客户端访问的连接池大小,默认是2

  • warehouse: 是Flink集群所在的HDFS路径, hive_catalog下的数据库表存放数据的位置

  • hive-conf-dir: hive集群的配置目录。只能是Flink集群的本地路径,从hive-site.xml解析出来的HDFS路径,是Flink集群所在HDFS路径

  • warehouse的优先级比hive-conf-dir的优先级高

3.2 HDFS Catalog

用HDFS保存元数据和数据库表的数据。warehouse是Flink集群所在的HDFS路径

Flink SQL> create catalog hadoop_catalog with (
'type'='iceberg',
'catalog-type'='hadoop',
'property-version'='1',
'cache-enabled'='true',
'warehouse'='hdfs://nnha/user/iceberg/warehouse'
> );
[INFO] Execute statement succeed.

Flink SQL>

通过配置conf/sql-cli-defaults.yaml实现永久catalog。但测试的时候并未生效

[root@flink1 ~]# cat /root/flink-1.14.3/conf/sql-cli-defaults.yaml 
catalogs:
  - name: hadoop_catalog
    type: iceberg
    catalog-type: hadoop
    property-version: 1
    cache-enabled: true
    warehouse: hdfs://nnha/user/iceberg/warehouse

[root@flink1 ~]#
[root@flink1 ~]# chown 501:games /root/flink-1.14.3/conf/sql-cli-defaults.yaml

下面我们重点以Hadoop Catalog为例,进行测试讲解

4. 数据库和表相关DDL命令

4.1 创建数据库

Catalog下面默认都有一个default数据库

Flink SQL> create database hadoop_catalog.iceberg_db;
[INFO] Execute statement succeed.

Flink SQL> use hadoop_catalog.iceberg_db;
[INFO] Execute statement succeed.

Flink SQL>
  • 会在HDFS目录上创建iceberg_db子目录

  • 如果删除数据库,会删除HDFS上的iceberg_db子目录

4.2 创建表(不支持primary key等)

Flink SQL> create table hadoop_catalog.iceberg_db.my_user (
> user_id bigint comment '用户ID',
> user_name string,
> birthday date,
> country string
> ) comment '用户表' 
> partitioned by (birthday, country) with (
'write.format.default'='parquet',
'write.parquet.compression-codec'='gzip'
> );
[INFO] Execute statement succeed.

Flink SQL>
  • 目前表不支持计算列、primay key, Watermark
  • 不支持计算分区。但是iceberg支持计算分区
  • 创建表生成的文件信息如下:
[root@flink1 ~]
[root@flink1 ~]# hadoop fs -ls hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata
Found 2 items
-rw-r--r--   1 root supergroup       2115 2022-02-13 22:01 hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v1.metadata.json
-rw-r--r--   1 root supergroup          1 2022-02-13 22:01 hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/version-hint.text
[root@flink1 ~]#

查看v1.metadata.json,可以看到"current-snapshot-id" : -1

Flink SQL> create table hadoop_catalog.iceberg_db.my_user_copy 
> like hadoop_catalog.iceberg_db.my_user;
[INFO] Execute statement succeed.

Flink SQL> 
  • 复制的表拥有相同的表结构、分区、表属性

4.3 修改表

修改表属性

Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy 
set(
'write.format.default'='avro',
'write.avro.compression-codec'='gzip'
> );
[INFO] Execute statement succeed.

Flink SQL>
  • 目前Flink只支持修改iceberg的表属性

重命名表

Flink SQL> alter table hadoop_catalog.iceberg_db.my_user_copy 
> rename to hadoop_catalog.iceberg_db.my_user_copy_new;
[ERROR] Could not execute SQL statement. Reason:
java.lang.UnsupportedOperationException: Cannot rename Hadoop tables

Flink SQL>
  • Hadoop Catalog中的表不支持重命名表

4.4 删除表

Flink SQL> drop table hadoop_catalog.iceberg_db.my_user_copy;
[INFO] Execute statement succeed.

Flink SQL>
  • 会删除HDFS上的my_user_copy子目录

5. 插入数据到表

5.1 insert into

  1. insert into … values …
  2. insert into … select …
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
> user_id, user_name, birthday, country
> ) values(1, 'zhang_san', date '2022-02-01''china'), 
> (2, 'li_si', date '2022-02-02''japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: f1aa8bee0be5bda8b166cc361e113268


Flink SQL>
Flink SQL> insert into hadoop_catalog.iceberg_db.my_user select (user_id + 1), user_name, birthday, country from hadoop_catalog.iceberg_db.my_user;
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: c408e324ca3861b39176c6bd15770aca


Flink SQL>

HDFS目录结果如下

hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00001.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-01/country=china/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00001.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/data/birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00002.parquet

5.2 insert overwrite(只有Batch模式支持,且overwrite粒度为partition)

只支持Flink Batch模式,不支持Streaming模式

insert overwrite替换多个整个分区,而不是一行数据。如果不是分区表,则替换的是整个表,如下所示:

Flink SQL> set 'execution.runtime-mode' = 'batch';
[INFO] Session property has been set.

Flink SQL>
Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user values (4, 'wang_wu', date '2022-02-02''japan');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 63cf6c27060ec9ebdce75b785cc3fa3a

Flink SQL> set 'sql-client.execution.result-mode' = 'tableau';
[INFO] Session property has been set.

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name |   birthday | country |
+---------+-----------+------------+---------+
|       1 | zhang_san | 2022-02-01 |   china |
|       4 |   wang_wu | 2022-02-02 |   japan |
|       2 | zhang_san | 2022-02-01 |   china |
+---------+-----------+------------+---------+
3 rows in set

birthday=2022-02-02/country=japan分区下的数据如下,insert overwrite也是新增一个文件

birthday=2022-02-02/country=japan/00000-0-1d0ff907-60a7-4062-93a3-9b443626e383-00001.parquet
birthday=2022-02-02/country=japan/00000-0-4ef3835f-b18b-4c48-b47a-85af1771a10a-00002.parquet
birthday=2022-02-02/country=japan/00000-0-6e66c02b-cb09-4fd0-b669-15aa7f5194e4-00002.parquet

insert ovewrite … partition替换指定分区

Flink SQL> insert overwrite hadoop_catalog.iceberg_db.my_user partition (birthday = '2022-02-02', country = 'japan') select 5, 'zhao_liu';
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 97e9ba4131028c53461e739b34108ae0


Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name |   birthday | country |
+---------+-----------+------------+---------+
|       1 | zhang_san | 2022-02-01 |   china |
|       5 |  zhao_liu | 2022-02-02 |   japan |
|       2 | zhang_san | 2022-02-01 |   china |
+---------+-----------+------------+---------+
3 rows in set

Flink SQL>

6. 查询数据

Batch模式

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user;
+---------+-----------+------------+---------+
| user_id | user_name |   birthday | country |
+---------+-----------+------------+---------+
|       1 | zhang_san | 2022-02-01 |   china |
|       5 |  zhao_liu | 2022-02-02 |   japan |
|       2 | zhang_san | 2022-02-01 |   china |
+---------+-----------+------------+---------+
3 rows in set

Flink SQL>

streaming模式

查看最新的snapshot-id

[root@flink1 conf]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/version-hint.text
5

我们前面创建表 + 两次insert + 两次insert overwrite,所以最新的版本号为5。然后我们查看该版本号对于的metadata json文件

[root@flink1 ~]# hadoop fs -cat hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v5.metadata.json
{
  "format-version" : 1,
  "table-uuid" : "84a5e90d-7ae9-4dfd-aeab-c74f07447513",
  "location" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user",
  "last-updated-ms" : 1644761481488,
  "last-column-id" : 4,
  "schema" : {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "user_id",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "user_name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 3,
      "name" : "birthday",
      "required" : false,
      "type" : "date"
    }, {
      "id" : 4,
      "name" : "country",
      "required" : false,
      "type" : "string"
    } ]
  },
  "current-schema-id" : 0,
  "schemas" : [ {
    "type" : "struct",
    "schema-id" : 0,
    "fields" : [ {
      "id" : 1,
      "name" : "user_id",
      "required" : false,
      "type" : "long"
    }, {
      "id" : 2,
      "name" : "user_name",
      "required" : false,
      "type" : "string"
    }, {
      "id" : 3,
      "name" : "birthday",
      "required" : false,
      "type" : "date"
    }, {
      "id" : 4,
      "name" : "country",
      "required" : false,
      "type" : "string"
    } ]
  } ],
  "partition-spec" : [ {
    "name" : "birthday",
    "transform" : "identity",
    "source-id" : 3,
    "field-id" : 1000
  }, {
    "name" : "country",
    "transform" : "identity",
    "source-id" : 4,
    "field-id" : 1001
  } ],
  "default-spec-id" : 0,
  "partition-specs" : [ {
    "spec-id" : 0,
    "fields" : [ {
      "name" : "birthday",
      "transform" : "identity",
      "source-id" : 3,
      "field-id" : 1000
    }, {
      "name" : "country",
      "transform" : "identity",
      "source-id" : 4,
      "field-id" : 1001
    } ]
  } ],
  "last-partition-id" : 1001,
  "default-sort-order-id" : 0,
  "sort-orders" : [ {
    "order-id" : 0,
    "fields" : [ ]
  } ],
  "properties" : {
    "write.format.default" : "parquet",
    "write.parquet.compression-codec" : "gzip"
  },
  "current-snapshot-id" : 138573494821828246,
  "snapshots" : [ {
    "snapshot-id" : 8012517928892530314,
    "timestamp-ms" : 1644761130111,
    "summary" : {
      "operation" : "append",
      "flink.job-id" : "8f228ae49d34aafb4b2887db3149e3f6",
      "flink.max-committed-checkpoint-id" : "9223372036854775807",
      "added-data-files" : "2",
      "added-records" : "2",
      "added-files-size" : "2487",
      "changed-partition-count" : "2",
      "total-records" : "2",
      "total-files-size" : "2487",
      "total-data-files" : "2",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-8012517928892530314-1-5c33451b-48ab-4ce5-be7a-2c2d2dc9e11d.avro",
    "schema-id" : 0
  }, {
    "snapshot-id" : 453371561664052237,
    "parent-snapshot-id" : 8012517928892530314,
    "timestamp-ms" : 1644761150082,
    "summary" : {
      "operation" : "append",
      "flink.job-id" : "813b7a17c21ddd003e1a210b1366e0c5",
      "flink.max-committed-checkpoint-id" : "9223372036854775807",
      "added-data-files" : "2",
      "added-records" : "2",
      "added-files-size" : "2487",
      "changed-partition-count" : "2",
      "total-records" : "4",
      "total-files-size" : "4974",
      "total-data-files" : "4",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-453371561664052237-1-bc0e56ec-9f78-4956-8412-4d8ca70ccc19.avro",
    "schema-id" : 0
  }, {
    "snapshot-id" : 6410282459040239217,
    "parent-snapshot-id" : 453371561664052237,
    "timestamp-ms" : 1644761403566,
    "summary" : {
      "operation" : "overwrite",
      "replace-partitions" : "true",
      "flink.job-id" : "f7085f68e5ff73c1c8aa1f4f59996068",
      "flink.max-committed-checkpoint-id" : "9223372036854775807",
      "added-data-files" : "1",
      "deleted-data-files" : "2",
      "added-records" : "1",
      "deleted-records" : "2",
      "added-files-size" : "1244",
      "removed-files-size" : "2459",
      "changed-partition-count" : "1",
      "total-records" : "3",
      "total-files-size" : "3759",
      "total-data-files" : "3",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-6410282459040239217-1-2b20c57e-5428-4483-9f7b-928b980dd50d.avro",
    "schema-id" : 0
  }, {
    "snapshot-id" : 138573494821828246,
    "parent-snapshot-id" : 6410282459040239217,
    "timestamp-ms" : 1644761481488,
    "summary" : {
      "operation" : "overwrite",
      "replace-partitions" : "true",
      "flink.job-id" : "d434d6d4f658d61732d7e9a0a85279fc",
      "flink.max-committed-checkpoint-id" : "9223372036854775807",
      "added-data-files" : "1",
      "deleted-data-files" : "1",
      "added-records" : "1",
      "deleted-records" : "1",
      "added-files-size" : "1251",
      "removed-files-size" : "1244",
      "changed-partition-count" : "1",
      "total-records" : "3",
      "total-files-size" : "3766",
      "total-data-files" : "3",
      "total-delete-files" : "0",
      "total-position-deletes" : "0",
      "total-equality-deletes" : "0"
    },
    "manifest-list" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/snap-138573494821828246-1-b243b39e-7122-4571-b6fa-c902241e36a8.avro",
    "schema-id" : 0
  } ],
  "snapshot-log" : [ {
    "timestamp-ms" : 1644761130111,
    "snapshot-id" : 8012517928892530314
  }, {
    "timestamp-ms" : 1644761150082,
    "snapshot-id" : 453371561664052237
  }, {
    "timestamp-ms" : 1644761403566,
    "snapshot-id" : 6410282459040239217
  }, {
    "timestamp-ms" : 1644761481488,
    "snapshot-id" : 138573494821828246
  } ],
  "metadata-log" : [ {
    "timestamp-ms" : 1644760911017,
    "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v1.metadata.json"
  }, {
    "timestamp-ms" : 1644761130111,
    "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v2.metadata.json"
  }, {
    "timestamp-ms" : 1644761150082,
    "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v3.metadata.json"
  }, {
    "timestamp-ms" : 1644761403566,
    "metadata-file" : "hdfs://nnha/user/iceberg/warehouse/iceberg_db/my_user/metadata/v4.metadata.json"
  } ]
}[root@flink1 ~]#

可以看到 "current-snapshot-id" : 138573494821828246,,表示当前的snapshot-id

Flink SQL> set 'execution.runtime-mode' = 'streaming';
[INFO] Session property has been set.

Flink SQL>
Flink SQL> select * from hadoop_catalog.iceberg_db.my_user 
> /*+ options(
'streaming'='true'
'monitor-interval'='5s'
> )*/ ;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op |              user_id |                      user_name |   birthday |                        country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I |                    5 |                       zhao_liu | 2022-02-02 |                          japan |
| +I |                    2 |                      zhang_san | 2022-02-01 |                          china |
| +I |                    1 |                      zhang_san | 2022-02-01 |                          china |

可以看到最新snapshot对应的数据

Flink SQL> select * from hadoop_catalog.iceberg_db.my_user 
> /*+ options(
'streaming'='true'
'monitor-interval'='5s',
'start-snapshot-id'='138573494821828246'
> )*/ ;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op |              user_id |                      user_name |   birthday |                        country |
+----+----------------------+--------------------------------+------------+--------------------------------+

这里只能指定最后一个insert overwrite操作的snapshot id,及其后面的snapshot id,否则后台会报异常,且程序一直处于restarting的状态:

java.lang.UnsupportedOperationException: Found overwrite operation, cannot support incremental data in snapshots (8012517928892530314, 138573494821828246]

在本示例中snapshot id: 138573494821828246,是最后一个snapshot id,同时也是最后一个insert overwrite操作的snapshot id。如果再insert两条数据,则只能看到增量的数据

Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
> user_id, user_name, birthday, country
> ) values(6, 'zhang_san', date '2022-02-01''china');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 8eb279e61aed66304d78ad027eaf8d30


Flink SQL> insert into hadoop_catalog.iceberg_db.my_user(
> user_id, user_name, birthday, country
> ) values(7, 'zhang_san', date '2022-02-01''china');
[INFO] Submitting SQL update statement to the cluster...
[INFO] SQL update statement has been successfully submitted to the cluster:
Job ID: 70a050e455d188d0d3f3adc2ba367fb6


Flink SQL> select * from hadoop_catalog.iceberg_db.my_user 
> /*+ options(
'streaming'='true'
'monitor-interval'='30s',
'start-snapshot-id'='138573494821828246'
> )*/ ;
+----+----------------------+--------------------------------+------------+--------------------------------+
| op |              user_id |                      user_name |   birthday |                        country |
+----+----------------------+--------------------------------+------------+--------------------------------+
| +I |                    6 |                      zhang_san | 2022-02-01 |                          china |
| +I |                    7 |                      zhang_san | 2022-02-01 |                          china |
  • streaming模式支持读取增量snapshot数据

  • 如果不指定start-snapshot-id,则先读取当前snapshot全量数据,再读取增量数据。如果指定start-snapshot-id,读取该snapshot-id之后的增量数据,即不读取该snapshot-id的数据

  • monitor-interval:表示监控新提交的数据文件的时间间隔,默认1s

如果这个文章对你有帮助,不要忘记 「在看」 「点赞」 「收藏」 三连啊喂!