日均百亿级日志处理:微博基于Flink的实时计算平台建设
来自:DBAplus社群
作者介绍
吕永卫,微博广告资深数据开发工程师,实时数据项目组负责人。
黄鹏,微博广告实时数据开发工程师,负责法拉第实验平台数据开发、实时数据关联平台、实时算法特征数据计算、实时数据仓库、实时数据清洗组件开发工作。
林发明,微博广告资深数据开发工程师,负责算法实时特征数据计算、实时数据关联平台、实时数据仓库、FlinkStream组件开发工作。
崔泽峰,微博广告资深数据开发工程师,负责实时算法特征数据计算、实时任务管理平台、FlinkStream组件、FlinkSQL扩展开发工作。
是随着微博业务线的快速扩张,微博广告各类业务日志的数量也随之急剧增长。传统基于Hadoop生态的离线数据存储计算方案已在业界形成统一的默契,但受制于离线计算的时效性制约,越来越多的数据应用场景已从离线转为实时。微博广告实时数据平台以此为背景进行设计与构建,目前该系统已支持日均处理日志数量超过百亿,接入产品线、业务日志类型若干。
相比于Spark,目前Spark的生态总体更为完善一些,且在机器学习的集成和应用性暂时领先。但作为下一代大数据引擎的有力竞争者-Flink在流式计算上有明显优势,Flink在流式计算里属于真正意义上的单条处理,每一条数据都触发计算,而不是像Spark一样的Mini Batch作为流式处理的妥协。Flink的容错机制较为轻量,对吞吐量影响较小,而且拥有图和调度上的一些优化,使得Flink可以达到很高的吞吐量。而Strom的容错机制需要对每条数据进行ack,因此其吞吐量瓶颈也是备受诟病。
这里引用一张图来对常用的实时计算框架做个对比。
Flink是一个开源的分布式实时计算框架。Flink是有状态的和容错的,可以在维护一次应用程序状态的同时无缝地从故障中恢复;它支持大规模计算能力,能够在数千个节点上并发运行;它具有很好的吞吐量和延迟特性。同时,Flink提供了多种灵活的窗口函数。
1)状态管理机制
Flink检查点机制能保持exactly-once语义的计算。状态保持意味着应用能够保存已经处理的数据集结果和状态。
2)事件机制
Flink支持流处理和窗口事件时间语义。事件时间可以很容易地通过事件到达的顺序和事件可能的到达延迟流中计算出准确的结果。
3)窗口机制
Flink支持基于时间、数目以及会话的非常灵活的窗口机制(window)。可以定制window的触发条件来支持更加复杂的流模式。
4)容错机制
Flink高效的容错机制允许系统在高吞吐量的情况下支持exactly-once语义的计算。Flink可以准确、快速地做到从故障中以零数据丢失的效果进行恢复。
5)高吞吐、低延迟
Flink具有高吞吐量和低延迟(能快速处理大量数据)特性。下图展示了Apache Flink和Apache Storm完成分布式项目计数任务的性能对比。
初期架构仅为计算与存储两层,新来的计算需求接入后需要新开发一个实时计算任务进行上线。重复模块的代码复用率低,重复率高,计算任务间的区别主要是集中在任务的计算指标口径上。
在存储层,各个需求方所需求的存储路径都不相同,计算指标可能在不通的存储引擎上有重复,有计算资源以及存储资源上的浪费情况。并且对于指标的计算口径也是仅局限于单个任务需求里的,不通需求任务对于相同的指标的计算口径没有进行统一的限制于保障。各个业务方也是在不同的存储引擎上开发数据获取服务,对于那些专注于数据应用本身的团队来说,无疑当前模式存在一些弊端。
随着数据体量的增加以及业务线的扩展,前期架构模式的弊端逐步开始显现。从当初单需求单任务的模式逐步转变为通用的数据架构模式。为此,我们开发了一些基于Flink框架的通用组件来支持数据的快速接入,并保证代码模式的统一性和维护性。在数据层,我们基于Clickhouse来作为我们数据仓库的计算和存储引擎,利用其支持多维OLAP计算的特性,来处理在多维多指标大数据量下的快速查询需求。在数据分层上,我们参考与借鉴离线数仓的经验与方法,构建多层实时数仓服务,并开发多种微服务来为数仓的数据聚合,指标提取,数据出口,数据质量,报警监控等提供支持。
整体架构分为五层:
1)接入层:接入原始数据进行处理,如Kafka、RabbitMQ、File等。
2)计算层:选用Flink作为实时计算框架,对实时数据进行清洗,关联等操作。
3)存储层: 对清洗完成的数据进行数据存储,我们对此进行了实时数仓的模型分层与构建,将不同应用场景的数据分别存储在如Clickhouse,Hbase,Redis,Mysql等存储。服务中,并抽象公共数据层与维度层数据,分层处理压缩数据并统一数据口径。
4)服务层:对外提供统一的数据查询服务,支持从底层明细数据到聚合层数据5min/10min/1hour的多维计算服务。同时最上层特征指标类数据,如计算层输入到Redis、Mysql等也从此数据接口进行获取。
5)应用层:以统一查询服务为支撑对各个业务线数据场景进行支撑。
监控报警:对Flink任务的存活状态进行监控,对异常的任务进行邮件报警并根据设定的参数对任务进行自动拉起与恢复。根据如Kafka消费的offset指标对消费处理延迟的实时任务进行报警提醒。
数据质量:监控实时数据指标,对历史的实时数据与离线hive计算的数据定时做对比,提供实时数据的数据质量指标,对超过阈值的指标数据进行报警。
整体数据从原始数据接入后经过ETL处理, 进入实时数仓底层数据表,经过配置化聚合微服务组件向上进行分层数据的聚合。根据不同业务的指标需求也可通过特征抽取微服务直接配置化从数仓中抽取到如Redis、ES、Mysql中进行获取。大部分的数据需求可通过统一数据服务接口进行获取。
原始日志数据因为各业务日志的不同,所拥有的维度或指标数据并不完整。所以需要进行实时的日志的关联才能获取不同维度条件下的指标数据查询结果。并且关联日志的回传周期不同,有在10min之内完成95%以上回传的业务日志,也有类似于激活日志等依赖第三方回传的有任务日志,延迟窗口可能大于1天。并且最大日志关联任务的日均数据量在10亿级别以上,如何快速处理与构建实时关联任务的问题首先摆在我们面前。对此我们基于Flink框架开发了配置化关联组件。对于不同关联日志的指标抽取,我们也开发了配置化指标抽取组件用于快速提取复杂的日志格式。以上两个自研组件会在后面的内容里再做详细介绍。
1)回传周期超过关联窗口的日志如何处理?
对于回传晚的日志,我们在关联窗口内未取得关联结果。我们采用实时+离线的方式进行数据回刷补全。实时处理的日志我们会将未关联的原始日志输出到另外一个暂存地(Kafka),同时不断消费处理这个未关联的日志集合,设定超时重关联次数与超时重关联时间,超过所设定任意阈值后,便再进行重关联。离线部分,我们采用Hive计算昨日全天日志与N天内的全量被关联日志表进行关联,将最终的结果回写进去,替换实时所计算的昨日关联数据。
2)如何提高Flink任务性能?
① Operator Chain
为了更高效地分布式执行,Flink会尽可能地将operator的subtask链接(chain)在一起形成task。每个task在一个线程中执行。将operators链接成task是非常有效的优化:它能减少线程之间的切换,减少消息的序列化/反序列化,减少数据在缓冲区的交换,减少了延迟的同时提高整体的吞吐量。
Flink会在生成JobGraph阶段,将代码中可以优化的算子优化成一个算子链(Operator Chains)以放到一个task(一个线程)中执行,以减少线程之间的切换和缓冲的开销,提高整体的吞吐量和延迟。下面以官网中的例子进行说明。
图中,source、map、[keyBy|window|apply]、sink算子的并行度分别是2、2、2、2、1,经过Flink优化后,source和map算子组成一个算子链,作为一个task运行在一个线程上,其简图如图中condensed view所示,并行图如parallelized view所示。算子之间是否可以组成一个Operator Chains,看是否满足以下条件:
-
上下游算子的并行度一致;
-
下游节点的入度为1;
-
上下游节点都在同一个 slot group 中;
-
下游节点的 chain 策略为 ALWAYS;
-
上游节点的 chain 策略为 ALWAYS 或 HEAD;
-
两个节点间数据分区方式是 forward;
-
用户没有禁用 chain。
② Flink异步IO
流式计算中,常常需要与外部系统进行交互。而往往一次连接中你那个获取连接等待通信的耗时会占比较高。下图是两种方式对比示例:
图中棕色的长条表示等待时间,可以发现网络等待时间极大地阻碍了吞吐和延迟。为了解决同步访问的问题,异步模式可以并发地处理多个请求和回复。也就是说,你可以连续地向数据库发送用户a、b、c等的请求,与此同时,哪个请求的回复先返回了就处理哪个回复,从而连续的请求之间不需要阻塞等待,如上图右边所示。这也正是 Async I/O 的实现原理。
③ Checkpoint优化
Flink实现了一套强大的checkpoint机制,使它在获取高吞吐量性能的同时,也能保证Exactly Once级别的快速恢复。
首先提升各节点checkpoint的性能考虑的就是存储引擎的执行效率。Flink官方支持的三种checkpoint state存储方案中,Memory仅用于调试级别,无法做故障后的数据恢复。其次还有Hdfs与Rocksdb,当所做Checkpoint的数据大小较大时,可以考虑采用Rocksdb来作为checkpoint的存储以提升效率。
其次的思路是资源设置,我们都知道checkpoint机制是在每个task上都会进行,那么当总的状态数据大小不变的情况下,如何分配减少单个task所分的的checkpoint数据变成了提升checkpoint执行效率的关键。
最后,增量快照. 非增量快照下,每次checkpoint都包含了作业所有状态数据。而大部分场景下,前后checkpoint里,数据发生变更的部分相对很少,所以设置增量checkpoint,仅会对上次checkpoint和本次checkpoint之间状态的差异进行存储计算,减少了checkpoint的耗时。
3)如何保障任务的稳定性?
在任务执行过程中,会遇到各种各样的问题,导致任务异常甚至失败。所以如何做好异常情况下的恢复工作显得异常重要。
① 设定重启策略
Flink支持不同的重启策略,以在故障发生时控制作业如何重启。集群在启动时会伴随一个默认的重启策略,在没有定义具体重启策略时会使用该默认策略。如果在工作提交时指定了一个重启策略,该策略会覆盖集群的默认策略。
默认的重启策略可以通过 Flink 的配置文件 flink-conf.yaml 指定。配置参数 restart-strategy 定义了哪个策略被使用。
常用的重启策略:
-
固定间隔(Fixed delay);
-
失败率(Failure rate);
-
无重启(No restart)。
② 设置HA
Flink在任务启动时指定HA配置主要是为了利用Zookeeper在所有运行的JobManager实例之间进行分布式协调.Zookeeper通过leader选取和轻量级一致性的状态存储来提供高可用的分布式协调服务。
③ 任务监控报警平台
在实际环境中,我们遇见过因为集群状态不稳定而导致的任务失败。在Flink1.6版本中,甚至遇见过任务出现假死的情况,也就是Yarn上的job资源依然存在,而Flink任务实际已经死亡。为了监测与恢复这些异常的任务,并且对实时任务做统一的提交、报警监控、任务恢复等管理,我们开发了任务提交与管理平台。通过Shell拉取Yarn上Running状态与Flink Job状态的列表进行对比,心跳监测平台上的所有任务,并进行告警、自动恢复等操作。
④ 作业指标监控
Flink任务在运行过程中,各Operator都会产生各自的指标数据,例如,Source会产出numRecordIn、numRecordsOut等各项指标信息,我们会将这些指标信息进行收集,并展示在我们的可视化平台上。指标平台如下图:
⑤ 任务运行节点监控
我们的Flink任务都是运行在Yarn上,针对每一个运行的作业,我们需要监控其运行环境。会收集JobManager及TaskManager的各项指标。收集的指标有jobmanager-fullgc-count、jobmanager-younggc-count、jobmanager-fullgc-time、jobmanager-younggc-time、taskmanager-fullgc-count、taskmanager-younggc-count、taskmanager-fullgc-time、taskmanager-younggc-time等,用于判断任务运行环境的健康度,及用于排查可能出现的问题。监控界面如下:
1)Flink Table
从Flink的官方文档,我们知道Flink的编程模型分为四层,sql是最高层的api, Table api是中间层,DataSteam/DataSet Api是核心,stateful Streaming process层是底层实现。
刚开始我们直接使用Flink Table做为数据关联的方式,直接将接入进来的DataStream注册为Dynamic Table后进行两表关联查询,如下图:
但尝试后发现在做那些日志数据量大的关联查询时往往只能在较小的时间窗口内做查询,否则会超过datanode节点单台内存限制,产生异常。但为了满足不同业务日志延迟到达的情况,这种实现方式并不通用。
2)Rocksdb
之后,我们直接在DataStream上进行处理,在CountWindow窗口内进行关联操作,将被关联的数据Hash打散后存储在各个datanode节点的Rocksdb中,利用Flink State原生支持Rocksdb做Checkpoint这一特性进行算子内数据的备份与恢复。这种方式是可行的,但受制于Rocksdb集群物理磁盘为非SSD的因素,这种方式在我们的实际线上场景中关联耗时较高。
3)外部存储关联
如Redis类的KV存储的确在查询速度上提升不少,但类似广告日志数据这样单条日志大小较大的情况,会占用不少宝贵的机器内存资源。经过调研后,我们选取了Hbase作为我们日志关联组件的关联数据存储方案。
为了快速构建关联任务,我们开发了基于Flink的配置化组件平台,提交配置文件即可生成数据关联任务并自动提交到集群。下图是任务执行的处理流程。
示意图如下:
下图是关联组件内的执行流程图:
1)加入Interval Join
随着日志量的增加,某些需要进行关联的日志数量可能达到日均十几亿甚至几十亿的量级。前期关联组件的配置化生成任务的方式的确解决了大部分线上业务需求,但随着进一步的关联需求增加,Hbase面临着巨大的查询压力。在我们对Hbase表包括rowkey等一系列完成优化之后,我们开始了对关联组件的迭代与优化。
第一步,减少Hbase的查询。我们使用Flink Interval Join的方式,先将大部分关联需求在程序内部完成,只有少部分仍需查询的日志会去查询外部存储(Hbase). 经验证,以请求日志与实验日志关联为例,对于设置Interval Join窗口在10s左右即可减少80%的hbase查询请求
① Interval Join的语义示意图
-
数据JOIN的区间 - 比如时间为3的EXP会在IMP时间为[2, 4]区间进行JOIN;
-
WaterMark - 比如图示EXP一条数据时间是3,IMP一条数据时间是5,那么WaterMark是根据实际最小值减去UpperBound生成,即:Min(3,5)-1 = 2;
-
过期数据 - 出于性能和存储的考虑,要将过期数据清除,如图当WaterMark是2的时候时间为2以前的数据过期了,可以被清除。
② Interval Join内部实现逻辑
③ Interval Join改造
因Flink原生的Intervak Join实现的是Inner Join,而我们业务中所需要的是Left Join,具体改造如下:
-
取消右侧数据流的join标志位;
-
左侧数据流有join数据时不存state。
2)关联率动态监控
在任务执行中,往往会出现意想不到的情况,比如被关联的数据日志出现缺失,或者日志格式错误引发的异常,造成关联任务的关联率下降严重。那么此时关联任务虽然继续在运行,但对于整体数据质量的意义不大,甚至是反向作用。在任务进行恢复的时,还需要清除异常区间内的数据,将Kafka Offset设置到异常前的位置再进行处理。
故我们在关联组件的优化中,加入了动态监控,下面示意图:
-
关联任务中定时探测指定时间范围 Hbase是否有最新数据写入,如果没有,说明写Hbase任务出现问题,则终止关联任务;
-
当写Hbase任务出现堆积时,相应的会导致关联率下降,当关联率低于指定阈值时终止关联任务;
-
当关联任务终止时会发出告警,修复上游任务后可重新恢复关联任务,保证关联数据不丢失。
为了快速进行日志数据的指标抽取,我们开发了基于Flink计算平台的指标抽取组件Logwash。封装了基于Freemaker的模板引擎做为日志格式的解析模块,对日志进行提取,算术运算,条件判断,替换,循环遍历等操作。
下图是Logwash组件的处理流程:
组件支持文本与Json两种类型日志进行解析提取,目前该清洗组件已支持微博广告近百个实时清洗需求,提供给运维组等第三方非实时计算方向人员快速进行提取日志的能力。
配置文件部分示例:
Flink中DataStream的开发,对于通用的逻辑及相同的代码进行了抽取,生成了我们的通用组件库FlinkStream。FlinkStream包括了对Topology的抽象及默认实现、对Stream的抽象及默认实现、对Source的抽象和某些实现、对Operator的抽象及某些实现、Sink的抽象及某些实现。任务提交统一使用可执行Jar和配置文件,Jar会读取配置文件构建对应的拓扑图。
对于Source进行抽象,创建抽象类及对应接口,对于Flink Connector中已有的实现,例如kafka,Elasticsearch等,直接创建新class并继承接口,实现对应的方法即可。对于需要自己去实现的connector,直接继承抽象类及对应接口,实现方法即可。目前只实现了KafkaSource。
与Source抽象类似,我们实现了基于Stream到Stream级别的Operator抽象。创建抽象Operate类,抽象Transform方法。对于要实现的Transform操作,直接继承抽象类,实现其抽象方法即可。目前实现的Operator,直接按照文档使用。如下:
针对Sink,我们同样创建了抽象类及接口。对Flink Connector中已有的Sink进行封装。目前可通过配置进行数据输出的Sink。目前以实现和封装的Sink组件有:Kafka、Stdout、Elasticsearch、Clickhouse、Hbase、Redis、MySQL。
创建Stream抽象类及抽象方法buildStream,用于构建StreamGraph。我们实现了默认的Stream,buildStream方法读取Source配置生成DataStream,通过Operator配置列表按顺序生成拓扑图,通过Sink配置生成数据写出组件。
对于单Stream,要处理的逻辑可能比较简单,主要读取一个Source进行数据的各种操作并输出。对于复杂的多Stream业务需求,比如多流Join,多流Union、Split流等,因此我们多流业务进行了抽象,产生了Topology。在Topology这一层可以对多流进行配置化操作。对于通用的操作,我们实现了默认Topology,直接通过配置文件就可以实现业务需求。对于比较复杂的业务场景,用户可以自己实现Topology。
我们对抽象的组件都是可配置化的,直接通过编写配置文件,构造任务的运行拓扑结构,启动任务时指定配置文件。
-
正文文本框Flink Environment配置化,包括时间处理类型、重启策略,checkpoint等;
-
Topology配置化,可配置不同Stream之间的处理逻辑与Sink;
-
Stream配置化,可配置Source,Operator列表,Sink。
配置示例如下:
run_env:
timeCharacteristic: "ProcessingTime" #ProcessingTime\IngestionTime\EventTime
restart: # 重启策略配置
type: # noRestart, fixedDelayRestart, fallBackRestart, failureRateRestart
checkpoint: # 开启checkpoint
type: "rocksdb" #
streams:
impStream: #粉丝经济曝光日志
type: "DefaultStream"
config:
source:
type: "Kafka011" # 源是kafka011版本
config:
parallelism: 20
operates:
-
type: "StringToMap"
config:
-
type: "SplitElement"
config:
...
-
type: "SelectElement"
config:
transforms:
-
type: "KeyBy"
config:
-
type: "CountWindowWithTimeOut" #Window需要和KeyBy组合使用
config:
-
type: "SplitStream"
config:
-
type: "SelectStream"
config:
sink:
-
type: Kafka
config:
-
type: Kafka
config:
在实时任务管理平台,新建任务,填写任务名称,选择任务类型(Flink)及版本,上传可执行Jar文件,导入配置或者手动编写配置,填写JobManager及TaskManager内存配置,填写并行度配置,选择是否重试,选择是否从checkpoint恢复等选项,保存后即可在任务列表中启动任务,并观察启动日志用于排查启动错误。
SQL语言是一门声明式的,简单的,灵活的语言,Flink本身提供了对SQL的支持。Flink1.6版本和1.8版本对SQL语言的支持有限,不支持建表语句,不支持对外部数据的关联操作。因此我们通过Apache Calcite对Flink SQL API进行了扩展,用户只需要关心业务需求怎么用SQL语言来表达即可。
扩展了支持创建源表SQL,通过解析SQL语句,获取数据源配置信息,创建对应的TableSource实例,并将其注册到Flink environment。示例如下:
使用Apache Calcite对SQL进行解析,通过维表关键字识别维表,使用RichAsyncFunction算子异步读取维表数据,并通过flatMap操作生成关联后的DataStream,然后转换为Table注册到Flink Environment。示例如下:
使用sqlQuery方法,支持从上一层表或者视图中创建视图表,并将新的视图表注册到Flink Environment。创建语句需要按照顺序写,比如myView2是从视图myView1中创建的,则myView1创建语句要在myView2语句前面。如下:
支持创建结果表,通过解析SQL语句,获取配置信息,创建对应的AppendStreamTableSink或者UpsertStreamTableSink实例,并将其注册到Flink Environment。示例如下:
支持自定义UDF函数,继承ScalarFunction或者TableFunction。在resources目录下有相应的UDF资源配置文件,默认会注册全部可执行Jar包中配置的UDF。直接按照使用方法使用即可。
部署方式同FlinkStream组件。
为了保证实时数据的统一对外出口以及保证数据指标的统一口径,我们根据业界离线数仓的经验来设计与构架微博广告实时数仓。
数据仓库分为三层,自下而上为:数据引入层(ODS,Operation Data Store)、数据公共层(CDM,Common Data Model)和数据应用层(ADS,Application Data Service)
数据引入层(ODS,Operation Data Store):将原始数据几乎无处理的存放在数据仓库系统,结构上与源系统基本保持一致,是数据仓库的数据准。
数据公共层(CDM,Common Data Model,又称通用数据模型层):包含DIM维度表、DWD和DWS,由ODS层数据加工而成。主要完成数据加工与整合,建立一致性的维度,构建可复用的面向分析和统计的明细事实表,以及汇总公共粒度的指标。
公共维度层(DIM):基于维度建模理念思想,建立整个企业的一致性维度。降低数据计算口径和算法不统一风险。
公共维度层的表通常也被称为逻辑维度表,维度和维度逻辑表通常一一对应。
公共汇总粒度事实层(DWS,Data Warehouse Service):以分析的主题对象作为建模驱动,基于上层的应用和产品的指标需求,构建公共粒度的汇总指标事实表,以宽表化手段物理化模型。构建命名规范、口径一致的统计指标,为上层提供公共指标,建立汇总宽表、明细事实表。
公共汇总粒度事实层的表通常也被称为汇总逻辑表,用于存放派生指标数据。
明细粒度事实层(DWD,Data Warehouse Detail):以业务过程作为建模驱动,基于每个具体的业务过程特点,构建最细粒度的明细层事实表。可以结合企业的数据使用特点,将明细事实表的某些重要维度属性字段做适当冗余,也即宽表化处理。
明细粒度事实层的表通常也被称为逻辑事实表。
数据应用层(ADS,Application Data Service):存放数据产品个性化的统计指标数据。根据CDM与ODS层加工生成。
对于原始日志数据,ODS层几乎是每条日志抽取字段后进行保留,这样便能对问题的回溯与追踪。在CDM层对ODS的数据仅做时间粒度上的数据压缩,也就是在指定时间切分窗口里,对所有维度下的指标做聚合操作,而不涉及业务性的操作。在ADS层,我们会有配置化抽取微服务,对底层数据做定制化计算和提取,输出到用户指定的存储服务里。
长按订阅更多精彩▼
如有收获,点个在看,诚挚感谢