Apache Airflow 任务中使用模板或上下文

本文大概记录一下在 Apache Airflow 的 Task 或 Operator(这两个基本是同一概念) 中如何使用 模板(Template) 和上下文(Context). Airflow 的模板引擎用的  Jinja Template, 它也被 FastAPI 和 Flask 所采纳。首先只有构造 Operator 时的参数或参数指定的文件内容中,或者调用 Operator render_template() 方法才能用模板语法,像 {{ ds }}.  Apache Airflow 有哪些模板变量可用请参考: Templates references / Variables, 本文将会打印出一个 Task 的 context 变量列出所有可用的上下文变量, 不断的深入,最后在源代码找到相关的定义。

通过使用模板或上下文,我们能能够在任务中使用到 Apache Airflow 一些内置的变量值,如 DAG 或任务当前运行时的状态等 。

当我们手动触发一个 DAG 时, 在 Configuration JSON 中输入的参数也能在 context 中找到. 所有的 Operator  继承链可追溯到 AbstractOperator -> Templater, 因此所有的 Operator(Task) 都能通过调用 Templater.render_template() 方法对模板进行渲染,该方法的原型是

留意在 Templater 中定义的两个变量

template_fields 定义了在构造 Operator 实例时哪些参数支持模板解析,template_ext 指定规定什么扩展名对应的文件内容中的模板变量可被解析。

以 BashOperator 为例,它定义了

  • 而 template_fields_renderers: ClassVar[dict[str, str]] = {} 定义在 airflow.sdk.bases.operator 中,可看出它是 template_fields 和 template_ext 综合体,key 为支持模板的参数,value 为支持模板的文件扩展名。

bash_command, env, cwd 参数中的模板变量会被解析, 如果 bash_command 中指定的是一个脚本文件 *.sh 或 *.bash,那么在该脚本文件中的模板变量也将被替换,如

或者 @task.bash 的使用

同样查看常用的 PythonOperator 中定义

从中我们理解到 PythonOperator 的什么参数支持模板,什么参数支持从文件中读取内容,并支持模板变量解析, 其他的 Operator 类似。

最后我们来写一个简单的 DAG,演示了参数中使用模板,并在手动触发时指定 Configuration JSON 内容, 任务中打印出整个 context

Apache Airflow 的 DAG 声明有多种方式,1) with 上下文管理器 2) @dag, 相应的 @task, @task.bash 等装饰器(Airflow 2.0 起), 还有动态的或模板化的,本人倾向于使用 with DAG(...) 的方式。

保存到 Apache Airflow 的 dags 目录,手动触发执行,并在触发填入 Configuration JSON 内容

执行后 step_1 中看到相关日志输出 

[INFO - 2025-05-21source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"

step_2 中看到相关日志输出

3[INFO - run_id: manual__2025-05-21T02:47:06.482473+00:00chan="stdout"source="task"
4[INFO - task_instance_key='hello_dag__step2__20250521'chan="stdout"source="task"
5[INFO - try_number: 1chan="stdout"source="task"

单列出 print(f'context: {json.dumps(context, default=str)}') 的输出进行格式化后如下

从中看到 context["task"] 是一个 Task(Operator), 所以能对它进行调用 render_template() 方法。我们在触发 DAG 时的输入出现在

利用这一 Configuration JSON 功能使得在手工触发 DAG 可依照我们的意愿执行相应的分支,比如说任务中通过检查当天为周日时执行的分支,通过预设代码逻辑在手工触发时用 {"force": true} 也能执行,这样就不需要修改原有 DAG 代码。 

如果我们的 Task 中不需获取到整个 context,  然后从中获得 task_instance, dag_run 或其他明确参数的话,经过测试证明 Apache Airflow 的 dag_processor 在获取 Task 的参数信息后最后是通过参数名对参数进行映射的,比如我们定义如下的 _step2() 函数

触发,看到相应的的输出

[INFO - task_instance: id=UUID('0196f0dd-987f-748a-b8de-812251470d2d') task_id='step2' dag_id='hello_dag' run_id='manual__2025-05-21T03:22:56.492907+00:00' try_number=1 map_index=-1 hostname='b4fdb75c6566' context_carrier=None task=<Task(PythonOperator): step2> bundle_instance=LocalDagBundle(name=dags-folder) max_tries=0 start_date=datetime.datetime(2025, 5, 21, 3, 22, 59, 785081, tzinfo=TzInfo(UTC)) end_date=None state=<TaskInstanceState.RUNNING: 'running'> is_mapped=False rendered_map_index=Nonechan="stdout"source="task"
3[INFO - dag_run: dag_id='hello_dag' run_id='manual__2025-05-21T03:22:56.492907+00:00' logical_date=datetime.datetime(2025, 5, 21, 3, 22, 51, 705000, tzinfo=TzInfo(UTC)) data_interval_start=datetime.datetime(2025, 5, 21, 3, 22, 51, 705000, tzinfo=TzInfo(UTC)) data_interval_end=datetime.datetime(2025, 5, 21, 3, 22, 51, 705000, tzinfo=TzInfo(UTC)) run_after=datetime.datetime(2025, 5, 21, 3, 22, 51, 705000, tzinfo=TzInfo(UTC)) start_date=datetime.datetime(2025, 5, 21, 3, 22, 57, 515261, tzinfo=TzInfo(UTC)) end_date=None clear_number=0 run_type=<DagRunType.MANUAL: 'manual'> conf={'key1': 'this is from Configuration JSON'} consumed_asset_events=[]chan="stdout"source="task"
4[INFO - inlet_events: InletEventsAccessors(_inlets=[], _assets={}, _asset_aliases={})chan="stdout"source="task"
5[INFO - my_param='hello world'chan="stdout"source="task"

说明 Apache Airflow 的 dag_processor 提取 task 参数时是按名称对应的,看到参数名为 inlet_events, 执行任务时传入与之相应的值。

最后一个终极测试,修改 _step2() 函数为

用 *args, **kwars 接收到全部传入的参数,同样的测试, 把输出进行 JSON 格式化,就像

"args": [] 为空是因为没有用 op_args 向 PythonOperator 传递参数,这里基本上与最初的 **context 输出类似,只多包含了 my_param 自定义的参数,有了这个参数列表后,task 函数就可以针对性的定义参数名,如想要获取 Configuration JSON 的输入,只要定义

但是却不能定义一个名为 context 的参数,如

这样定义的 context 参数只是一个普通的像 my_param 那样的自定义参数,必须用 op_args 或 op_kwargs 给它传递值。

原理上是因为 Python 会用下面的方式去调用 python_callable 对应的方法

_step2(conf=..., dag=..., dag_run..., data_interval_start=.., ...)

而假如定义方法 _step2(data_interval_start, **context), 那么参数名对应到的 data_interval_start 参数直接获得值,其余则全放到 context 中。

这个参数列表定义在 airflow.sdk.definitions.context.Context

在 Airflow UI 中,运行后的 Dag 步骤,可查看到 Rendered Templates, 其中有渲染的模板值,如果显示

 

或者在开发过程中,DAG 无需运行就能用 airflow 命令来查看模板渲染结果,命令为

airflow tasks render [dag id] [task id] [desired execution date(例: 2025-04-24T22:00:00)]

总结:

本文由一个简单的无参 Task 逐步探索了以下问题

  1. Apache Airflow 如何使用模板
  2. 在何处可使用模板,Operator 的参数中,指为参数指定文件中
  3. 如何获得所有任务参数
  4. 如何选择性的定义所需的任务参数,只需要定义 airflow.sdk.definitions.context.Context 中列出的变量对应名称的参数

对于主要的两方面的内容未进行深入

  1. 怎样在模板中使用 filter, 即 {{ data_interval_end | ds }} 中管道符后的 ds
  2. 如何传递 jinja_env 参数

最后赞叹一句,Apache Airflow 代码写得很美。

本文链接 https://yanbin.blog/apache-airflow-task-use-template-context/, 来自 隔叶黄莺 Yanbin Blog

[版权声明] Creative Commons License 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。

Subscribe
Notify of
guest

0 Comments
Inline Feedbacks
View all comments