Airflow实践 | 一款基于python的智能工作流引擎
【导语】目前拍拍贷有很多使用python代码编写的数据挖掘模型服务和数据处理调用任务。这些任务,往往过程复杂,运行时间很长。当任务数量较多时,无论运行还是监控都十分麻烦,我们需要一个工作流调度引擎来管理我们的工作。
- 【Airflow简介】 -
Airflow是一款Airbnb开源的基于python语言的工作流调度引擎,奉行 Configuration as code的哲学,所有的配置和工作流描述可以直接使用python代码编写,使其具有极大的灵活性和扩展性,界面友好,监控方便,非常适合用来管理我们的数据流任务。
在拍拍贷的日常使用中,为规范数据开发的工作流程, 我们对Airflow进行了二次开发。
1. 在已有的Sensor模块基础上,开发了针对模型前置条件检测功能的个性化Senor;
2. 综合了目前模型运行的数据处理特点,集成了基于个性化的SQL Opreator,其可通过简单配置就可以生成复杂的sql调度;
通过这些个性化模块的二次开发,我们只需要专注于业务逻辑,无需关注流程, 极大地提高了开发效率。
此外,扩展性方面,原生Airflow已经集成了基于Celery的消息调度系统,在系统应用出现性能瓶颈的情况下,可以帮助其快速、方便进行应用扩展。由于Airflow的源码即Python,固然对python语言的支持非常好,但在其他类型的应用上,也提供了多种插件和模块进行调用,例如面向多个平台的Operator:Amazon EMR,Azure,Databricks,此外对标准的Hadoop系组件也有对应的模块进行支持。
- 【Airflow安装】 -
Airflow使用python编写,安装十分方便,使用pip即可:
pip install apache-airflow
安装完之后, 需要初始化数据库:
airflow initdb
启动web服务器:
airflow webserver -p 8080
然后在浏览器中访问http://127.0.0.1:8080/admin/即可看到Airflow的web界面:
在生产环境部署时,需要增加额外的配置:
1. 修改airflow.cfg文件:
executor = LocalExecutor
sql_alchemy_conn = mysql://user:password@host:port/airflow
Airflow默认的executor是SequentialExecutor,不能并行执行任务,只能用于本地开发。LocalExecutor可以同时执行多个任务,加快速度。sql_alchemy_conn配置mysql数据库保存airflow的数据。
2. 使用supervisor启动airflow服务:
airflow webserver
airflow scheduler
supervisor能监控airflow服务,如果服务出错,可以自动重启服务。
- 【Airflow核心概念】 -
DAG,有向无环图(Directed Acyclic Graph),是airlfow最核心的概念,用来表示一系列任务之间的依赖关系,管理任务之间的调度。
DAG可以配置运行时间,运行的时间间隔, 如果失败了,可以配置决定是否发送监控邮件,以及重试次数等。通常一个DAG是dags目录下的一个.py文件。登录Airflow的web界面,可以查看DAG的依赖图:
通过调用自定义函数,可以完成绝大多数任务,Sensor语义很强大,它会每隔一段时间检查用户指定的条件,只有条件满足才会返回,否则会一直阻塞,直到超时返回。用户还可以自定义Operators,使得编写Airflow任务十分方便。
Tasks,每个任务都是Operators 的实例,表示DAG的节点。每个任务可以单独配置超时时长,失败后的重试次数,以及触发的规则。触发规则包含all_success,all_failed,all_done,one_failed,one_success等多个选项,由上游节点执行的状态来决定是否运行当前任务。
Task Instance,Task的一次执行,有多个可能的状态,包括running,success,failed,skipped等,表示执行是否成功。
- 【应用】 -
我们的模型大多数是从hive读取离线数据,生成变量表,然后将数据加载到本地,使用python做模型分析。可以将常用模型拆分成多个标准步骤:
1. 配置
我们会将常用的外部系统的连接信息保存在Airflow的Connections里面,包括mysql数据库,hive,ssh连接等。这样既可以隐藏密码等关键信息,又能方便管理,避免每个模型重复配置,如果要迁移数据库,修改用户名等操作也十分方便。当需要使用连接信息时,只需要调用相应的hook函数,传入conn_id即可。还有一些全局变量会存储在Airflow的Variable中,例如模型运行时间。当需要重跑模型时,只需要在web界面修改相应变量即可。
Airflow配置界面如图所示:
2. 检查依赖表
模型在跑sql之前会首先检查依赖表是否已更新。我们开发了用于检查依赖表的Sensor类,只需要配置需要检查的表名和日期,Airflow就会每隔一段时间检查相关表,如果条件满足,返回成功状态,否则会一直阻塞,知道满足条件,或者超过指定时间,返回失败状态, 并把检查结果写入Variable中,由后续任务处理。
3. 执行sql文件
模型最麻烦的步骤是预处理hive的数据,生成变量宽表。这主要靠执行sql完成。但是由于sql很多,执行很慢,导致耗时非常多。而且由于是调用外部系统,受很多因素影响,有一定概率的出错风险。我们自定义了 Operator,指定sql文件,即可运行相应sql任务,并可以设置重试次数和等待时长。这一步一般会拆分成多个任务,并发执行,这样会显著加快运行速度。如果任务失败,可以直接从失败的地方重跑,节约了大量时间。
4. 跑模型
这一步会将模型加载到本地,调用python的机器学习包进行处理。我们一般编写python函数,使用PythonOperator构建任务。
5. 存储结果
根据Airflow中Connection的配置,使用相应的hook函数,可以方便的将模型结果存储到hdfs,mysql数据库等地方。
6. 生成模型监控,并邮件发送监控结果。
7. 配置DAG
DAG可以方便的配置运行时间, 运行时间间隔,重试次数等。而采用 >> 操作符配置Operator之间的依赖关系,可以快速构建一个完整的DAG图。
当模型部署上线之后, 登录Airflow的web界面,可以很容易监控模型。Airflow界面功能十分强大,可以查看所有DAG的运行,针对单个DAG,可以查看节点构成,如果有节点运行失败,可以快速查看日志,定位报错原因,并且,只要清除报错日志,即可从报错点开始重跑,非常方便。
- 【优缺点】 -
Airflow最大的优点在于管理DAG十分方便。
我们最初是使用crontab管理模型。当模型数量增加之后,不好监控模型的报错情况。尤其当模型之间也有了相互依赖之后,一旦某个模型报错,会引起连锁反应,导致大量模型一起报错。只能用邮件监控运行情况,如果报错,必须从日志文件定位原因。补跑也很麻烦,需要手动修改配置,手动运行代码,而且每个模型都要从头开始跑,浪费时间。有了airflow之后,很容易就能监控每个模型的运行情况。
但Airflow的缺点也很明显,同一个DAG,不同任务之间交换数据十分不方便,少量数据交换可以通过Variable 和 Xcom交换, 大量数据的话,只能合并任务,这无疑增加了拆分任务的难度 。- 【总结】 -
Airflow功能很强大,目前我们已经有几十个模型迁移到了Airflow,运行稳定,监控方便,后续还会持续接入新的模型。
你可能还喜欢