vlambda博客
学习文章列表

工作流引擎 Apache Airflow 2.3.0 重磅发布!

我们很自豪地宣布 Apache Airflow 2.3.0 已经发布。   


Apache Airflow 2.3.0 包含自 2.2.0 以来的 700 多个提交,包括 50 个新功能、99 个改进、85 个错误修复和几个文档更改。



详情:


📦 PyPI:https://pypi.org/project/apache-airflow/2.3.0/ 


📚 文档:https://airflow.apache.org/docs/apache-airflow/2.3.0/ 


🛠️ 发版说明:

https://airflow.apache.org/docs/apache-airflow/2.3.0/release_notes.html

🐳 Docker 镜像:docker pull apache/airflow:2.3.0

🚏 约束:https://github.com/apache/airflow/tree/constraints-2.3.0


由于变更日志非常大,以下是此版本中提供的一些值得注意的新功能。


动态任务映射(AIP-42) 


现在,Airflow 中的动态任务具有一流的支持。这意味着您可以在运行时动态生成任务。就像使用for循环创建任务列表一样,您可以在这里创建相同的任务,而无需提前知道任务的确切数量。

您可以task生成要迭代的列表,这在循环中是不可能的for。

这是一个例子:


@taskdef make_list(): # This can also be from an API call, checking a database, -- almost anything you like, as long as the # resulting list/dictionary can be stored in the current XCom backend. return [1, 2, {"a": "b"}, "str"]

@taskdef consumer(arg): print(list(arg))

with DAG(dag_id="dynamic-map", start_date=datetime(2022, 4, 2)) as dag: consumer.expand(arg=make_list())


更多信息可以在这里找到:动态任务映射


网格视图替换树视图


网格视图取代了 Airflow 2.3.0 中的树视图。

截图:




从元数据数据库中清除历史记录 


Airflow 2.3.0 引入了一个新airflow db clean命令,可用于从元数据数据库中清除旧数据。

如果您想减小元数据数据库的大小,您可能需要使用此命令。

可以在此处找到更多信息:从元数据数据库中清除历史记录


LocalKubernetesExecutor 


有一个名为 LocalKubernetesExecutor 的新执行器。该执行器可帮助您使用 LocalExecutor 运行一些任务,并根据任务的队列在同一部署中使用 KubernetesExecutor 运行另一组任务。

更多信息可以在这里找到:LocalKubernetesExecutor


DagProcessorManager 作为独立进程 (AIP-43) 


从 2.3.0 开始,您可以将 DagProcessorManager 作为独立进程运行。因为 DagProcessorManager 运行用户代码,所以将其与调度程序进程分离并作为独立进程在不同主机中运行是一个好主意。

cli 命令将启动一个新进程,该airflow dag-processor进程将在单独的进程中运行 DagProcessorManager。在您可以将 DagProcessorManager 作为独立进程运行之前,您需要将[scheduler] Standalone_dag_processor设置为True.

可以在此处找到更多信息:dag-processor CLI 命令


连接的 JSON 序列化 


您现在可以使用json序列化格式创建连接。


airflow connections add 'my_prod_db' \ --conn-json '{ "conn_type": "my-conn-type", "login": "my-login", "password": "my-password", "host": "my-host", "port": 1234, "schema": "my-schema", "extra": { "param1": "val1", "param2": "val2" } }'


json在环境变量中设置连接时也可以使用序列化格式。

可以在此处找到更多信息:连接的 JSON 序列化


Airflow db downgrade和离线生成 SQL 脚本 


Airflow 2.3.0 引入了一个新命令airflow db downgrade,可以将数据库降级到您选择的版本。

您还可以为您的数据库生成降级/升级 SQL 脚本并针对您的数据库手动运行它,或者只查看将由降级/升级命令运行的 SQL 查询。

更多信息可以在这里找到:Airflow db downgrade和 SQL 脚本的离线生成


重用装饰任务 


您现在可以在 dag 文件中重用修饰任务。装饰任务有一个override方法可以让你覆盖它的参数。

这是一个例子:


@taskdef add_task(x, y): print(f"Task args: x={x}, y={y}") return x + y



@dag(start_date=datetime(2022, 1, 1))def mydag(): start = add_task.override(task_id="start")(1, 2) for i in range(3): start >> add_task.override(task_id=f"add_start_{i}")(start, i)


更多信息可以在这里找到:装饰 DAG 的重用


其他小功能 


这不是一个完整的列表,但一些值得注意或有趣的小功能包括:

  • Support different timeout value for dag file parsing

  • airflow dags reserialize command to reserialize dags

  • Events Timetable

  • SmoothOperator - Operator that does literally nothing except logging a YouTube link to Sade’s “Smooth Operator”. Enjoy!


Contributors 

Thanks to everyone who contributed to this release: Ash Berlin-Taylor, Brent Bovenzi, Daniel Standish, Elad, Ephraim Anierobi, Jarek Potiuk, Jed Cunningham, Josh Fell, Kamil Breguła, Kanthi, Kaxil Naik, Khalid Mammadov, Malthe Borch, Ping Zhang, Tzu-ping Chung and many others who keep making Airflow better for everyone.