【Flink】第九篇:Flink SQL 性能优化实战
缘起
最近我们组在大规模上线Flink SQL作业。首先,在进行跑批量初始化完历史数据后,剩下的就是消费Kafka历史数据进行追数了。但是发现某些作业的追数过程十分缓慢,要运行一晚上甚至三四天才能追上最新数据。由于是实时数仓指标计算上线初期,经常验证作业如果有问题就得重蹈覆辙重新追数,效率很低,于是我开始分析Flink SQL的优化。
问题
insert into tableB
select a, max(b), max(c), sum(d) ...
from tableA
group by a
上面这个作业的简化版SQL,主要就是做一个分组聚合:
从tableA分组聚合出结果插入tableB
tableA的联合主键是:a,b(但是a的离散度已经很高了)
tableA的Flink表类型为upset-kafka
tableB的Flink表类型为HBase
初步分析
这个作业跑在集群上的job graph如下:
可以看到有三个vertex:
第一个是TableSourceScan
第二个是ChangelogNormalize
第三个是GroupAggregate
TableSourceScan接入tableA表的upsert-kafka流;
ChangelogNormalize对upset-kafka进行撤回语义的解析;
GroupAggregate对撤回流进行分组聚合,然后写入tableB的HBase;
优化思路1:local/global agg
agg分类:
group agg
select count(a) from t group by b
over agg
select count(a) over (partition by b order by c) from t
window agg
select count(a) from t group by tumble(ts, interval '10' seconds), b
local/global agg:
核心思想与hadoop的combiner是一致的,就是在mapreduce的过程中,在map阶段就做一个预聚合,即combine操作。
带来的收益是:减少网络shuffle数据,提升计算引擎的性能。
前提条件:
agg的所有agg function都是mergeable(实现merge方法)
table.optimizer.agg-phase-strategy为AUTO或TWO_PHASE
Stream下,minibatch开启;Batch下,AUTO会根据cost选择
解释说明:
mergeable其实就是能用分治法解决的计算问题,例如sum、count等,而avg就不能用分治法先计算部分元素的avg,再计算最终avg了,结果有时候会出错。
table.optimizer.agg-phase-strategy:默认为AUTO,意思是引擎尽量做预聚合;TWO_PHASE表示所有聚合操作都做预聚合;ONE_PHASE表示所有聚合都不做预聚合。
minibatch:即开启微批模式。主要有三个参数:
table.exec.mini-batch.enabled:是否开启,默认不开启
table.exec.mini-batch.size:微批的record buffer大小
table.exec.mini-batch.allow-latency:微批的time buffer大小
minibatch的本质就是平衡实时性和吞吐量的刻度尺。
所以,local/global agg一共需要三个参数控制。
验证
经过对比验证,在这个SQL场景下的效率提升很小。
local/global agg降低了第二个vertex即ChangelogNormalize的sent records的数据量,而并没有使得第一个vertex的数据处理效率有显著提升。
所以,这个作业的瓶颈并不在vertex间, 而在于第一个vertex的处理数据效率。
优化思路二:调大并行度
这个思路的关键在于source upsert-kafka的分区数,这是制约吞吐量的瓶颈。因为在upsert-kafka中,每个partition最多被一个Flink线程读取。
增加了10倍的并行度,source分区也增加10倍后,作业周转时间缩短了将近一半。
优化思路三:RocksDB性能调优
仔细分析这个SQL作业,是对一个联合主键的字段做group by,那么state一定会非常大。
经过在对这个表在数仓中的数据进行分析,发现这个字段的离散度几乎接近于主键的离散度。
而进行group by必然要根据每一条upsert kafka的数据去查验在flink statebackend中物化的source table中该字段值的分布情况,这应该是才是瓶颈所在!
沿着这个思路,开始分析Flink的statebackend机制。
这里我们简单回顾一下Flink statebackend(后面再做专题总结):
由 Flink 管理的 keyed state 是一种分片的键/值存储,每个 keyed state 的工作副本都保存在负责该键的 taskmanager 本地中。另外,Operator state 也保存在机器节点本地。Flink 定期获取所有状态的快照,并将这些快照复制到持久化的位置,例如分布式文件系统。
如果发生故障,Flink 可以恢复应用程序的完整状态并继续处理,就如同没有出现过异常。
Flink 管理的状态存储在 state backend 中。Flink 有两种 state backend 的实现 – 一种基于 RocksDB 内嵌 key/value 存储将其工作状态保存在磁盘上的,另一种基于堆的 state backend,将其工作状态保存在 Java 的堆内存中。这种基于堆的 state backend 有两种类型:FsStateBackend,将其状态快照持久化到分布式文件系统;MemoryStateBackend,它使用 JobManager 的堆保存状态快照。
当使用基于堆的 state backend 保存状态时,访问和更新涉及在堆上读写对象。但是对于保存在 RocksDBStateBackend 中的对象,访问和更新涉及序列化和反序列化,所以会有更大的开销。但 RocksDB 的状态量仅受本地磁盘大小的限制。还要注意,只有 RocksDBStateBackend 能够进行增量快照,这对于具有大量变化缓慢状态的应用程序来说是大有裨益的。
所有这些 state backends 都能够异步执行快照,这意味着它们可以在不妨碍正在进行的流处理的情况下执行快照。
我们的线上一般采用的是RocksDB作为状态后端,checkpoint dir采用hdfs文件系统。其实我个人觉得这个应该根据作业的特性进行选择,根据我个人的经验以及知识沉淀,选择的主要因素是作业的state大小及对处理数据性能的要求:
RocksDBStateBackend可以突破内存的限制,rocksDB的数据逻辑结构和redis相似,但是数据的物理存储结构又和hbase相似,继承自levelDB的LSM树思想,缺点是性能太低
而FsStateBackend是在做snapshot的时候才将内存的state持久化到远端,速度接近于内存状态
MemoryStateBackend是纯内存的,一般只用做调试。
但是由于这个大状态作业追数速度实在太慢,我甚至想过:
在追数的时候用FsStateBackend,并配置大内存,且把managed memory调成0,同时将ck的周期设置的很大,基本上不做ck,追上后savepoint。再把状态后端换成RocksDB,并且从FSSatebackend的savepoint处恢复,但是发现1.13才支持savepoint切换statebackend类型。
只剩下调优RocksDB一条路了。根据之前对HBase的LSM原理的理解,进行知识迁移,马上对RocksDB有了一定的认识。在HBase中调优效果最明显无乎:
blockcache读缓存、memStore写缓存、增加布隆过滤器、提升compact效率
沿着这个思路,再查阅了一番RocksDB资料后,决定先对如下参数进行调优:
state.backend.rocksdb.block.cache-size
state.backend.rocksdb.block.blocksize
Block 块是 RocksDB 保存在磁盘中的 SST 文件的基本单位,它包含了一系列列有序的 Key 和 Value 集合,可以设置固定的大小。
但是,通过增加 Block Size,会显著增加读放大(Read Amplification)效应,令读取数据时,吞吐量下降。原因是 Block Size增加以后,如果 Block Cache 的大小没有变,就会⼤大减少 Cache 中可存放的 Block 数。如果 Cache 中还存处理索引和过滤器等内容,那么可放置的数据块数目就会更少,可能需要更多的磁盘 IO 操作,找到数据就更更慢了,此时读取性能会大幅下降。反之,如果减小BlockSize,会让读的性能有不少提升,但是写性能会下降,⽽而且对 SSD 寿命也不利。
因此我的调优经验是,如果需要增加 Block Size 的大小来提升读写性能,请务必一并增加 Block Cache Size 的大小,这样才可以取得比较好的读写性能。Block Cache,缓存清除算法⽤用的是 LRU(Least Recently Used)。
验证
测试对比后发现,原本半天左右完成的作业只需要一到两个小时即可追上数据!
感悟
性能调优就如同把脉治病,关键在于对症下药。
前期,要分析当前场景下真正制约性能的瓶颈所在,后期,在症结处用效果最明显的方式处理症结。
参考
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/learn-flink/fault_tolerance/
https://github.com/facebook/RocksDB/wiki/Block-cache
https://github.com/facebook/rocksdb/wiki/Memory-usage-in-rocksdb