This post is a rough walkthrough of how to use templates (Template) and the context (Context) in an Apache Airflow Task or Operator (the two are essentially the same concept). Airflow's template engine is Jinja, which is also used by FastAPI and Flask. Note that template syntax such as {{ ds }} only works in the arguments passed when constructing an Operator, in the contents of files referenced by those arguments, or when calling an Operator's render_template() method. For the template variables Airflow provides, see Templates reference / Variables. This post prints a Task's context to list all available context variables, digs progressively deeper, and finally locates the relevant definitions in the source code.

Through templates or the context we can access Airflow's built-in values inside a task, such as the DAG itself or the current state of the running task.
When we trigger a DAG manually, the parameters entered in the Configuration JSON box can also be found in the context. Every Operator's inheritance chain traces back to AbstractOperator -> Templater, so every Operator (Task) can render templates by calling Templater.render_template(). Its prototype is
```python
def render_template(
    self,
    content: Any,
    context: Context,
    jinja_env: jinja2.Environment | None = None,
    seen_oids: set[int] | None = None,
)
```
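As a quick taste of how this gets used (a minimal sketch; the same pattern appears in the DAG example further down), inside a PythonOperator callable you can grab the operator from the context and render an arbitrary template string against it:

```python
# Sketch: render a template string using the current task's operator.
def _show_try_number(**context):
    rendered = context["task"].render_template("try_number: {{ ti.try_number }}", context)
    print(rendered)  # e.g. "try_number: 1"
```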
Note the two variables defined in Templater:
```python
# For derived classes to define which fields will get jinjaified.
template_fields: Collection[str]
# Defines which files extensions to look for in the templated fields.
template_ext: Sequence[str]
```
template_fields defines which constructor arguments of an Operator support template resolution, while template_ext specifies which file extensions mark files whose contents will also have their template variables resolved.

Take BashOperator as an example; it defines
```python
template_fields: Sequence[str] = ("bash_command", "env", "cwd")
template_fields_renderers = {"bash_command": "bash", "env": "json"}
template_ext: Sequence[str] = (".sh", ".bash")
```
- template_fields_renderers: ClassVar[dict[str, str]] = {} is declared in airflow.sdk.bases.operator. Rather than being a combination of template_fields and template_ext, its keys are templated field names and its values name the renderer (lexer) used to display that field's rendered value in the web UI, e.g. "bash", "json" or "py".
Template variables in the bash_command, env and cwd arguments are resolved, and if bash_command points to a script file (*.sh or *.bash), the template variables inside that script are substituted as well. For example:
```python
step1 = BashOperator(
    task_id="step_1",
    bash_command="echo {{ data_interval_start | ds }}"
)
```
Or, with the @task.bash decorator:
```python
@task.bash
def bash_example():
    return "test.sh "
    # return "echo {{ ds }}"  # or
```
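To make the file-rendering case concrete, here is a hypothetical sketch (not from the original post; the file name run_report.sh and its content are made up). Because ".sh" is listed in BashOperator.template_ext, a script referenced by bash_command is itself passed through Jinja before execution:

```python
# Hypothetical: templates/run_report.sh sits next to the DAG file (or on the
# DAG's template_searchpath) and contains a line such as
#   echo "run date: {{ ds }}"
# Since the command ends in ".sh", Airflow loads the file and renders its
# contents, so {{ ds }} is replaced before the script runs.
report = BashOperator(
    task_id="report",
    bash_command="run_report.sh",
)
```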
Similarly, look at what the commonly used PythonOperator defines:
```python
template_fields: Sequence[str] = ("templates_dict", "op_args", "op_kwargs")
template_fields_renderers = {"templates_dict": "json", "op_args": "py", "op_kwargs": "py"}
```
From these two attributes we can tell which PythonOperator arguments support templating (templates_dict, op_args and op_kwargs) and how their rendered values are displayed in the UI; other Operators follow the same pattern.
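As a small sketch of how those templated arguments behave (not from the original post; the task and argument names below are made up for illustration):

```python
# Hypothetical example: op_kwargs values and templates_dict values are both
# rendered by Jinja before the callable runs; templates_dict is then reachable
# by declaring a parameter with that name.
def _show(when, templates_dict=None, **_):
    print(when)                      # e.g. "2025-05-21"
    print(templates_dict["run_id"])  # the rendered run_id

show = PythonOperator(
    task_id="show",
    python_callable=_show,
    op_kwargs={"when": "{{ ds }}"},
    templates_dict={"run_id": "{{ run_id }}"},
)
```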
Now let's write a simple DAG that uses templates in its arguments, takes Configuration JSON input when triggered manually, and prints the whole context inside a task:
```python
from datetime import datetime
import pendulum
import json

from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import PythonOperator


def _step2(task_instance_key, **context):
    print(f'context: {json.dumps(context, default=str)}')
    print(f"run_id: {context['run_id']}")
    print(f"{task_instance_key=}")
    print(context["task"].render_template("try_number: {{ ti.try_number }}", context))


with DAG(
    dag_id="hello_dag",
    schedule="@daily",
    start_date=pendulum.today(),
):
    step1 = BashOperator(
        task_id="step_1",
        bash_command="echo {{ data_interval_start | ds }}"
    )
    step2 = PythonOperator(
        task_id="step2",
        python_callable=_step2,
        op_args=("{{ task_instance_key_str }}",)
    )

    step1 >> step2
```
There are several ways to declare a DAG in Apache Airflow: 1) the with context manager, 2) the @dag decorator together with @task, @task.bash and friends (since Airflow 2.0), plus dynamic or templated generation. I personally prefer the with DAG(...) style.
Save the file into Airflow's dags directory, trigger it manually, and fill in the following Configuration JSON when triggering:
```json
{"key1": "this is from Configuration JSON"}
```
After the run, step_1's log contains output like:
```
INFO - 2025-05-21: source="airflow.task.hooks.airflow.providers.standard.hooks.subprocess.SubprocessHook"
```
And step2's log contains:
```
INFO - run_id: manual__2025-05-21T02:47:06.482473+00:00: chan="stdout": source="task"
INFO - task_instance_key='hello_dag__step2__20250521': chan="stdout": source="task"
INFO - try_number: 1: chan="stdout": source="task"
```
Pulling out just the output of print(f'context: {json.dumps(context, default=str)}') and pretty-printing it gives:
```json
{
  "dag": "<DAG: hello_dag>",
  "inlets": [],
  "map_index_template": null,
  "outlets": [],
  "run_id": "manual__2025-05-21T02:47:06.482473+00:00",
  "task": "<Task(PythonOperator): step2>",
  "task_instance": "id=UUID('0196f0bc-ca09-7b6e-b0ca-fd3508c84372') task_id='step2' dag_id='hello_dag' run_id='manual__2025-05-21T02:47:06.482473+00:00' try_number=1 map_index=-1 hostname='b4fdb75c6566' context_carrier=None task=<Task(PythonOperator): step2> bundle_instance=LocalDagBundle(name=dags-folder) max_tries=0 start_date=datetime.datetime(2025, 5, 21, 2, 47, 9, 584053, tzinfo=TzInfo(UTC)) end_date=None state=<TaskInstanceState.RUNNING: 'running'> is_mapped=False rendered_map_index=None",
  "ti": "id=UUID('0196f0bc-ca09-7b6e-b0ca-fd3508c84372') task_id='step2' dag_id='hello_dag' run_id='manual__2025-05-21T02:47:06.482473+00:00' try_number=1 map_index=-1 hostname='b4fdb75c6566' context_carrier=None task=<Task(PythonOperator): step2> bundle_instance=LocalDagBundle(name=dags-folder) max_tries=0 start_date=datetime.datetime(2025, 5, 21, 2, 47, 9, 584053, tzinfo=TzInfo(UTC)) end_date=None state=<TaskInstanceState.RUNNING: 'running'> is_mapped=False rendered_map_index=None",
  "outlet_events": "OutletEventAccessors(_dict={})",
  "inlet_events": "InletEventsAccessors(_inlets=[], _assets={}, _asset_aliases={})",
  "macros": "<MacrosAccessor (dynamic access to macros)>",
  "params": {
    "key1": "this is from Configuration JSON"
  },
  "var": {
    "json": "<VariableAccessor (dynamic access)>",
    "value": "<VariableAccessor (dynamic access)>"
  },
  "conn": "<ConnectionAccessor (dynamic access)>",
  "dag_run": "dag_id='hello_dag' run_id='manual__2025-05-21T02:47:06.482473+00:00' logical_date=datetime.datetime(2025, 5, 21, 2, 45, 20, 594000, tzinfo=TzInfo(UTC)) data_interval_start=datetime.datetime(2025, 5, 21, 2, 45, 20, 594000, tzinfo=TzInfo(UTC)) data_interval_end=datetime.datetime(2025, 5, 21, 2, 45, 20, 594000, tzinfo=TzInfo(UTC)) run_after=datetime.datetime(2025, 5, 21, 2, 45, 20, 594000, tzinfo=TzInfo(UTC)) start_date=datetime.datetime(2025, 5, 21, 2, 47, 7, 252201, tzinfo=TzInfo(UTC)) end_date=None clear_number=0 run_type=<DagRunType.MANUAL: 'manual'> conf={'key1': 'this is from Configuration JSON'} consumed_asset_events=[]",
  "triggering_asset_events": "TriggeringAssetEventAccessor(_events=defaultdict(<class 'list'>, {}))",
  "task_instance_key_str": "hello_dag__step2__20250521",
  "task_reschedule_count": 0,
  "prev_start_date_success": "2025-05-21 01:43:44.010945+00:00",
  "prev_end_date_success": "2025-05-21 01:43:49.252632+00:00",
  "logical_date": "2025-05-21 02:45:20.594000+00:00",
  "ds": "2025-05-21",
  "ds_nodash": "20250521",
  "ts": "2025-05-21T02:45:20.594000+00:00",
  "ts_nodash": "20250521T024520",
  "ts_nodash_with_tz": "20250521T024520.594000+0000",
  "data_interval_end": "2025-05-21 02:45:20.594000+00:00",
  "data_interval_start": "2025-05-21 02:45:20.594000+00:00",
  "prev_data_interval_start_success": "2025-05-21 01:43:39.342000+00:00",
  "prev_data_interval_end_success": "2025-05-21 01:43:39.342000+00:00",
  "templates_dict": null
}
```
From this dump we can see that context["task"] is a Task (an Operator), which is why render_template() can be called on it. The input we supplied when triggering the DAG appears under:
```json
"params": {
  "key1": "this is from Configuration JSON"
}
```
This Configuration JSON feature lets a manually triggered DAG run take whichever branch we want. For example, if a task normally runs a branch only on Sundays, we can build in logic so that triggering manually with {"force": true} runs that branch anyway, without modifying the original DAG code.
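A minimal sketch of that idea (the parameter name force and the callable below are made up for illustration, not from the original post):

```python
# Hypothetical sketch: the Sunday-only branch also runs when the DAG is
# triggered manually with Configuration JSON {"force": true}.
# `params` and `logical_date` are pulled from the context by name.
def _weekly_report(params, logical_date, **_):
    is_sunday = logical_date.isoweekday() == 7
    if is_sunday or params.get("force", False):
        print("running the Sunday-only branch")
    else:
        print("skipping: not Sunday and not forced")
```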
If our Task does not need the whole context only to pick task_instance, dag_run or some other specific value out of it, there is a simpler way: testing shows that when Airflow invokes the task's callable it maps context entries onto the callable's parameters by name. For example, define _step2() as follows:
```python
# my_param is a custom parameter; it must be supplied via op_args or
# op_kwargs, e.g. op_kwargs={"my_param": "hello world"} on the PythonOperator.
def _step2(task_instance: TaskInstance, dag_run: DagRun, inlet_events, my_param):
    print(f"task_instance: {task_instance}")
    print(f"dag_run: {dag_run}")
    print(f"inlet_events: {inlet_events}")
    print(f"{my_param=}")
```
Trigger it again and the corresponding output is:
```
INFO - task_instance: id=UUID('0196f0dd-987f-748a-b8de-812251470d2d') task_id='step2' dag_id='hello_dag' run_id='manual__2025-05-21T03:22:56.492907+00:00' try_number=1 map_index=-1 hostname='b4fdb75c6566' context_carrier=None task=<Task(PythonOperator): step2> bundle_instance=LocalDagBundle(name=dags-folder) max_tries=0 start_date=datetime.datetime(2025, 5, 21, 3, 22, 59, 785081, tzinfo=TzInfo(UTC)) end_date=None state=<TaskInstanceState.RUNNING: 'running'> is_mapped=False rendered_map_index=None: chan="stdout": source="task"
INFO - dag_run: dag_id='hello_dag' run_id='manual__2025-05-21T03:22:56.492907+00:00' logical_date=datetime.datetime(2025, 5, 21, 3, 22, 51, 705000, tzinfo=TzInfo(UTC)) data_interval_start=datetime.datetime(2025, 5, 21, 3, 22, 51, 705000, tzinfo=TzInfo(UTC)) data_interval_end=datetime.datetime(2025, 5, 21, 3, 22, 51, 705000, tzinfo=TzInfo(UTC)) run_after=datetime.datetime(2025, 5, 21, 3, 22, 51, 705000, tzinfo=TzInfo(UTC)) start_date=datetime.datetime(2025, 5, 21, 3, 22, 57, 515261, tzinfo=TzInfo(UTC)) end_date=None clear_number=0 run_type=<DagRunType.MANUAL: 'manual'> conf={'key1': 'this is from Configuration JSON'} consumed_asset_events=[]: chan="stdout": source="task"
INFO - inlet_events: InletEventsAccessors(_inlets=[], _assets={}, _asset_aliases={}): chan="stdout": source="task"
INFO - my_param='hello world': chan="stdout": source="task"
```
This confirms that Airflow matches the callable's parameters by name: seeing a parameter named inlet_events, it passes the corresponding context value at execution time.
As a final, all-revealing test, change _step2() to:
```python
# requires "import inspect" alongside the existing json import
def _step2(*args, **kwargs):
    frame = inspect.currentframe()
    args_info = inspect.getargvalues(frame)
    print("All input arguments:")
    print(json.dumps(args_info.locals, default=str))
```
Using *args and **kwargs to receive every argument passed in, run the same test and JSON-format the output, which looks like:
```json
{
  "args": [],
  "kwargs": {
    "dag": "<DAG: hello_dag>",
    "inlets": [],
    "map_index_template": null,
    "outlets": [],
    "run_id": "manual__2025-05-21T03:31:00.855955+00:00",
    "task": "<Task(PythonOperator): step2>",
    "task_instance": "id=UUID('0196f0e4-fc8b-745f-a08b-1622f4d94127') task_id='step2' dag_id='hello_dag' run_id='manual__2025-05-21T03:31:00.855955+00:00' try_number=1 map_index=-1 hostname='b4fdb75c6566' context_carrier=None task=<Task(PythonOperator): step2> bundle_instance=LocalDagBundle(name=dags-folder) max_tries=0 start_date=datetime.datetime(2025, 5, 21, 3, 31, 3, 667696, tzinfo=TzInfo(UTC)) end_date=None state=<TaskInstanceState.RUNNING: 'running'> is_mapped=False rendered_map_index=None",
    "ti": "id=UUID('0196f0e4-fc8b-745f-a08b-1622f4d94127') task_id='step2' dag_id='hello_dag' run_id='manual__2025-05-21T03:31:00.855955+00:00' try_number=1 map_index=-1 hostname='b4fdb75c6566' context_carrier=None task=<Task(PythonOperator): step2> bundle_instance=LocalDagBundle(name=dags-folder) max_tries=0 start_date=datetime.datetime(2025, 5, 21, 3, 31, 3, 667696, tzinfo=TzInfo(UTC)) end_date=None state=<TaskInstanceState.RUNNING: 'running'> is_mapped=False rendered_map_index=None",
    "outlet_events": "OutletEventAccessors(_dict={})",
    "inlet_events": "InletEventsAccessors(_inlets=[], _assets={}, _asset_aliases={})",
    "macros": "<MacrosAccessor (dynamic access to macros)>",
    "params": {
      "key1": "this is from Configuration JSON"
    },
    "var": {
      "json": "<VariableAccessor (dynamic access)>",
      "value": "<VariableAccessor (dynamic access)>"
    },
    "conn": "<ConnectionAccessor (dynamic access)>",
    "dag_run": "dag_id='hello_dag' run_id='manual__2025-05-21T03:31:00.855955+00:00' logical_date=datetime.datetime(2025, 5, 21, 3, 30, 42, 564000, tzinfo=TzInfo(UTC)) data_interval_start=datetime.datetime(2025, 5, 21, 3, 30, 42, 564000, tzinfo=TzInfo(UTC)) data_interval_end=datetime.datetime(2025, 5, 21, 3, 30, 42, 564000, tzinfo=TzInfo(UTC)) run_after=datetime.datetime(2025, 5, 21, 3, 30, 42, 564000, tzinfo=TzInfo(UTC)) start_date=datetime.datetime(2025, 5, 21, 3, 31, 1, 283439, tzinfo=TzInfo(UTC)) end_date=None clear_number=0 run_type=<DagRunType.MANUAL: 'manual'> conf={'key1': 'this is from Configuration JSON'} consumed_asset_events=[]",
    "triggering_asset_events": "TriggeringAssetEventAccessor(_events=defaultdict(<class 'list'>, {}))",
    "task_instance_key_str": "hello_dag__step2__20250521",
    "task_reschedule_count": 0,
    "prev_start_date_success": "2025-05-21 03:29:02.033893+00:00",
    "prev_end_date_success": "2025-05-21 03:29:07.004639+00:00",
    "logical_date": "2025-05-21 03:30:42.564000+00:00",
    "ds": "2025-05-21",
    "ds_nodash": "20250521",
    "ts": "2025-05-21T03:30:42.564000+00:00",
    "ts_nodash": "20250521T033042",
    "ts_nodash_with_tz": "20250521T033042.564000+0000",
    "data_interval_end": "2025-05-21 03:30:42.564000+00:00",
    "data_interval_start": "2025-05-21 03:30:42.564000+00:00",
    "prev_data_interval_start_success": "2025-05-21 03:28:56.534000+00:00",
    "prev_data_interval_end_success": "2025-05-21 03:28:56.534000+00:00",
    "my_param": "hello world",
    "templates_dict": null
  },
  "frame": "<frame at 0x7f4f4b1790e0, file '/opt/airflow/dags/hello_dag.py', line 25, code _step2>"
}
```
"args": []
为空是因为没有用 op_args 向 PythonOperator 传递参数,这里基本上与最初的 **context 输出类似,只多包含了 my_param
自定义的参数,有了这个参数列表后,task 函数就可以针对性的定义参数名,如想要获取 Configuration JSON 的输入,只要定义
```python
def _step2(params: dict[str, Any]):
    print(params["key1"])
```
However, you cannot declare a parameter named context, as in:
```python
def _step2(context):
    print(context)
```
A context parameter declared this way is just an ordinary custom parameter like my_param, and a value must be passed to it via op_args or op_kwargs.
The underlying reason is that the python_callable is invoked with keyword arguments, roughly like this:

```python
_step2(conf=..., dag=..., dag_run=..., data_interval_start=..., ...)
```
Whereas if you define the function as _step2(data_interval_start, **context), the data_interval_start parameter receives its value directly by name and everything else lands in context.
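A small sketch to make that concrete (assuming the same hello_dag; the print statements are illustrative):

```python
# Sketch: data_interval_start is bound by name, everything else
# (run_id, params, ti, ...) lands in **context.
def _step2(data_interval_start, **context):
    print(f"data_interval_start: {data_interval_start}")
    print(f"run_id from context: {context['run_id']}")
```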
This parameter list is defined in airflow.sdk.definitions.context.Context:
```python
class Context(TypedDict, total=False):
    """Jinja2 template context for task rendering."""

    conn: Any
    dag: DAG
    dag_run: DagRunProtocol
    data_interval_end: DateTime | None
    data_interval_start: DateTime | None
    outlet_events: OutletEventAccessorsProtocol
    ds: str
    ds_nodash: str
    expanded_ti_count: int | None
    exception: None | str | BaseException
    inlets: list
    inlet_events: InletEventsAccessors
    logical_date: DateTime
    macros: Any
    map_index_template: str | None
    outlets: list
    params: dict[str, Any]
    prev_data_interval_start_success: DateTime | None
    prev_data_interval_end_success: DateTime | None
    prev_start_date_success: DateTime | None
    prev_end_date_success: DateTime | None
    reason: str | None
    run_id: str
    start_date: DateTime
    # TODO: Remove Operator from below once we have MappedOperator to the Task SDK
    # and once we can remove context related code from the Scheduler/models.TaskInstance
    task: BaseOperator | Operator
    task_reschedule_count: int
    task_instance: RuntimeTaskInstanceProtocol
    task_instance_key_str: str
    # `templates_dict` is only set in PythonOperator
    templates_dict: dict[str, Any] | None
    test_mode: bool
    ti: RuntimeTaskInstanceProtocol
    # triggering_asset_events: Mapping[str, Collection[AssetEvent | AssetEventPydantic]]
    triggering_asset_events: Any
    try_number: int | None
    ts: str
    ts_nodash: str
    ts_nodash_with_tz: str
    var: Any
```
In the Airflow UI, after a DAG has run you can open a task instance's Rendered Templates view, which shows the rendered template values.
Alternatively, during development you can inspect template rendering without running the DAG at all, using the airflow CLI:

```
airflow tasks render [dag id] [task id] [desired logical/execution date, e.g. 2025-04-24T22:00:00]
```
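For instance, against the hello_dag above the invocation might look like the following (an illustrative command, not from the original post; it prints the rendered values of the templated fields such as op_args):

```
airflow tasks render hello_dag step2 2025-05-21T00:00:00
```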
Summary:

Starting from a simple no-argument Task, this post progressively explored the following questions:
- How templates are used in Apache Airflow
- Where templates can be used: in Operator arguments and in the files those arguments point to
- How to obtain all task parameters
- How to selectively declare only the task parameters you need, by naming them after the variables listed in airflow.sdk.definitions.context.Context
Two main topics were not explored in depth:

- How to use filters in templates, i.e. the ds after the pipe in {{ data_interval_end | ds }}
- How to pass the jinja_env argument
To close, one word of admiration: the Apache Airflow codebase is beautifully written.