Using Templates and Context in Apache Airflow Tasks

This post is a quick record of how to use templates and context in an Apache Airflow Task or Operator (the two are essentially the same concept). Airflow's template engine is Jinja, the same engine adopted by FastAPI and Flask. Template syntax such as {{ ds }} can only be used in the arguments passed when constructing an Operator (or in the content of files those arguments point to), or by calling an Operator's render_template() method. For the template variables Airflow provides, see Templates reference / Variables. This post prints a Task's context to list every available context variable, digs in step by step, and finally locates the relevant definitions in the source code.


With templates and context we can access Airflow's built-in values inside a task, such as the current state of the DAG or of the task run.

When we trigger a DAG manually, the values entered in the Configuration JSON also appear in the context. Every Operator's inheritance chain leads back to AbstractOperator -> Templater, so every Operator (Task) can render templates by calling Templater.render_template(), whose signature is:
    def render_template(
        self,
        content: Any,
        context: Context,
        jinja_env: jinja2.Environment | None = None,
        seen_oids: set[int] | None = None,
    )
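For example, inside a PythonOperator callable the current operator is available as context["task"], so an arbitrary string can be rendered against the current context; a small sketch (the same call appears in the full DAG later in this post):

def _probe(**context):
    # Render an ad-hoc template string against the running task's context.
    text = context["task"].render_template("try_number: {{ ti.try_number }}", context)
    print(text)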

Note the two attributes defined on Templater:
    # For derived classes to define which fields will get jinjaified.
    template_fields: Collection[str]
    # Defines which files extensions to look for in the templated fields.
    template_ext: Sequence[str]

template_fields declares which constructor arguments of an Operator have their template variables rendered; template_ext declares the file extensions for which the content of a file referenced by a templated field is itself rendered.
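As an illustration, here is a minimal sketch of a custom operator (the class name and the message field are made up; the import path assumes the Airflow 3 Task SDK) that opts one constructor argument into template rendering:

from airflow.sdk import BaseOperator


class GreetOperator(BaseOperator):
    # "message" is rendered with Jinja; if its value ends in ".txt",
    # the file content is loaded and rendered instead.
    template_fields = ("message",)
    template_ext = (".txt",)

    def __init__(self, message: str, **kwargs):
        super().__init__(**kwargs)
        self.message = message

    def execute(self, context):
        # By the time execute() runs, self.message has already been rendered.
        self.log.info("rendered message: %s", self.message)
        return self.message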

Take BashOperator as an example; it defines:
    template_fields: Sequence[str] = ("bash_command", "env", "cwd")
    template_fields_renderers = {"bash_command": "bash", "env": "json"}
    template_ext: Sequence[str] = (".sh", ".bash")
  • template_fields_renderers: ClassVar[dict[str, str]] = {} is declared in airflow.sdk.bases.operator. It does not add new templated fields; it maps a templated field name to the renderer used to display the rendered value in the web UI (e.g. "bash", "json").

Template variables in the bash_command, env and cwd arguments are rendered, and if bash_command points to a *.sh or *.bash script file, the template variables inside that file are substituted as well, e.g.:
step1 = BashOperator(
    task_id="step_1",
    bash_command="echo {{ data_interval_start | ds }}"
)

Or, using the @task.bash decorator:
    @task.bash
    def bash_example():
        return "test.sh "
        # return "echo {{ ds }}"   # or
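As for the file case: when bash_command ends with one of the extensions in template_ext, Airflow loads that file (resolved relative to the DAG file or the DAG's template_searchpath) and renders the template variables inside it. A sketch, with a hypothetical greet.sh placed next to the DAG file:

# greet.sh (hypothetical):
#   echo "data interval starts at {{ data_interval_start | ds }}"

run_script = BashOperator(
    task_id="run_script",
    bash_command="greet.sh",  # ends with .sh, so the file content is rendered as a template
)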

Similarly, look at what the commonly used PythonOperator defines:
    template_fields: Sequence[str] = ("templates_dict", "op_args", "op_kwargs")
    template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}

From this we can tell which PythonOperator arguments support templates and which file extensions (if any) are read and rendered as templates; other Operators can be read the same way.
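For instance, a sketch of a PythonOperator with templated op_kwargs and templates_dict (the task id and value names are made up):

def _print_args(run, templates_dict, **context):
    # Both values arrive already rendered, because op_kwargs and
    # templates_dict are listed in PythonOperator.template_fields.
    print(f"run={run}")           # e.g. manual__2025-05-21T...
    print(templates_dict["day"])  # e.g. 2025-05-21


print_args = PythonOperator(
    task_id="print_args",
    python_callable=_print_args,
    op_kwargs={"run": "{{ run_id }}"},
    templates_dict={"day": "{{ ds }}"},
)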

Finally, let's write a simple DAG that uses templates in operator arguments, accepts Configuration JSON on a manual trigger, and prints the whole context from a task:
import json

import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator


def _step2(task_instance_key, **context):
    print(f'context: {json.dumps(context, default=str)}')
    print(f'run_id: {context["run_id"]}')
    print(f"{task_instance_key=}")
    print(context["task"].render_template("try_number: {{ ti.try_number }}", context))


with DAG(
        dag_id="hello_dag",
        schedule="@daily",
        start_date=pendulum.today(),
):
    step1 = BashOperator(
        task_id="step_1",
        bash_command="echo {{ data_interval_start | ds }}"
    )

    step2 = PythonOperator(
        task_id="step2",
        python_callable=_step2,
        op_args=("{{ task_instance_key_str }}",)
    )

    step1 >> step2

Apache Airflow DAGs can be declared in several ways: 1) the with context manager, 2) the @dag decorator together with @task, @task.bash and friends (since Airflow 2.0), and also dynamically or from templates. I prefer the with DAG(...) style.
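For comparison, a sketch of the decorator style (the dag_id is made up; imports assume the Airflow 3 Task SDK):

import pendulum
from airflow.sdk import dag, task


@dag(dag_id="hello_dag_decorated", schedule="@daily", start_date=pendulum.today())
def hello_dag_decorated():
    @task.bash
    def step_1():
        return "echo {{ data_interval_start | ds }}"

    @task
    def step_2(**context):
        print(context["run_id"])

    step_1() >> step_2()


hello_dag_decorated()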

Save the file into Airflow's dags directory, trigger it manually, and fill in the Configuration JSON when triggering:
{"key1": "this is from Configuration JSON"}

After the run, the step_1 task log shows output like:
[INFO - 2025-05-21 source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
and the step2 log shows:
[INFO - run_id: manual__2025-05-21T02:47:06.482473+00:00 chan="stdout" source="task"
[INFO - task_instance_key='hello_dag__step2__20250521' chan="stdout" source="task"
[INFO - try_number: 1 chan="stdout" source="task"
Pretty-printing the output of print(f'context: {json.dumps(context, default=str)}') gives:
{
  "dag": "<DAG: hello_dag>",
  "inlets": [],
  "map_index_template": null,
  "outlets": [],
  "run_id": "manual__2025-05-21T02:47:06.482473+00:00",
  "task": "<Task(PythonOperator): step2>",
  "task_instance": "id=UUID('0196f0bc-ca09-7b6e-b0ca-fd3508c84372') task_id='step2' dag_id='hello_dag' run_id='manual__2025-05-21T02:47:06.482473+00:00' try_number=1 map_index=-1 hostname='b4fdb75c6566' context_carrier=None task=<Task(PythonOperator): step2> bundle_instance=LocalDagBundle(name=dags-folder) max_tries=0 start_date=datetime.datetime(2025, 5, 21, 2, 47, 9, 584053, tzinfo=TzInfo(UTC)) end_date=None state=<TaskInstanceState.RUNNING: 'running'> is_mapped=False rendered_map_index=None",
  "ti": "id=UUID('0196f0bc-ca09-7b6e-b0ca-fd3508c84372') task_id='step2' dag_id='hello_dag' run_id='manual__2025-05-21T02:47:06.482473+00:00' try_number=1 map_index=-1 hostname='b4fdb75c6566' context_carrier=None task=<Task(PythonOperator): step2> bundle_instance=LocalDagBundle(name=dags-folder) max_tries=0 start_date=datetime.datetime(2025, 5, 21, 2, 47, 9, 584053, tzinfo=TzInfo(UTC)) end_date=None state=<TaskInstanceState.RUNNING: 'running'> is_mapped=False rendered_map_index=None",
  "outlet_events": "OutletEventAccessors(_dict={})",
  "inlet_events": "InletEventsAccessors(_inlets=[], _assets={}, _asset_aliases={})",
  "macros": "<MacrosAccessor (dynamic access to macros)>",
  "params": {
    "key1": "this is from Configuration JSON"
  },
  "var": {
    "json": "<VariableAccessor (dynamic access)>",
    "value": "<VariableAccessor (dynamic access)>"
  },
  "conn": "<ConnectionAccessor (dynamic access)>",
  "dag_run": "dag_id='hello_dag' run_id='manual__2025-05-21T02:47:06.482473+00:00' logical_date=datetime.datetime(2025, 5, 21, 2, 45, 20, 594000, tzinfo=TzInfo(UTC)) data_interval_start=datetime.datetime(2025, 5, 21, 2, 45, 20, 594000, tzinfo=TzInfo(UTC)) data_interval_end=datetime.datetime(2025, 5, 21, 2, 45, 20, 594000, tzinfo=TzInfo(UTC)) run_after=datetime.datetime(2025, 5, 21, 2, 45, 20, 594000, tzinfo=TzInfo(UTC)) start_date=datetime.datetime(2025, 5, 21, 2, 47, 7, 252201, tzinfo=TzInfo(UTC)) end_date=None clear_number=0 run_type=<DagRunType.MANUAL: 'manual'> conf={'key1': 'this is from Configuration JSON'} consumed_asset_events=[]",
  "triggering_asset_events": "TriggeringAssetEventAccessor(_events=defaultdict(<class 'list'>, {}))",
  "task_instance_key_str": "hello_dag__step2__20250521",
  "task_reschedule_count": 0,
  "prev_start_date_success": "2025-05-21 01:43:44.010945+00:00",
  "prev_end_date_success": "2025-05-21 01:43:49.252632+00:00",
  "logical_date": "2025-05-21 02:45:20.594000+00:00",
  "ds": "2025-05-21",
  "ds_nodash": "20250521",
  "ts": "2025-05-21T02:45:20.594000+00:00",
  "ts_nodash": "20250521T024520",
  "ts_nodash_with_tz": "20250521T024520.594000+0000",
  "data_interval_end": "2025-05-21 02:45:20.594000+00:00",
  "data_interval_start": "2025-05-21 02:45:20.594000+00:00",
  "prev_data_interval_start_success": "2025-05-21 01:43:39.342000+00:00",
  "prev_data_interval_end_success": "2025-05-21 01:43:39.342000+00:00",
  "templates_dict": null
}

From this we can see that context["task"] is a Task (Operator), which is why render_template() can be called on it. The values entered when triggering the DAG appear under:
"params": {
    "key1": "this is from Configuration JSON"
}

This Configuration JSON feature lets a manual trigger steer the run down a chosen branch. For example, if a task has a branch that normally runs only when the current day is Sunday, pre-wiring the code to also honor {"force": true} lets a manual trigger execute that branch without modifying the DAG code.
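A minimal sketch of such a pre-wired check (the force key and the task body are hypothetical):

def _weekly_report(**context):
    # Run the real work only on Sundays, unless this run was triggered
    # manually with {"force": true} in the Configuration JSON.
    is_sunday = context["logical_date"].weekday() == 6  # datetime.weekday(): Monday=0 ... Sunday=6
    if not (is_sunday or context["params"].get("force", False)):
        print("not Sunday and not forced, nothing to do")
        return
    print("running the Sunday-only branch ...")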

If a task does not need the whole context but only specific entries such as task_instance or dag_run, it can declare them directly. Testing shows that when Airflow runs the task, it maps arguments to the callable by parameter name. For example, define the following _step2() function:
def _step2(task_instance: TaskInstance, dag_run: DagRun, inlet_events, my_param):
    print(f"task_instance: {task_instance}")
    print(f"dag_run: {dag_run}")
    print(f"inlet_events: {inlet_events}")
    print(f"{my_param=}")

Trigger the DAG and the corresponding output appears:
[INFO - task_instance: id=UUID('0196f0dd-987f-748a-b8de-812251470d2d') task_id='step2' dag_id='hello_dag' run_id='manual__2025-05-21T03:22:56.492907+00:00' try_number=1 map_index=-1 hostname='b4fdb75c6566' context_carrier=None task=<Task(PythonOperator): step2> bundle_instance=LocalDagBundle(name=dags-folder) max_tries=0 start_date=datetime.datetime(2025, 5, 21, 3, 22, 59, 785081, tzinfo=TzInfo(UTC)) end_date=None state=<TaskInstanceState.RUNNING: 'running'> is_mapped=False rendered_map_index=None chan="stdout" source="task"
[INFO - dag_run: dag_id='hello_dag' run_id='manual__2025-05-21T03:22:56.492907+00:00' logical_date=datetime.datetime(2025, 5, 21, 3, 22, 51, 705000, tzinfo=TzInfo(UTC)) data_interval_start=datetime.datetime(2025, 5, 21, 3, 22, 51, 705000, tzinfo=TzInfo(UTC)) data_interval_end=datetime.datetime(2025, 5, 21, 3, 22, 51, 705000, tzinfo=TzInfo(UTC)) run_after=datetime.datetime(2025, 5, 21, 3, 22, 51, 705000, tzinfo=TzInfo(UTC)) start_date=datetime.datetime(2025, 5, 21, 3, 22, 57, 515261, tzinfo=TzInfo(UTC)) end_date=None clear_number=0 run_type=<DagRunType.MANUAL: 'manual'> conf={'key1': 'this is from Configuration JSON'} consumed_asset_events=[] chan="stdout" source="task"
[INFO - inlet_events: InletEventsAccessors(_inlets=[], _assets={}, _asset_aliases={}) chan="stdout" source="task"
[INFO - my_param='hello world' chan="stdout" source="task"
This confirms that Airflow matches task-callable parameters by name: it sees a parameter named inlet_events and passes in the corresponding value when executing the task.

As a final, exhaustive test, change _step2() to:
import inspect
import json


def _step2(*args, **kwargs):
    frame = inspect.currentframe()
    args_info = inspect.getargvalues(frame)
    print("All input arguments:")
    print(json.dumps(args_info.locals, default=str))

With *args and **kwargs receiving everything passed in, run the same test and pretty-print the output as JSON; it looks like:
{
  "args": [],
  "kwargs": {
    "dag": "<DAG: hello_dag>",
    "inlets": [],
    "map_index_template": null,
    "outlets": [],
    "run_id": "manual__2025-05-21T03:31:00.855955+00:00",
    "task": "<Task(PythonOperator): step2>",
    "task_instance": "id=UUID('0196f0e4-fc8b-745f-a08b-1622f4d94127') task_id='step2' dag_id='hello_dag' run_id='manual__2025-05-21T03:31:00.855955+00:00' try_number=1 map_index=-1 hostname='b4fdb75c6566' context_carrier=None task=<Task(PythonOperator): step2> bundle_instance=LocalDagBundle(name=dags-folder) max_tries=0 start_date=datetime.datetime(2025, 5, 21, 3, 31, 3, 667696, tzinfo=TzInfo(UTC)) end_date=None state=<TaskInstanceState.RUNNING: 'running'> is_mapped=False rendered_map_index=None",
    "ti": "id=UUID('0196f0e4-fc8b-745f-a08b-1622f4d94127') task_id='step2' dag_id='hello_dag' run_id='manual__2025-05-21T03:31:00.855955+00:00' try_number=1 map_index=-1 hostname='b4fdb75c6566' context_carrier=None task=<Task(PythonOperator): step2> bundle_instance=LocalDagBundle(name=dags-folder) max_tries=0 start_date=datetime.datetime(2025, 5, 21, 3, 31, 3, 667696, tzinfo=TzInfo(UTC)) end_date=None state=<TaskInstanceState.RUNNING: 'running'> is_mapped=False rendered_map_index=None",
    "outlet_events": "OutletEventAccessors(_dict={})",
    "inlet_events": "InletEventsAccessors(_inlets=[], _assets={}, _asset_aliases={})",
    "macros": "<MacrosAccessor (dynamic access to macros)>",
    "params": {
      "key1": "this is from Configuration JSON"
    },
    "var": {
      "json": "<VariableAccessor (dynamic access)>",
      "value": "<VariableAccessor (dynamic access)>"
    },
    "conn": "<ConnectionAccessor (dynamic access)>",
    "dag_run": "dag_id='hello_dag' run_id='manual__2025-05-21T03:31:00.855955+00:00' logical_date=datetime.datetime(2025, 5, 21, 3, 30, 42, 564000, tzinfo=TzInfo(UTC)) data_interval_start=datetime.datetime(2025, 5, 21, 3, 30, 42, 564000, tzinfo=TzInfo(UTC)) data_interval_end=datetime.datetime(2025, 5, 21, 3, 30, 42, 564000, tzinfo=TzInfo(UTC)) run_after=datetime.datetime(2025, 5, 21, 3, 30, 42, 564000, tzinfo=TzInfo(UTC)) start_date=datetime.datetime(2025, 5, 21, 3, 31, 1, 283439, tzinfo=TzInfo(UTC)) end_date=None clear_number=0 run_type=<DagRunType.MANUAL: 'manual'> conf={'key1': 'this is from Configuration JSON'} consumed_asset_events=[]",
    "triggering_asset_events": "TriggeringAssetEventAccessor(_events=defaultdict(<class 'list'>, {}))",
    "task_instance_key_str": "hello_dag__step2__20250521",
    "task_reschedule_count": 0,
    "prev_start_date_success": "2025-05-21 03:29:02.033893+00:00",
    "prev_end_date_success": "2025-05-21 03:29:07.004639+00:00",
    "logical_date": "2025-05-21 03:30:42.564000+00:00",
    "ds": "2025-05-21",
    "ds_nodash": "20250521",
    "ts": "2025-05-21T03:30:42.564000+00:00",
    "ts_nodash": "20250521T033042",
    "ts_nodash_with_tz": "20250521T033042.564000+0000",
    "data_interval_end": "2025-05-21 03:30:42.564000+00:00",
    "data_interval_start": "2025-05-21 03:30:42.564000+00:00",
    "prev_data_interval_start_success": "2025-05-21 03:28:56.534000+00:00",
    "prev_data_interval_end_success": "2025-05-21 03:28:56.534000+00:00",
    "my_param": "hello world",
    "templates_dict": null
  },
  "frame": "<frame at 0x7f4f4b1790e0, file '/opt/airflow/dags/hello_dag.py', line 25, code _step2>"
}

"args": [] is empty because no arguments were passed to the PythonOperator via op_args. The rest is essentially the same as the earlier **context dump, with the addition of the custom my_param argument. With this full list of parameters in hand, a task function can declare exactly the names it needs; for example, to read the Configuration JSON input, it is enough to define:
def _step2(params: dict[str, Any]):  # Any comes from typing
    print(params["key1"])

You cannot, however, declare a parameter named context and expect it to receive the whole context:
def _step2(context):
    print(context)

A parameter declared this way is just an ordinary custom parameter like my_param, and it must be given a value via op_args or op_kwargs.

The reason is that Airflow invokes the python_callable roughly like this:

_step2(dag=..., dag_run=..., data_interval_start=..., params=..., run_id=..., ...)

So if the function is defined as _step2(data_interval_start, **context), the parameter named data_interval_start receives its value directly, and everything else ends up in context.
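For example:

def _step2(data_interval_start, **context):
    # data_interval_start is matched by name; every other key from the
    # call shown above ends up in **context.
    print(f"data_interval_start: {data_interval_start}")
    print(f"other context keys: {sorted(context)}")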

The full list of keys is defined in airflow.sdk.definitions.context.Context:
class Context(TypedDict, total=False):
    """Jinja2 template context for task rendering."""

    conn: Any
    dag: DAG
    dag_run: DagRunProtocol
    data_interval_end: DateTime | None
    data_interval_start: DateTime | None
    outlet_events: OutletEventAccessorsProtocol
    ds: str
    ds_nodash: str
    expanded_ti_count: int | None
    exception: None | str | BaseException
    inlets: list
    inlet_events: InletEventsAccessors
    logical_date: DateTime
    macros: Any
    map_index_template: str | None
    outlets: list
    params: dict[str, Any]
    prev_data_interval_start_success: DateTime | None
    prev_data_interval_end_success: DateTime | None
    prev_start_date_success: DateTime | None
    prev_end_date_success: DateTime | None
    reason: str | None
    run_id: str
    start_date: DateTime
    # TODO: Remove Operator from below once we have MappedOperator to the Task SDK
    #   and once we can remove context related code from the Scheduler/models.TaskInstance
    task: BaseOperator | Operator
    task_reschedule_count: int
    task_instance: RuntimeTaskInstanceProtocol
    task_instance_key_str: str
    # `templates_dict` is only set in PythonOperator
    templates_dict: dict[str, Any] | None
    test_mode: bool
    ti: RuntimeTaskInstanceProtocol
    # triggering_asset_events: Mapping[str, Collection[AssetEvent | AssetEventPydantic]]
    triggering_asset_events: Any
    try_number: int | None
    ts: str
    ts_nodash: str
    ts_nodash_with_tz: str
    var: Any
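Since Context is a plain TypedDict, it can also be used for type checking and IDE completion when a callable accepts the whole context; a sketch (the cast has no runtime effect):

from typing import Any, cast

from airflow.sdk.definitions.context import Context


def _step2(**kwargs: Any):
    context = cast(Context, kwargs)  # only for type checkers / IDE completion
    print(context["run_id"], context["ds"])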

In the Airflow UI, after a DAG run you can open a task instance's Rendered Templates view to see the rendered template values.

Alternatively, during development you can render a task's templates without running the DAG at all, using the airflow CLI:
airflow tasks render [dag id] [task id] [desired execution date (e.g. 2025-04-24T22:00:00)]
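For the hello_dag above that would be something like (the date is just an example logical date):

airflow tasks render hello_dag step_1 2025-05-21T00:00:00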

Summary:

Starting from a simple Task with no arguments, this post explored the following step by step:

  1. How Apache Airflow uses templates
  2. Where templates can be used: in an Operator's arguments, and in the files those arguments point to
  3. How to obtain all of a task's arguments
  4. How to selectively declare only the task arguments you need: simply name the parameters after the keys listed in airflow.sdk.definitions.context.Context

Two major topics were not explored in depth:

  1. How to use filters in templates, i.e. the ds after the pipe in {{ data_interval_end | ds }}
  2. How to pass the jinja_env argument

Finally, a word of admiration: the Apache Airflow codebase is beautifully written. Permalink: https://yanbin.blog/apache-airflow-task-use-template-context/, from 隔叶黄莺 Yanbin's Blog.
[Copyright notice] This article is licensed under Attribution-NonCommercial-ShareAlike 4.0 International (CC BY-NC-SA 4.0).