Apache Airflow 3.0 使用 Asset-Aware DAG(producer/consumers)

继续玩弄那个小风车,先前买的 《Data Pipelines with Apache Airflow》 一眼没看直接作废,因为是基于 Apache Airflow 2.x 的,3.0 既出立马又买了该书的第二版,倒是基于 Apache Airflow 3.0 的,但写书之时 3.0 尚未正式推出,所以书中内容与实际应用有许多出入。

Apache Airflow 自 2.4 起就支持基于 Asset 事件触发 DAG,那时叫做 Data-aware,从 Apache Airflow 3.0 起更名为 Asset-Aware, 并且在 UI 上也会显示使用到的 Assets。那么 Asset-Aware  解决什么问题呢,它采用了 Producer/Consumer 模式可把依赖的某一共同资源的  DAG 串联起来。比如某一个 Producer DAG 写了文件到 s3://asset-bucket/example.csv, (发布一个事件 ), 然后相当于订阅了该事件所有相关 Consumer DAG 都会得到执行。

Airflow 的 Asset 使用 URI 的格式

  1. s3://asset-bucket/example.csv
  2. file://tmp/data/export.json
  3. postgresql://mydb/schema/mytable
  4. gs://my-bucket/processed/report.parquet

注意,Airflow 的 Asset 只是一个标识符,或者想像为一个消息队列的 Topic,并不会象它的 URI 暗示的那样需要真正存在一个目标资源(如 s3://asset-bucket/example.csv),或者产生了一个实际的事件(如 s3, file 的 change event)。它更像是 Java Spring 中的 ApplicationEvent, Producer 与 Consumer DAG 之间只是用该标识符进行关联,或指引如何交换信息,比如使用 Asset(s3://asset-bucket/example.csv), 它们可能根本不用操作 S3, 不过一般还是会按照某种约定来行事的。

因此 Asset 的 URL 可以自由的定义,如 ecs://task_definition/airflow-web

假如我们启动一个 Apache Airflow, 设定了 AIRFLOW__CORE__LOAD_EXAMPLES=false, 不加载示例的话,打开 Web 界面看到的 Assets 中也是空的。未接触 Assets 之前不免会让人思考如何才能添加一个 Asset 呢?进去 Asset 界面没有添加按钮,学了之后才知道它是在 dag_processor 解析 DAG 之后自动添加上去了,Assets 界面只是提供了根据 Asset 显示相关联的 DAG。

下面我们开始实际创建三个 DAG, 一个 Producer, 两个  Consumer, 并用 Asset 把它们关联起来,Producer  可基于普通的 Interval  方式触发,当 Producer 执行成功后,才触发执行那两个 Consumer。

首选创建 asset_producer.py

放到 dags 目录,过会儿在界面上看到

再创建另两个 DAG

asset_consumer1.py

asset_consumer2.py

现在 UI 上看到的这三个 DAG

从这里我们看不出 asset-producer 与 asset_consumer1/asset_consumer2 之间是如何关联的,只知道两个 Consumer 是由 s3://asset-bucket/example.csv 触发的。但是来到 Assets 页面,点开 Asset s3://asset-bucket/example.csv 就看到它们之间的关系了

通过这个图,我们完全可以理解为

  1. asset-producer 执行成功后,发布一个标识为 s3://asset-bucket/example.csv 的事件
  2. asset_consumer1 和 asset_consumer2 由于订阅了该事件,所以会在 asset_producer 成功结束后被触发执行

至于 asset-producer, asset_consumer1, asset_consumer2 之间如何通过 s3://asset-bucket/example.csv 交换信息,那是实现上的约定。

实际测试,手工触发 asset-producer, 完后 asset_consumer1 和 asset_consumer2 立即执行,再来看 asset_consumer2 中输出的 context 信息(JSON 格式化后)

通过上面的信息有助于我们了解可在 DAG 间传递什么信息,比如由 triggering_asset_events 我们知道上游的 Producer 是谁。

其他相关话题

在由 Airflow 2.x 转换到 3.0 时有许多的 Python 包名和类名都变了,还好没正式的学习 Airflow 2.x 的 DAG, 障碍不大,但 Airflow 3.0 中同时保留了 2.x 的包和类,所以相同的类会存在于多个包中。

一个 DAG 可同时关联多个不同的 Asset

在一个 Python 文件中可以写多个  DAG 实例

触发 DAG 的多个 Asset  可以用逻辑操作,默认为 AND,如

比如通过逻辑操作实现条件,多个上游 DAG 都成功执行,或只要有一个上游  DAG 成功执行便触发

自 Airflow 3.0 起,真的提供了 Event-driven scheduling, 基于真实事件来触发,需要用到  AssetWatcher,比如监听 AWS SQS, Kafka 中的消息进而触发 DAG 执行,这是更高级话题,不就此展开。

本文链接 https://yanbin.blog/apache-airflow-3-0-us-asset-aware-dagproducer-consumers/, 来自 隔叶黄莺 Yanbin Blog

[版权声明] Creative Commons License 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。

Subscribe
Notify of
guest

0 Comments
Inline Feedbacks
View all comments