Apache Airflow 分支与条件流程

来到稍微复杂一点的流程,虽说 DAG 不能有循环但分支还是可以有的。比如下面的分支流程

start >> [task1, task2] >> end

在 Airflow UI 中展示出来就是

Airflow 默认下游 Task 的 trigger_ruleall_success,  即要求上游的所有 Task 都必须成功才会执行,否则跟随着失败或跳过,这对于并行处理然后汇集结果的应用是合理的。

本文将要使用到的是分支(BranchPythonOperator), 比如共用一个 DAG, 周中与周末执行不同的分支,或根据条件从不同的数据源采集数据,下流的任务则需在任何一个分支成功即可触发。对上面的流程加上辅助任务,使其表达性更强 阅读全文 >>

Apache Airflow 3.0 使用 Asset-Aware DAG(producer/consumers)

继续玩弄那个小风车,先前买的 《Data Pipelines with Apache Airflow》 一眼没看直接作废,因为是基于 Apache Airflow 2.x 的,3.0 既出立马又买了该书的第二版,倒是基于 Apache Airflow 3.0 的,但写书之时 3.0 尚未正式推出,所以书中内容与实际应用有许多出入。

Apache Airflow 自 2.4 起就支持基于 Asset 事件触发 DAG,那时叫做 Data-aware,从 Apache Airflow 3.0 起更名为 Asset-Aware, 并且在 UI 上也会显示使用到的 Assets。那么 Asset-Aware  解决什么问题呢,它采用了 Producer/Consumer 模式可把依赖的某一共同资源的  DAG 串联起来。比如某一个 Producer DAG 写了文件到 s3://asset-bucket/example.csv, (发布一个事件 ), 然后相当于订阅了该事件所有相关 Consumer DAG 都会得到执行。

Airflow 的 Asset 使用 URI 的格式

  1. s3://asset-bucket/example.csv
  2. file://tmp/data/export.json
  3. postgresql://mydb/schema/mytable
  4. gs://my-bucket/processed/report.parquet

阅读全文 >>

Apache Airflow 任务中使用模板或上下文

本文大概记录一下在 Apache Airflow 的 Task 或 Operator(这两个基本是同一概念) 中如何使用 模板(Template) 和上下文(Context). Airflow 的模板引擎用的  Jinja Template, 它也被 FastAPI 和 Flask 所采纳。首先只有构造 Operator 时的参数或参数指定的文件内容中,或者调用 Operator render_template() 方法才能用模板语法,像 {{ ds }}.  Apache Airflow 有哪些模板变量可用请参考: Templates references / Variables, 本文将会打印出一个 Task 的 context 变量列出所有可用的上下文变量, 不断的深入,最后在源代码找到相关的定义。

通过使用模板或上下文,我们能能够在任务中使用到 Apache Airflow 一些内置的变量值,如 DAG 或任务当前运行时的状态等 。

当我们手动触发一个 DAG 时, 在 Configuration JSON 中输入的参数也能在 context 中找到. 所有的 Operator  继承链可追溯到 AbstractOperator -> Templater, 因此所有的 Operator(Task) 都能通过调用 Templater.render_template() 方法对模板进行渲染,该方法的原型是 阅读全文 >>

Docker Compose 简单配置 Apache Airflow 3.0(PostgreSQL)

Apache Airflow 重新唤起我的注意力是因为 Airflow 3.0 在近日 April 22, 2025 发布了,其二则是我们一直都有计划任务的需求,以下几种方案都太简陋

  1. 用 Windows 的计划任务或 Linux 的 Cron 都不易管理,且有单点故障问题
  2. 在 Java Spring 项目中使用集群模式的 Quartz 有些麻烦,且对于 AutoScaling 也不怎么友好
  3. AWS 上用 CloudWatch Rule + AWS Lambda 的方案可靠性没有问题,但不适于监控 

因此还有必要再次尝试 Apache Airflow, 它有集中管理的界面,各个部件都是可伸缩的,如 WebServer, Workers 等。特别是刚出的 Apache Airflow 3.0 带来以下主要新特性

  1. 新的服务化架构,各个部件间耦合度降低
  2. 多语言支持,借助了 Task SDK, 可望用 Java, JavaScript, TypeScript 等语言写 DAG
  3. DAG 支持版本控制,可回溯历史
  4. 支持事件驱动,即  DAG 可响应外部事件,如文件到达,消息队列等
  5. 引入了资产驱动调度功能,可根据数据资产的变化 进行触发,可以说是事件驱动的一类
  6. 全新的 React UI 界面

阅读全文 >>