从Spark到Flink,菜鸟实时数仓架构是怎样炼成的?
在开源盛世的今天,实时数仓的建设已经有了较为成熟的方案,技术选型上也都各有优劣。菜鸟作为物流供应链的主力军,时效要求已经成为了核心竞争力,离线数仓已不能满足发展的需要,在日益增长的订单和时效挑战下,菜鸟技术架构也在不断发展和完善,如何更准更高效的完成开发和维护,变得格外重要。本文将为大家分享菜鸟技术团队在建设实时数仓技术架构中的一些经验和探索,希望能给大家带来启发。
本文主要包括以下内容:
以前的实时数据技术架构
数据模型、计算引擎、数据服务的升级
其他技术工具的探索和创新
未来发展与思考
一、以前的实时数据技术架构
数据模型:
业务线内部模型层次混乱,数据使用成本特别高
需求驱动的烟囱式开发,完全没有复用的可能性,计算成本居高不下
各业务线横向或者纵向的交叉,导致开发过程中数据一致性偏差较大
纵向的数据模型,导致 BI 使用时比较困难
实时计算:
我们之前使用阿里云的 JStorm 和 Spark Streaming 进行开发,这两部分能满足大部分的实时数据开发,但是应用到物流供应链场景当中,实现起来并不简单,甚至无法实现
很难同时兼顾功能、性能、稳定性以及快速故障恢复能力
数据服务:
开发过程中,实时数据下沉到 MySQL,HBase 等数据库中,查询和保障方面不灵活
BI 权限控制和全链路保障不可靠
二、数据模型升级
充分复用公共中间层模型
参考离线数仓,将实时数据进行分层,第一层是数据采集,将从 MySQL 等数据库中采集到的数据放到 TT 消息中间件中,然后基于 TT 消息中间件和 HBase 的各种维表关联产生事实明细宽表,再将生成的数据写到 TT 消息中间件中。
通过订阅这个消息产生轻度汇总层和高度汇总层两层。轻度汇总层主要按照多个维度沉淀数据,高度汇总层主要用于大屏场景。
充分复用公共预值分流模型
左边公共数据中间层是将所有业务线整合在一起,然后进行分层。右边的业务数据中间层是各个业务基于横向的公共数据中间层的基础上,去分流自己业务的数据中间层,然后根据这些实时的消息,个性化的产出自己业务的数据中间层。
比如不同的订单,可以横向分流成进口供应链和出口供应链。这样实现了上游只需要一个公共分流作业完成,节约了计算资源。
左边是公共的数据中间层,里面包含整个大盘的订单数据,大盘的物流详情,汇总的公共粒度数据等,在这个基础之上做了个分流任务,然后从物流订单,物流详情等里面拆分出我们自己个性化的业务,比如国内的供应链,进口的供应链以及出口的供应链等。
经过这些步操作之后,可以轻易的区分哪些表是大屏的数据,哪些表是统计分析的数据,在数据易用性方面有了很大的提升。
三、计算引擎的提升
一开始,我们采用了 JStorm 和 Spark Streaming 进行实时开发。这两个计算引擎在满足大部分的场景时,没有特别大的问题,但是在应用到供应链或者物流场景时,不是那么简单。所以我们在17年切换到了 Flink 上,Flink 提供了一些很实用的功能,而这些功能在一些供应链的场景下比较适用:
首先是内部 Flink 支持一套完整的 SQL 写法,提高了开发效率;
其次是 Flink 内置的基于 State 的 retraction 机制,很好的支持了供应链当中的取消订单,换配等这种操作,而且非常简单。Flink 的 CEP 功能,方便的实现了超时统计的功能,还有目前正在推的 AtoScaling 方案,比如数据倾斜,资源配置等;
再者是 Flink 在批流混合问题的处理,有很好的支持。
基于Flink的实时计算引擎
左边有4列数据,第1列数据是物流订单,第2列的数据是物流订单的创建时间,第3列数据是这个订单是不是被取消,第4个数据是计划配送公司,就是订单分配给哪个配送公司。
这个业务需求看上去非常简单,其实就是统计每个配送公司计划履行的有效单量有多少。但是有两个点需要注意一下:
第一点,有一个订单 LP3,在开始的时候是有效的,然后在最后一条时取消了,就变成了一个无效订单,但这一条订单不应该统计在内,因为业务需求统计的是有效订单。
第二点,配送公司的转变,LP1 这个订单在一分钟时是 tmsA 来配送,后来变成了 tmsB,再过了一段时间,这个订单又变成 tmsC 配送。如果基于 Storm 增量计算,得出的结果显然是错误的,这时要按照最后一次消息给我们传过来的数据统计。
这样的场景在 Flink 中是如何实现的呢?Flink 也提供了一些非常好的回撤机制,下面是伪代码的实现:
第一段代码使用 Flink 的 last_value 函数,获取这个订单最后一个消息非空的值,在这个基础上进行汇总。一旦 last_value 中字段发生变化,都会触发撤回机制,得到最后正确的值。
这个案例发生在菜鸟实际物流场景中,第1个表格是一个日志的时间,第2个是物流订单,第3个字段是出库时间,最后一个字段是揽收时间,现在需要统计的是出库超6个小时没有被揽收的单量。
这里涉及到全链路时效问题,全链路时效指从下单到仓库发货到快递揽收到签收的整体时间,这个场景放在离线中是非常容易实现的,但是放到实时中来不是很简单。
比如 LP1 在00:05分的时候没有揽收,现在如果当下时刻是12点的话,也没有揽收,理论上应该计算这个订单,但是没有揽收就意味着没有消息进来,没有消息进来我们又要统计,其实我们用 Flink 是没法统计的,那这种情况我们怎么处理呢?我们的解决方案是如果没有这条消息我们用 Flink 来制造这条消息。这种超时消息统计我们想到了几种方法:
包括引入消息中间件 ( kafka ) 和 Flink 的 CEP。最终选择了 Flink 的 Timer Service,因为这种消息不是特别多,中间件又特别重。而 CEP 会丢掉一些回传不准确的消息,导致数据计算不准确,针对这些情况,我们在调研之后选择了 Timer Service,同时我们对它底层的 ProcessElement 和 OnTimer 两个方法进行了改写。
ProcessElement 告诉 Flink 存储什么样的数据,然后启动针对每一个超时的事件的 Timer Service。OnTimer 方法会在每个超时的时刻读这个超时的消息,并把这个超时的消息下发下来。基于下游跟正常流的关联操作之后就能计算超时消息的单量。伪代码如下:
先构造一个 process funcation 到 state 存数据,并为每一个超时的数据注册一个 Timer Service。然后执行 OnTimer 这个方法,读取并把这个超时的消息下发下去。
关于数据倾斜的问题,上图显示在 map 阶段 shuffer 之后数据倾斜到了红色的 Agg 上,这时就出现热点了。原来我们是对这个 lg_order_code 进行 hash 取值操作,然后再针对散列的结果进行二次的聚合,这样操作后在一定程度上减轻了数据的倾斜。
在最近的 Flink 的版本中已经实现了规避数据倾斜的方法,我们内部的 Blink 版本,有几个功能去优化热点的问题:
第一个就是 MiniBatch,之前来一条数据,我们就去 State 里面查询然后写入,MiniBatch 的作用是把所有的数据先聚合一次,类似一个微批处理,然后再把这个数据写到 State 里面,或者在从 State 里面查出来,这样可以大大的减轻对 State 查询的压力。
第二个办法就是 LocalGlobal,类似于在 hive 中 map 阶段中的 combiner,通过设置这个参数可以在读的时候先聚合。
第三个办法是 PartialFinal,类似于散列的方式,分两次聚合,相当于 hive 中两个入 reduce 操作。通过设置这三个参数,可以在大部分场景规避数据倾斜的问题。
智能化功能支持的另一个场景是资源配置。在进行实时 ETL 过程中,首先要定义 DDL,然后编写 SQL,之后需要进行资源配置。针对资源配置问题,菜鸟之前的方案是对每一个节点进行配置,包括并发量、是否会涉及消息乱序操作、CPU、内存等,一方面配置过程非常复杂,另一方面无法提前预知某些节点的资源消耗量。Flink 目前提供了较好的优化方案来解决该问题:
大促场景:该场景下,菜鸟会提前预估该场景下的 QPS,会将其配置到作业中并重启。重启后 Flink 会自动进行压测,测试该 QPS 每个节点所需要的资源。
日常场景:日常场景的 QPS 峰值可能远远小于大促场景,此时逐一配置 QPS 依然会很复杂。为此 Flink 提供了 AutoScaling 智能调优的功能,除了可以支持大促场景下提前设置 QPS 并压测获取所需资源,还可以根据上游下发的 QPS 的数据自动预估需要的资源。大大简化了资源配置的复杂度,使得开发人员可以更好地关注业务逻辑本身的开发。
四、数据服务的升级
在开发的过程中常用的数据库比较少,因此统一数据库连接标准是有必要的。我们开发的叫天工,它可以提供整个数据库统一的接入标准,提供统一的权限控制,提供统一的全链路的保障。
统一数据服务中间件
这个中间件将 SQL 作为 DSL,并且提供一些标准化的 HSF 的服务方式。作为菜鸟数据服务的践行者,天工也提供了一些非常贴近业务的,非常实用的功能,下面是几个案例。
对于 HBase 这种 NoSQL 数据库,BI 或者运营来说用代码来实现需求是比较困难的,所以开发天工的时候第一件事情就是把一些 NoSQL 转化成天工 SQL,包括我前面说的一个人员的表转化成一个二维表,这里是逻辑的转换,不是实际物理上的转化,大家通过运行这个 SQL,后台的中间件会自动转化成查询的语言,去查询后台的数据。
在开发数据产品的过程中,我们发现实时跟离线有时候分不开,比如有一个比较大的场景,需要统计实时 KPI 的完成率,它的分子是实际单量,分母是已经计划好的单量,数据源是来自两个部分:第一个部分来自已经做好的 KPI 的一个表,然后第二部分是一个实时计算出来的表。对于这种场景,之前我们是用 Java 去计算这两部分数据,然后在前端去运算,比较麻烦。现在通过天工 SQL 直接取这两部分数据关联,做到跨源数据的操作。
原来在整个服务的保障比较缺失,比如某个数据服务出了问题,我们直到运营反馈的时候才发现有问题,或者数据量比较大的时候,要去做限流和主备切换。
所以在数据服务的这一层中也把数据服务保障加到了天工的中间件里面。还有主备双活,将流量大的放在主库,流量适中的放在备库上。针对一些复杂的查询,在执行的时候很慢,我们会自动识别这些慢查询,然后进行阻断,等待资源充足后再执行,当然,也可通过添加白名单用户进行限流。上面这些功能在天工里面都有实现。
五、其他技术工具的探索和创新
除了前面讲的,我们在技术工具上也和阿里云计算平台的事业部进行了探索。每年遇到大促都要进行压测,大家要去启动数据,模拟大促流量,看看我们的实时作业能不能满足预期,比如有延迟,或者 QPS 过高,在原来我们会重启作业,然后把 source 和 sink 改成压测 source 和 sink,操作起来非常的麻烦。
后来我们做了一个实时的压测工具,可以做到一键启动所有重要的压测任务,并且会生成压测报告。我们只需要看压测本报告有没有满足我们的预期就行。基于 Flink 之后,我们开始做基于作业进度的监控,比如延迟监控、checkpoint 的监控、TPS 的预警等。
六、未来发展与思考
菜鸟目前在实时数仓方面更多的是基于 Flink 进行一系列功能的开发,未来的发展方向计划向批流混合以及 AI 方向演进。
Flink 提供了 batch 功能。菜鸟很多中小型的表分析不再导入到 Hbase 中, 而是在定义 source 的时候直接将 MaxCompute 的离线维表读到内存中,直接去做关联,如此一来很多操作不需要再进行数据同步的工作。
针对一些物流的场景。如果链路比较长,尤其是双十一支付的订单,在十一月十七号可能还存在未签收的情况,这时候如果发现作业中有一个错误,如果重启的话,作业的 State 将会丢失,再加之整个上游的 source 在 TT 中只允许保存三天,使得该问题的解决变得更加困难。
菜鸟之后发现 Flink 提供的 batch 功能可以很好地解决该问题,具体来讲是定义 TT 的 source,作为三天的实时场景的应用,TT 数据写到离线数据库进行历史数据备份,如果存在重启的情况,会读取并整合离线的数据,即使 Flink 的 state 丢失,因为离线数据的加入,也会生成新的 state,从而不必担心双十一的订单如果在十七号签收之前重启导致无法获取十一号的订单信息。
当然,在上述问题的解决上,菜鸟也踩了很多的小坑。其中的一个是整合实时数据和离线数据的时候,数据乱序的问题。菜鸟实现了一系列的 UDF 来应对该问题,比如实时数据和离线数据的读取优先级设置。
针对日志型的业务场景。比如曝光、网站流量等,其一条日志下来后,基本不会再发生变化。菜鸟目前在考虑将所有解析的工作交给 Flink 来处理,然后再写入到 batch 中,从而无需在 MaxCompute 的 ODPS 中进行批处理的操作。
在智能化方面。前面提到的数据倾斜隐患的规避、资源的优化等,都用到了 Flink 提供的智能化功能。
菜鸟也期望在实时 ETL 过程中的一些场景中,比如去重,也使用 Flink 相应的智能化解决方案来进行优化。此外,在数据服务保障上,如主备切换等,目前仍然依赖人工对数据库进行监控,菜鸟也期望 Flink 之后能提供全链路实时保障的策略。最后是业务场景的智能化,阿里 Alink 对于业务智能化的支持也是之后探索的方向。
近年来大数据技术发展迅猛,不断推陈出新抵抗新时代的海量数据处理挑战。但随着技术的更迭,每次演进都需要耗费大量的人力物力,传统的数据管理模式已摇摇欲坠,此时DataOps应运而生。9月11日来Gdevops全球敏捷运维峰会北京站一起看看联通大数据背后的DataOps体系建设:
《数据智能时代:构建能力开放的运营商大数据DataOps体系》中国联通大数据基础平台负责人/资深架构师 尹正军
此议题将讲述联通大数据背后的DataOps平台整体架构演进,包括数据采集交换加工过程、数据治理体系、数据安全管控、能力开放平台运营和大规模集群治理等核心实践内容。