vlambda博客
学习文章列表

FLink checkpoint生产使用实践和原理详解


    在了解checkpoint相关的概念前,需要先梳理几个对应的概念,分别是状态(state)、状态后端、savepoint。


状态(state)

    相对于sparkStreaming在状态管理中需要借助外部的存储(redis)等来手动的状态管理,flink的自己状态管理也是它在流式处理中更加具有优势的地方。

    在流式计算中操作一次就是处理一个独立的事件这个就是无状态的计算,但是有些操作则是需要记住多个事件的信息(窗口操作),这些需要记住多个事件信息的操作就是有状态的,也就是有状态计算。


状态的使用场景

>去重

    对于在统计实时支付人数来说,一般都是对支付人数根据member_id进行去重统计的,这个就需要记录历史中消费者的id才能实现去重,而不是独立的来一条消费记录就能处理,需要记录有历史的数据也就是状态。
> 聚合

    聚合操作也是一样的,在统计今日支付的金额或者订单数时候需要记录下来历史当前时刻的历史支付金额从而进行累加

>检测

    例如检测一个温度传感器在数据流中的温度是否在持续上升,而这个就是要用当前数据和历史数据的一个比较,而不是独立的处理当前数据,这个历史温度趋势的保存也就是状态了。


状态的分类

    一般来说具有下面所列的两种基本类型的状态:


Managed State

Raw State

状态管理方式

Flink Runtime托管, 自动存储, 自动恢复, 自动伸缩

用户自己管理

状态数据结构

Flink提供多种常用数据结构, 例如:ListState, MapState

字节数组: byte[]

使用场景

绝大数Flink算子

所有算子

    一般来说,通常不需要我们在自定义使用RawState,对于大部分的场景来说managedSate已经满足我们所需要的状态管理,对于managerstate来说又有两种类型:

    a>算子状态(operate state)

    b>键控状态(keyed    state)


Operator State

Keyed State

适用算子类型

可用于所有算子: 常用于source, 例如 FlinkKafkaConsumer

只适用于KeyedStream上的算子

状态分配

一个算子的子任务对应一个状态

一个Key对应一个State: 一个算子会处理多个Key, 则访问相应的多个State

创建和访问方式

实现CheckpointedFunctionListCheckpointed(已经过时)接口

重写RichFunction, 通过里面的RuntimeContext访问

横向扩展

并发改变时有多重重写分配方式可选: 均匀分配和合并后每个得到全量

并发改变, State随着Key在实例间迁移

支持的数据结构

ListStateBroadCastState

ValueState, ListState,MapState ReduceState, AggregatingState

更多状态的具体使用本文不再详细介绍可以在后面的文章做单独的介绍具体的使用案例。


状态后端

    负责状态的访问、存储和维护的一个可插入的组件就是状态后端。


状态后端最主要负责两件事情:

>>本地(taskmanager)的状态管理

    每条数据过来时,有状态的算子都会读取和更新状态,所以在访问时对于数据的处理低延时至关重要,因此每个并行子任务都会在本地维护对应的状态,可以保证快速访问。

>>将checkpoint状态写入远程存储。


由此Flink提供了3种状态后端:

MemoryStateBackend:内存级别的状态后端, 

存储方式:本地状态存储在TaskManager的内存中, 对应的checkpoint 存储在JobManager的内存中.

特点:快速, 低延迟, 但不稳定

使用场景: 1. 本地测试 2. 几乎无状态的作业(ETL) 3. JobManager不容易挂, 或者挂了影响不大. 4. 不推荐在生产环境下使用


FsStateBackend:

存储方式: 本地状态在TaskManager内存, Checkpoint时, 先把状态发送到JobManager的内存, 然后再存储在文件系统中

特点: 拥有内存级别的本地访问速度, 和更好的容错保证

使用场景: 1. 常规使用状态的作业. 例如分钟级别窗口聚合, join等 2. 需要开启HA的作业 3. 可以应用在生产环境中


RocksDBStateBackend:将所有的状态序列化之后, 存入本地的RocksDB数据库中.(一种NoSql数据库, KV形式存储)

存储方式: 1. 本地状态存储在TaskManager的RocksDB数据库中(实际是内存+磁盘) 2. Checkpoint在外部文件系统中.

使用场景: 1. 超大状态的作业, 例如天级的窗口聚合 2. 需要开启HA的作业 3. 对读写状态性能要求不高的作业 4. 可以使用在生产环境


savepoint

    Flink 还提供了可以自定义的镜像保存功能,就是保存点(savepoints)

savepoint和checkpoint的区别

Savepoint

Checkpoint

Savepoint是由命令触发, 由用户创建和删除

Checkpoint被保存在用户指定的外部路径中, flink自动触发

保存点存储在标准格式存储中,并且可以升级作业版本并可以更改其配置。

当作业失败或被取消时,将保留外部存储的检查点。

用户必须提供用于还原作业状态的保存点的路径。

用户必须提供用于还原作业状态的检查点的路径。


checkpoint、状态、状态后端、savepoint联系

    

    前面已经逐次的介绍了相关的概念,之间的联系就是checkpoint是从source触发到下游所有节点由状态后端完成的对所有状态一次保存的全局操作。单独对于状态而言则是独立的各个具有状态算子所保存的数据。

    对于savepoint则可以简单的理解为checkpoint是flink自动管理进行的状态保存,而savepoint则是手动操作保存的checkpoint。


checkpoint原理机制深入

    

下图左侧是 Checkpoint Coordinator,是整个 Checkpoint 的发起者,中间是由两个 source,一个 sink 组成的 Flink 作业,最右侧的是持久化存储,在大部分用户场景中对应 HDFS。

a. 第一步,Checkpoint Coordinator 向所有 source 节点触发Checkpoint;

b. 第二步,source 节点向下游广播 barrier,这个 barrier 就是实现 Chandy-Lamport 分布式快照算法的核心,下游的 task 只有收到所有 input 的 barrier 才会执行相应的 Checkpoint。

FLink checkpoint生产使用实践和原理详解

FLink checkpoint生产使用实践和原理详解

d. 第四步,下游的 sink 节点收集齐上游两个 input 的 barrier 之后,会执行本地快照,这里特地展示了 RocksDB incremental Checkpoint 的流程,首先 RocksDB 会全量刷数据到磁盘上(红色大三角表示),然后 Flink 框架会从中选择没有上传的文件进行持久化备份(紫色小三角)。

FLink checkpoint生产使用实践和原理详解

e. 同样的,sink 节点在完成自己的 Checkpoint 之后,会将 state handle 返回通知 Coordinator。

FLink checkpoint生产使用实践和原理详解

f. 最后,当 Checkpoint coordinator 收集齐所有 task 的 state handle,就认为这一次的 Checkpoint 全局完成了,向持久化存储中再备份一个 Checkpoint meta 文件。

FLink checkpoint生产使用实践和原理详解

Checkpoint 的 EXACTLY_ONCE 语义

    为了实现 EXACTLY ONCE 语义,Flink 通过一个 input buffer 将在对齐阶段收到的数据缓存起来,等对齐完成之后再进行处理。而对于 AT LEAST ONCE 语义,无需缓存收集到的数据,会对后续直接处理,所以导致 restore 时,数据可能会被多次处理。下图是官网文档里面就 Checkpoint align 的示意图:


FLink checkpoint生产使用实践和原理详解


    需要特别注意的是,Flink 的 Checkpoint 机制只能保证 Flink 的计算过程可以做到 EXACTLY ONCE,端到端的 EXACTLY ONCE 需要 source 和 sink 支持。


实践使用

    在公司使用中一般来说是将checkpoint的保存路径在HDFS中,但是本次要介绍的是使用s3对象文件存储系统来保存对应的状态。

    s3使用的是MinIO

 

    MinIO是一款开源云存储软件,可为大型数据基础架构提供高性能的分布式对象存储。它与Amazon S3 API兼容。

    MinIO服务器存储所有类型的非结构化数据,例如照片,视频,日志文件等。它也可以在开源Apache V2许可下使用,并且许多最强大的大数据和机器学习应用程序都使用MinIO S3对象存储。

    相对于hdfs,s3可以更加节省存储资源来保证数据安全,hdfs会暴露主节点来实现数据的备份安全和管理,同时s3可以相对于hdfs更能实现单独扩展等优势。

flink-conf.yaml对应的文件配置

# state.backend: filesystemstate.backend: filesystem
# Directory for checkpoints filesystem, when using any of the default bundled# state backends.## state.checkpoints.dir: hdfs://namenode-host:port/flink-checkpointsstate.checkpoints.dir: s3://state/checkpoints3.endpoint: http://10.150.60.10:31000s3.path.style.access: trues3.access-key: admins3.secret-key: boWtRT9wa4
# Default target directory for savepoints, optional.## state.savepoints.dir: hdfs://namenode-host:port/flink-savepointsstate.savepoints.dir: s3://state/savepoints3.endpoint: http://10.150.60.10:31000s3.path.style.access: trues3.access-key: admins3.secret-key: boWtRT9wa4
# Flag to enable/disable incremental checkpoints for backends that

    上面所给的就是基本的使用s3对应的配置参考,更多的也可以在flink官网中找到对应的配置。flink checkpoint的开启也是实现flink任务失败恢复策略的前提。

代码中配置开启checkpoint:


 //0.创建执行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //开启checkpoint及相关配置 env.enableCheckpointing(5000L)    .getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(60000L); //任务失败恢复策略配置 env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,5000L)); env.setStateBackend(new FsStateBackend("hdfs://ip:8020/flink/ck")); //配置系统用户认证 System.setProperty("HADOOP_USER_NAME","bigdata");





扫码关注