继续玩弄那个小风车,先前买的 《Data Pipelines with Apache Airflow》 一眼没看直接作废,因为是基于 Apache Airflow 2.x 的,3.0 既出立马又买了该书的第二版,倒是基于 Apache Airflow 3.0 的,但写书之时 3.0 尚未正式推出,所以书中内容与实际应用有许多出入。
Apache Airflow 自 2.4 起就支持基于 Asset 事件触发 DAG,那时叫做 Data-aware,从 Apache Airflow 3.0 起更名为 Asset-Aware, 并且在 UI 上也会显示使用到的 Assets。那么 Asset-Aware 解决什么问题呢,它采用了 Producer/Consumer 模式可把依赖的某一共同资源的 DAG 串联起来。比如某一个 Producer DAG 写了文件到 s3://asset-bucket/example.csv, (发布一个事件 ), 然后相当于订阅了该事件所有相关 Consumer DAG 都会得到执行。
Airflow 的 Asset 使用 URI 的格式
- s3://asset-bucket/example.csv
- file://tmp/data/export.json
- postgresql://mydb/schema/mytable
- gs://my-bucket/processed/report.parquet
注意,Airflow 的 Asset 只是一个标识符,或者想像为一个消息队列的 Topic,并不会象它的 URI 暗示的那样需要真正存在一个目标资源(如 s3://asset-bucket/example.csv),或者产生了一个实际的事件(如 s3, file 的 change event)。它更像是 Java Spring 中的 ApplicationEvent, Producer 与 Consumer DAG 之间只是用该标识符进行关联,或指引如何交换信息,比如使用 Asset(s3://asset-bucket/example.csv), 它们可能根本不用操作 S3, 不过一般还是会按照某种约定来行事的。
因此 Asset 的 URL 可以自由的定义,如 ecs://task_definition/airflow-web
假如我们启动一个 Apache Airflow, 设定了 AIRFLOW__CORE__LOAD_EXAMPLES=false, 不加载示例的话,打开 Web 界面看到的 Assets 中也是空的。未接触 Assets 之前不免会让人思考如何才能添加一个 Asset 呢?进去 Asset 界面没有添加按钮,学了之后才知道它是在 dag_processor 解析 DAG 之后自动添加上去了,Assets 界面只是提供了根据 Asset 显示相关联的 DAG。
下面我们开始实际创建三个 DAG, 一个 Producer, 两个 Consumer, 并用 Asset 把它们关联起来,Producer 可基于普通的 Interval 方式触发,当 Producer 执行成功后,才触发执行那两个 Consumer。
首选创建 asset_producer.py
1 2 3 4 5 6 7 8 9 10 |
import pendulum from airflow.sdk import DAG, Asset from airflow.providers.standard.operators.bash import BashOperator with DAG(dag_id="asset-producer"): task1 = BashOperator( task_id="task1", bash_command="echo 'Hello World'", outlets=[Asset("s3://asset-bucket/example.csv")], ) |
放到 dags 目录,过会儿在界面上看到
asset_consumer1.py
1 2 3 4 5 6 7 8 9 10 11 |
from airflow.providers.standard.operators.bash import BashOperator from airflow.sdk import DAG, Asset with DAG( dag_id="asset_consumer1", schedule=[Asset("s3://asset-bucket/example.csv")] ): task = BashOperator( task_id="consumer2_task", bash_command="echo '{{ triggering_asset_events }}' updated", ) |
asset_consumer2.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
import json from airflow.providers.standard.operators.python import PythonOperator from airflow.sdk import DAG, Asset def _task(**context): print(json.dumps(context, default=str)) with DAG( dag_id="asset_consumer2", schedule=[Asset("s3://asset-bucket/example.csv")] ): task = PythonOperator( task_id="conumser2_task", python_callable=_task ) |
现在 UI 上看到的这三个 DAG
从这里我们看不出 asset-producer 与 asset_consumer1/asset_consumer2 之间是如何关联的,只知道两个 Consumer 是由 s3://asset-bucket/example.csv 触发的。但是来到 Assets 页面,点开 Asset s3://asset-bucket/example.csv 就看到它们之间的关系了
通过这个图,我们完全可以理解为
- asset-producer 执行成功后,发布一个标识为 s3://asset-bucket/example.csv 的事件
- asset_consumer1 和 asset_consumer2 由于订阅了该事件,所以会在 asset_producer 成功结束后被触发执行
至于 asset-producer, asset_consumer1, asset_consumer2 之间如何通过 s3://asset-bucket/example.csv 交换信息,那是实现上的约定。
实际测试,手工触发 asset-producer, 完后 asset_consumer1 和 asset_consumer2 立即执行,再来看 asset_consumer2 中输出的 context 信息(JSON 格式化后)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
{ "dag": "<DAG: asset_consumer2>", "inlets": [], "map_index_template": null, "outlets": [], "run_id": "asset_triggered__2025-05-21T17:46:53.382896+00:00_3yLqCHdL", "task": "<Task(PythonOperator): conumser2_task>", "task_instance": "id=UUID('0196f3f4-9717-7f32-a01a-463e0f7d5908') task_id='conumser2_task' dag_id='asset_consumer2' run_id='asset_triggered__2025-05-21T17:46:53.382896+00:00_3yLqCHdL' try_number=1 map_index=-1 hostname='62ac3339227e' context_carrier=None task=<Task(PythonOperator): conumser2_task> bundle_instance=LocalDagBundle(name=dags-folder) max_tries=0 start_date=datetime.datetime(2025, 5, 21, 17, 46, 55, 970788, tzinfo=TzInfo(UTC)) end_date=None state=<TaskInstanceState.RUNNING: 'running'> is_mapped=False rendered_map_index=None", "ti": "id=UUID('0196f3f4-9717-7f32-a01a-463e0f7d5908') task_id='conumser2_task' dag_id='asset_consumer2' run_id='asset_triggered__2025-05-21T17:46:53.382896+00:00_3yLqCHdL' try_number=1 map_index=-1 hostname='62ac3339227e' context_carrier=None task=<Task(PythonOperator): conumser2_task> bundle_instance=LocalDagBundle(name=dags-folder) max_tries=0 start_date=datetime.datetime(2025, 5, 21, 17, 46, 55, 970788, tzinfo=TzInfo(UTC)) end_date=None state=<TaskInstanceState.RUNNING: 'running'> is_mapped=False rendered_map_index=None", "outlet_events": "OutletEventAccessors(_dict={})", "inlet_events": "InletEventsAccessors(_inlets=[], _assets={}, _asset_aliases={})", "macros": "<MacrosAccessor (dynamic access to macros)>", "params": {}, "var": { "json": "<VariableAccessor (dynamic access)>", "value": "<VariableAccessor (dynamic access)>" }, "conn": "<ConnectionAccessor (dynamic access)>", "dag_run": "dag_id='asset_consumer2' run_id='asset_triggered__2025-05-21T17:46:53.382896+00:00_3yLqCHdL' logical_date=None data_interval_start=None data_interval_end=None run_after=datetime.datetime(2025, 5, 21, 17, 46, 53, 382896, tzinfo=TzInfo(UTC)) start_date=datetime.datetime(2025, 5, 21, 17, 46, 55, 492780, tzinfo=TzInfo(UTC)) end_date=None clear_number=0 run_type=<DagRunType.ASSET_TRIGGERED: 'asset_triggered'> conf={} consumed_asset_events=[AssetEventDagRunReference(asset=AssetReferenceAssetEventDagRun(name='s3://asset-bucket/example.csv', uri='s3://asset-bucket/example.csv', extra={}), extra={}, source_task_id='task1', source_dag_id='asset-producer', source_run_id='manual__2025-05-21T17:46:50.673849+00:00', source_map_index=-1, source_aliases=[], timestamp=datetime.datetime(2025, 5, 21, 17, 46, 53, 373222, tzinfo=TzInfo(UTC)))]", "triggering_asset_events": "TriggeringAssetEventAccessor(_events=defaultdict(<class 'list'>, {AssetUniqueKey(name='s3://asset-bucket/example.csv', uri='s3://asset-bucket/example.csv'): [AssetEventDagRunReferenceResult(asset=AssetReferenceAssetEventDagRun(name='s3://asset-bucket/example.csv', uri='s3://asset-bucket/example.csv', extra={}), extra={}, source_task_id='task1', source_dag_id='asset-producer', source_run_id='manual__2025-05-21T17:46:50.673849+00:00', source_map_index=-1, source_aliases=[], timestamp=datetime.datetime(2025, 5, 21, 17, 46, 53, 373222, tzinfo=TzInfo(UTC)))]}))", "task_instance_key_str": "asset_consumer2__conumser2_task__asset_triggered__2025-05-21T17:46:53.382896+00:00_3yLqCHdL", "task_reschedule_count": 0, "prev_start_date_success": "None", "prev_end_date_success": "None", "templates_dict": null } |
通过上面的信息有助于我们了解可在 DAG 间传递什么信息,比如由 triggering_asset_events 我们知道上游的 Producer 是谁。
其他相关话题
在由 Airflow 2.x 转换到 3.0 时有许多的 Python 包名和类名都变了,还好没正式的学习 Airflow 2.x 的 DAG, 障碍不大,但 Airflow 3.0 中同时保留了 2.x 的包和类,所以相同的类会存在于多个包中。
一个 DAG 可同时关联多个不同的 Asset
在一个 Python 文件中可以写多个 DAG 实例
触发 DAG 的多个 Asset 可以用逻辑操作,默认为 AND,如
1 2 3 4 5 6 7 8 9 |
dag1_asset = Asset("s3://dag1/output_1.txt") dag2_asset = Asset("s3://dag2/output_1.txt") with DAG( # Consume asset 1 and 2 with asset expressions schedule=(dag1_asset | dag2_asset), ..., ): ... |
比如通过逻辑操作实现条件,多个上游 DAG 都成功执行,或只要有一个上游 DAG 成功执行便触发
自 Airflow 3.0 起,真的提供了 Event-driven scheduling, 基于真实事件来触发,需要用到 AssetWatcher,比如监听 AWS SQS, Kafka 中的消息进而触发 DAG 执行,这是更高级话题,不就此展开。