vlambda博客
学习文章列表

【Flink】第九篇:Flink SQL 性能优化实战


缘起



最近我们组在大规模上线Flink SQL作业。首先,在进行跑批量初始化完历史数据后,剩下的就是消费Kafka历史数据进行追数了。但是发现某些作业的追数过程十分缓慢,要运行一晚上甚至三四天才能追上最新数据。由于是实时数仓指标计算上线初期,经常验证作业如果有问题就得重蹈覆辙重新追数,效率很低,于是我开始分析Flink SQL的优化。



问题



insert into tableBselect a, max(b), max(c), sum(d) ...from tableAgroup by a


上面这个作业的简化版SQL,主要就是做一个分组聚合:


  1. 从tableA分组聚合出结果插入tableB


  2. tableA的联合主键是:a,b(但是a的离散度已经很高了)


  3. tableA的Flink表类型为upset-kafka


  4. tableB的Flink表类型为HBase



初步分析



这个作业跑在集群上的job graph如下:


【Flink】第九篇:Flink SQL 性能优化实战


可以看到有三个vertex:


  1. 第一个是TableSourceScan


  2. 第二个是ChangelogNormalize


  3. 第三个是GroupAggregate


TableSourceScan接入tableA表的upsert-kafka流;


ChangelogNormalize对upset-kafka进行撤回语义的解析;


GroupAggregate对撤回流进行分组聚合,然后写入tableB的HBase;



优化思路1:local/global agg



agg分类:


  • group agg

select count(a) from t group by b


  • over agg

select count(a) over (partition by b order by c) from t


  • window agg

select count(a) from t group by tumble(ts, interval '10' seconds), b


local/global agg:


【Flink】第九篇:Flink SQL 性能优化实战


核心思想与hadoop的combiner是一致的,就是在mapreduce的过程中,在map阶段就做一个预聚合,即combine操作。

【Flink】第九篇:Flink SQL 性能优化实战


带来的收益是:减少网络shuffle数据,提升计算引擎的性能。


前提条件:


  1. agg的所有agg function都是mergeable(实现merge方法)


  2. table.optimizer.agg-phase-strategy为AUTO或TWO_PHASE



  3. Stream下,minibatch开启;Batch下,AUTO会根据cost选择


解释说明:


mergeable其实就是能用分治法解决的计算问题,例如sum、count等,而avg就不能用分治法先计算部分元素的avg,再计算最终avg了,结果有时候会出错。


table.optimizer.agg-phase-strategy:默认为AUTO,意思是引擎尽量做预聚合;TWO_PHASE表示所有聚合操作都做预聚合;ONE_PHASE表示所有聚合都不做预聚合。


minibatch:即开启微批模式。主要有三个参数:


  • table.exec.mini-batch.enabled:是否开启,默认不开启

  • table.exec.mini-batch.size:微批的record buffer大小

  • table.exec.mini-batch.allow-latency:微批的time buffer大小


minibatch的本质就是平衡实时性和吞吐量的刻度尺。


所以,local/global agg一共需要三个参数控制。



验证



经过对比验证,在这个SQL场景下的效率提升很小。


local/global agg降低了第二个vertex即ChangelogNormalize的sent records的数据量,而并没有使得第一个vertex的数据处理效率有显著提升。


所以,这个作业的瓶颈并不在vertex间, 而在于第一个vertex的处理数据效率。



优化思路二:调大并行度



这个思路的关键在于source upsert-kafka的分区数,这是制约吞吐量的瓶颈。因为在upsert-kafka中,每个partition最多被一个Flink线程读取。


增加了10倍的并行度,source分区也增加10倍后,作业周转时间缩短了将近一半。



优化思路三:RocksDB性能调优



仔细分析这个SQL作业,是对一个联合主键的字段做group by,那么state一定会非常大。


经过在对这个表在数仓中的数据进行分析,发现这个字段的离散度几乎接近于主键的离散度。


而进行group by必然要根据每一条upsert kafka的数据去查验在flink statebackend中物化的source table中该字段值的分布情况,这应该是才是瓶颈所在!


沿着这个思路,开始分析Flink的statebackend机制。


这里我们简单回顾一下Flink statebackend(后面再做专题总结):


由 Flink 管理的 keyed state 是一种分片的键/值存储,每个 keyed state 的工作副本都保存在负责该键的 taskmanager 本地中。另外,Operator state 也保存在机器节点本地。Flink 定期获取所有状态的快照,并将这些快照复制到持久化的位置,例如分布式文件系统。


如果发生故障,Flink 可以恢复应用程序的完整状态并继续处理,就如同没有出现过异常。


Flink 管理的状态存储在 state backend 中。Flink 有两种 state backend 的实现 – 一种基于 RocksDB 内嵌 key/value 存储将其工作状态保存在磁盘上的,另一种基于堆的 state backend,将其工作状态保存在 Java 的堆内存中。这种基于堆的 state backend 有两种类型:FsStateBackend,将其状态快照持久化到分布式文件系统;MemoryStateBackend,它使用 JobManager 的堆保存状态快照。


【Flink】第九篇:Flink SQL 性能优化实战


当使用基于堆的 state backend 保存状态时,访问和更新涉及在堆上读写对象。但是对于保存在 RocksDBStateBackend 中的对象,访问和更新涉及序列化和反序列化,所以会有更大的开销。但 RocksDB 的状态量仅受本地磁盘大小的限制。还要注意,只有 RocksDBStateBackend 能够进行增量快照,这对于具有大量变化缓慢状态的应用程序来说是大有裨益的。


所有这些 state backends 都能够异步执行快照,这意味着它们可以在不妨碍正在进行的流处理的情况下执行快照。


我们的线上一般采用的是RocksDB作为状态后端,checkpoint dir采用hdfs文件系统。其实我个人觉得这个应该根据作业的特性进行选择,根据我个人的经验以及知识沉淀,选择的主要因素是作业的state大小及对处理数据性能的要求:


  • RocksDBStateBackend可以突破内存的限制,rocksDB的数据逻辑结构和redis相似,但是数据的物理存储结构又和hbase相似,继承自levelDB的LSM树思想,缺点是性能太低

  • 而FsStateBackend是在做snapshot的时候才将内存的state持久化到远端,速度接近于内存状态

  • MemoryStateBackend是纯内存的,一般只用做调试。


但是由于这个大状态作业追数速度实在太慢,我甚至想过:


在追数的时候用FsStateBackend,并配置大内存,且把managed memory调成0,同时将ck的周期设置的很大,基本上不做ck,追上后savepoint。再把状态后端换成RocksDB,并且从FSSatebackend的savepoint处恢复,但是发现1.13才支持savepoint切换statebackend类型。


只剩下调优RocksDB一条路了。根据之前对HBase的LSM原理的理解,进行知识迁移,马上对RocksDB有了一定的认识。在HBase中调优效果最明显无乎:


blockcache读缓存、memStore写缓存、增加布隆过滤器、提升compact效率


沿着这个思路,再查阅了一番RocksDB资料后,决定先对如下参数进行调优:


  • state.backend.rocksdb.block.cache-size

  • state.backend.rocksdb.block.blocksize


Block 块是 RocksDB 保存在磁盘中的 SST 文件的基本单位,它包含了一系列列有序的 Key 和 Value 集,可以设置固定的大小。



但是,通过增加 Block Size,会显著增加读放大(Read Amplification)效应,令读取数据时,吞吐量下降。原因是 Block Size增加以后,如果 Block Cache 的大小没有变,就会⼤大减少 Cache 中可存放的 Block 数。如果 Cache 中还存处理索引和过滤器等内容,那么可放置的数据块数目就会更少,可能需要更多的磁盘 IO 操作,找到数据就更更慢了,此时读取性能会大幅下降。反之,如果减小BlockSize,会让读的性能有不少提升,但是写性能会下降,⽽而且对 SSD 寿命也不利。


因此我的调优经验是,如果需要增加 Block Size 的大小来提升读写性能,请务必一并增加 Block Cache Size 的大小,这样才可以取得比较好的读写性能。Block Cache,缓存清除算法⽤用的是 LRU(Least Recently Used)。



验证



测试对比后发现,原本半天左右完成的作业只需要一到两个小时即可追上数据!



感悟



性能调优就如同把脉治病,关键在于对症下药


前期,要分析当前场景下真正制约性能的瓶颈所在,后期,在症结处用效果最明显的方式处理症结。





参考



  1. https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/learn-flink/fault_tolerance/

  2. https://github.com/facebook/RocksDB/wiki/Block-cache

  3. https://github.com/facebook/rocksdb/wiki/Memory-usage-in-rocksdb