vlambda博客
学习文章列表

B站基于AIFlow+Flink在批流融合调度上的实践

本期作者


张杨&王丁


bilibili资深开发工程师


负责B站实时团队flink引擎sql方向工作,专注于flinksql性能提升优化。同时也关注flink引擎在机器学习、数仓等场景的应用落地。



1. 背景


B站在机器学习方面有非常丰富的应用场景和工程实践,尤其是推荐、广告、搜索等业务,经过数年的积累,B站的AI团队已经形成了相当成熟和稳定的机器学习训练平台和实验平台。随着技术和业务的不断演进,目前AI团队的机器学习工程已经开始从离线逐步向实时方向发展。当前B站的实时平台团队基于Flink开发的流计算平台已经在AI团队中广泛使用,将推广搜场景的效果进一步提升。

但是与离线训练不同,实时训练和实验往往涉及到很多不同系统资源的整合,如Kafka、Flink以及其他KV存储等资源,这些相比离线资源更加难以统一管理。在现有的工程架构下,创建一个完整的AI实验流程非常复杂,需要了解多个不同系统的使用和调试,申请各种存储、计算资源。这些流程性的东西往往会消耗非常多的时间精力,导致聚焦在实验本身的时间被大大减少,同时在流程和数据的版本管控,结果可重复性,以及流批的相互迁移转化上也都难以满足业务。我们希望提供一套完整的实验流平台,从创建实验到最终的效果验证,都能在平台上通过简单的Python代码或者可视化配置完成,简化整个流程,提升实验效率,同时优化流程和数据的管理能力,具备基本的流批迁移能力。基于阿里开源的AIFlow项目,我们设计了B站的AI协作平台。


2. 基本架构


AIFlow是阿里Flink生态团队开源的一套机器学习工作流框架,对工作流模型进行了统一抽象,通过Event机制进行流批混合的调度。它通过把机器学习的工作流抽象成一个AIGraph,引入了数据依赖和控制依赖,能够从多个维度进行Event调度。尤其是AIFlow可以使用Python脚本定义工作流,类似于Apache Airflow,对于AI同学来说学习成本是比较低的,能够快速上手。

AIFlow项目在设计上保留了极大的拓展性,既可以直接兼容Apache Airflow的原生DAG任务,也可以根据Plugin接口自定义不同的任务插件,甚至可以根据插件接口自定义底层的调度器。同时,AIFlow也内置了很多常见的插件,比如存储插件:OSS、S3、HDFS等存储插件,不同类型的Job插件如:Python、Bash、Flink等。

在使用AIFlow时,可以直接部署好Server,各个Client直接在本机运行,在本机进行数据流开发,开发完成后直接提交Project和Workflow到Server,这样对于测试环境或者调试来说是非常方便的。如果有更加定制化的需求,可以自行添加各种所需的插件,甚至可以修改提交模式,将Client端也部署到远程服务器中,在Client之上进行业务封装。B站使用的就是后者,我们在Client端进行了二次开发,并设计了一套基于AIFlow的数据产品。

B站基于AIFlow开发的AI协作平台架构主要经历了三个阶段。第一阶段主要是验证阶段,在这个阶段我们和Flink生态社区团队一起,通过了业务化的改造,完成了AIFlow在B站的第一版设计和落地。第二阶段主要是数据产品形态的打磨和迭代,也包括了对新调度引擎的适配。第三个阶段是优化,将当前的系统进行一次改动比较大的改造,并能够适配社区的最新版本和后续的迭代。当前处于第三阶段,下面对每个阶段分别进行介绍。


2.1 第一阶段


机器学习的全链路包含多个阶段,从数据的采集,到样本的生成,再到特征生成、模型训练、预测,以及效果评估等。B站的这些任务最开始都是离线的,包括Spark、Hive任务等。将这些阶段的任务进行了实时化改造后,我们实时平台将相当一部分任务转为基于Flink的实时任务。但是对于全链路而言,每个阶段的任务配置都是相对独立的,新模型的实验和验收需要经过多个任务的不同实验阶段,这个链路是非常长的。而且对于算法同学而言,理解一条链路上的全部任务和实验克隆是有一定的学习成本的。同时,在流任务计算的上下游,或多或少还存着一定的批任务,它们的触发条件也和流计算任务是紧密关联。如何解决流任务和批任务的混合调度,以及如何以一个全局视角总览整个机器学习的全链路任务,成为了我们的当务之急。

在2020年8月份前后,AIFlow进入我们的视野中。在和社区经过了深入交流和探讨后,我们对AIFlow进行了相应的业务开发和改造,逐步在B站落地AIFlow。在落地过程中,我们针对AIFlow提供的特性,主要做了以下几个方面的工作。

首先就是结合B站内部的技术栈,完成了基于Spark和Flink的RemoteOperator。这个比较好理解,我们的任务都是在远程执行,所以每个Operator基本上都是在结合业务属性构造和提交远程任务,然后定时轮询远程任务的状态。这里有一个点,由于我们所有的服务都是部署在k8s的容器中的,所以本地Operator可能会遇到容器宕机或发布重启等各种失败场景,但是实际上远程任务依然是在正常运行的,所以我们在RemoteOperator里做了一层恢复机制,能够保证在local task重新调度时,远程任务不会重启或重复提交。同时,我们完善了一般业务场景下流批依赖语义,支持流批任务互相触发与数据依赖。

还有一块工作是针对快慢节点导致的特征穿越问题。尤其是在实时训练过程的Flink任务中,如果两个或多个任务互相关联,但是处理速度又不相同,很容易引起工作流中的特征穿越问题。那么针对这个问题,我们通过改造Flink,植入对应的aiflow定制operator,通过notification模块互相使不同的任务之间通过Watermark保持协同工作,控制进度。


下图是我们系统初版的架构图:

B站基于AIFlow+Flink在批流融合调度上的实践


2.2 第二阶段


在完成了AIFlow在B站实时平台的初步落地后,这套系统也逐步在AI场景中应用于在线的流计算任务。在使用一段时间后,业务方不断地沟通和反馈。我们开始了下一个阶段的设计和改造。这一阶段主要解决了一些性能、稳定性和产品设计方面的问题。

在第一阶段的落地过程中,我们确实是遇到了一些性能和稳定性问题。2021年2月份左右,随着Airflow2.0的发布,AIFlow社区团队对底层调度器进行了很大的改造,基于Airflow2.0的Scheduler实现了EventBasedScheduler,不管是调度的稳定性还是性能都大大提高,我们也跟随社区版本,对我们的系统进行了升级。

另一块比较重要的工作就是对数据产品的打磨。B站内部这套基于AIFlow设计的数据产品代号为Ultron。在解决了性能和稳定性问题后,我们对Ultron的产品形态进行了精细化的打磨以及快速迭代。比如,在和业务方多次沟通后,我们决定去掉工作流中关于业务信息和参数的定义,将业务属性彻底和工作流的定义分离,减少业务方与业务无关的开发量,把参数、任务属性等信息转移到前端页面上,通过页面填参即可实现快速调试和修改任务的能力。在这个过程中,我们也是向AIFlow社区贡献了B站展现带数据节点和数据依赖的工作流前端代码。

下图是系统的内部截图:

B站基于AIFlow+Flink在批流融合调度上的实践

除了工作流定义简化和前端展示方面的优化,我们还阵对工作流的版本管理做了优化工作。Ultron的版本管理包括工作流版本管理和节点版本管理。不管是工作流Owner修改了工作流中节点的定义、关系、依赖,还是某个节点的参数信息,都会在我们的系统里生成一个版本快照,方便AI同学在遇到问题或者实验回退时能够快速恢复到之前的一个可正常运行状态。

除了上面提到的版本管理,还有一些比如元数据和数据源管理、工作流监控、运维面板、节点预检查、权限管理等方面的工作。总体来说,在这个阶段,Ultron系统已经具备了标准的工程系统能力,具有了更加成熟的产品形态和产品功能,开始承载更多的业务方在机器学习工作流方面的需求。

下图是我们第二阶段时的系统架构图。其中蓝色区域内是AIFlow原生的架构,绿色区域是Ultron在AIFLow基础上增加的一些功能,紫色区域是我们和业务逻辑相关的一些系统,包括处理业务逻辑的WebServer和定制开发的Flink版本,以及B站的大数据平台Berserker平台。

Ultron系统的主要业务方包括AI、广告等部门。业务方在使用Ultron时,仅需要写很少的Python代码来定义一个DAG格式的数据流,定义数据源类型和节点之间的依赖关系。之后剩下就是纯页面化操作。目前我们使用内部的git进行项目管理,将一个数据流所需的资源和Python定义文件按照约定结构推送到git后,在我们的平台上以git链接的方式去初始化一个数据流,接下来就是在页面上配置各个节点和数据源相关的配置或参数等等。配置过程中,为了减少平台用户的繁琐操作,我们还打通了数据源和元数据系统的关系,直接获取对应数据源的schema,而无需手动填参定义。

配置好数据流后,平台用户可以一键拉起整条数据流,也可以在数据流运行过程中修改单个节点配置后重启这个节点,而无需对整个数据流进行重启操作。也可以很方便地一键操作复制整个数据流,形成一条新的实验流,极大减少了用户的时间成本和学习成本。

B站基于AIFlow+Flink在批流融合调度上的实践


2.3 第三阶段


从第二阶段的工作可以看出,我们对AIFlow有了比较深入的理解,相对应地,为了适配我们的业务,也是做了很多的开发工作,这其中也包括了一部分对AIFlow源码的魔改。我们和AIFlow社区团队也是保持了定期的交流,也是愈发发现我们魔改的AIFlow版本和社区近期release的版本差异越来越大。主要有几方面的原因:

1)我们在内部数据产品形态上花了比较长的时间,底层AIFlow相关的部分在稳定性和性能满足要求后大的改动比较少;

2)我们在第二阶段迭代时,依然使用了早期的架构设计,使工作流的提交、编译、部署等全部过程都在Server端实现,随着业务量的增长,这种单体式的系统设计不方便横向扩缩,存在着性能瓶颈的风险;

3)对AIFlow魔改过多,不方便底层AIFlow随着社区版本升级。


那么基于以上考虑,我们开始了第三阶段的工作,这也是我们目前正在进行的内容。

最核心的是,减少对AIFlow源码的魔改,所以我们决定对工作流引擎层进行重新设计,将编译工作流和调度工作流的过程分离,分为Client端和Server端。Client端负责与WebServer进行RPC交互,提供对外的访问接口,同时负责将业务信息翻译成AIFlow可识别的标准Project结构,也就是编译过程;Server端几乎与AIFlow完全一致,只是出于B站技术环境的因素,做了一些简单的容器化改造,比如使配置能适应服务发现,增加了几个BlobManager,将原本的RemoteOperator改造为标准的AIFlow可识别的Plugin等。这也得益于AIFlow社区同学对AIFlow不断地迭代,使得绝大部分与业务相关的功能都能够通过插件形式加载到系统中,而无需对源码做过多修改。这样改造后,Client端会彻底无状态化,可以方便扩缩容,承载更高的并发度,而Server端仅修改了极小部分的源码,后续版本升级也会非常方便。

下图是我们在第三阶段时的系统架构图。途中蓝色区域是AIFlow原生的组件和功能,绿色区域是我们在AIFlow提供的接口上做的Plugin拓展和自定义的一些业务代码翻译组件。紫色区域是数据产品层面的一些组件功能。

B站基于AIFlow+Flink在批流融合调度上的实践


3. 流批融合


我们在进行实时特征计算的时候,一个非常大的诉求就是特征回溯,对于长时间窗口的特征,业务是很难等到在累积到足够时间的数据之后才开始使用,因此希望使用离线的历史数据进行特征填充,但是离线的数据在hdfs上,基于hdfs有两种计算方案,第一种按照实时的逻辑开发一套离线特征的计算,专门用来做数据回补,第二种是直接把离线的hdfs数据,灌到实时计算的流程里面,但是需要解决时序问题,实时数据本身就隐含了时序特性。在考虑到换引擎开发任务的巨大成本,我们决定基于第二种方案来做特征回溯,流批数据使用一套计算逻辑。

B站基于AIFlow+Flink在批流融合调度上的实践

3.1 流批融合的问题


第一是如何保证数据的顺序性。实时数据有个隐含的语义就是数据是顺序进来的,生产出来立马处理,天然有一定的顺序性。但是离线的 HDFS 不是,HDFS 是有分区的,分区内的数据完全乱序,实际业务里面大量计算过程是依赖时序的,如何解决离线数据的乱序是一个很大的问题。

第二是如何保证特征和样本版本的一致性。比如有两条链路,一条是特征的生产,一条是样本生产,样本生产依赖特征生产,如何保证它们之间版本的一致性,没有穿越?

第三就是如何保证实时链路和回溯链路计算逻辑的一致?这个问题其实对我们来说不用担心,我们是直接在实时链路上回溯离线数据。

第四是一些性能方面的问题,怎么快速得算完大量的历史数据,这块我们在计算过程中做了大量的异步和预加载工作。


3.2 流批融合问题的解决方案


我们重点讲下上面问题1和问题2的解决方案。

针对第一个问题,为了数据的顺序性,我们 HDFS 的离线数据进行 kafka 化处理,这里不是把它灌到 kafka 里面去,而是模拟 kafka 的数据架构,分区并且分区内有序,我们把 HDFS 数据也处理成类似的架构,模拟成逻辑上的分区,并且逻辑分区内有序,Flink 读取的 hdfssource 也进行了对应的开发支持这种模拟的数据架构。这块的模拟计算目前是使用 spark 做的,后面我们会改成 Flink。

B站基于AIFlow+Flink在批流融合调度上的实践

第二个问题分为两部分,第一个实时特征部分的解决依赖于 Hbase 存储,Hbase 支持根据版本查询。特征计算完后直接按照版本写入 Hbase,样本生成的时候去查 Hbase 带上对应的版本号即可,这里面的版本通常是数据时间。

第二个离线特征部分,因为不需要重新计算了,离线存储 hdfs 都有,但是不支持点查,这块进行 kv 化处理就好,为了性能我们做了异步预加载。

B站基于AIFlow+Flink在批流融合调度上的实践


4. 展望


基础架构上我们会继续在基于AIFlow的实验流平台上继续努力,解决当前依然存在的几个问题:

1)目前受限于Server端底层使用了基于Airflow2.0改造的EventSchedulerJob,所以在高可用方面也是收到了数据库锁的影响,目前尚不支持。不过AIFlow社区已经在开发改造中,估计下一个release版本就可以支持调度器的高可用特性。

2)我们另外考虑的一个比较重要的点是,希望可以通过AIFlow无缝兼容Airflow2.0的任务。在B站内部目前还有若干个Airflow调度的集群,大部分场景是和机器学习都是有关联的,随着流批一体的逐步成熟,我们在AIFlow中看到了可以兼容Airflow2.0的可能性,当前我们正在推进原生Airflow调度任务向2.0兼容的版本迁移,在后续第三阶段的工作内容中,我们也会不断推进AIFlow和Airflow工作流的融合进程。

3)后续我们希望拓展AIFlow在pyFlink和Alink方面的应用,增加更丰富的流式机器学习算子,也会积极拓展Ultron在特征管理等方面的能力。


流批融合上我们会继续探索flink batch能力在特征回溯上的应用,做到真正的流批一体,在batch的基础上也能提升特征回溯的效率。

B站目前基于AIFlow的机器学习工作流平台已经在线上承接了AI部门推荐相关的大部分流式计算任务,广告相关的业务也在推进中。从目前来看,AIFlow对流批一体的支持比较好,对于流触发批,批触发流,流到流,批到批都做到了比较好的抽象。后续我们也会不断探索新的解决方案来满足我们在机器学习方面的一些典型场景。


点击阅读原文或以下链接,获取更多 AIFlow 相关资料:https://github.com/flink-extended/ai-flow


钉钉扫描下方二维码加入 AIFlow 用户钉钉交流群,了解框架最新功能及相关原理。

B站基于AIFlow+Flink在批流融合调度上的实践