Apache Airflow 重新唤起我的注意力是因为 Airflow 3.0 在近日 April 22, 2025 发布了,其二则是我们一直都有计划任务的需求,以下几种方案都太简陋
- 用 Windows 的计划任务或 Linux 的 Cron 都不易管理,且有单点故障问题
- 在 Java Spring 项目中使用集群模式的 Quartz 有些麻烦,且对于 AutoScaling 也不怎么友好
- AWS 上用 CloudWatch Rule + AWS Lambda 的方案可靠性没有问题,但不适于监控
因此还有必要再次尝试 Apache Airflow, 它有集中管理的界面,各个部件都是可伸缩的,如 WebServer, Workers 等。特别是刚出的 Apache Airflow 3.0 带来以下主要新特性
- 新的服务化架构,各个部件间耦合度降低
- 多语言支持,借助了 Task SDK, 可望用 Java, JavaScript, TypeScript 等语言写 DAG
- DAG 支持版本控制,可回溯历史
- 支持事件驱动,即 DAG 可响应外部事件,如文件到达,消息队列等
- 引入了资产驱动调度功能,可根据数据资产的变化 进行触发,可以说是事件驱动的一类
- 全新的 React UI 界面
Apache Airflow 提供了多种安装部署方式,如 pip, uv 安装,Docker 或 Docker Compose, 在 Kubernetes 环境可用官方的 Helm。
本文欲尝试的是用 Docker Compose 的方式在本地进行部署,与默认的 airflow standalone 方式唯一不同的只是把数据库独立了出来,不再使用内置的 SQLite,仍然使用单体中的 WebServer, Scheduler, Workers, Triggerer 等部件。
这是官方的 docker-compose.yaml, 本文对些有少许参考,该 docker-coompose 配置文件中含有 8 个 service, 分别是
- airflow-scheduler
- airflow-dag-processor
- airflow-api-server
- airflow-worker
- airflow-triggerer
- airflow-init
- postgres
- redis
此处尽量简化,只有两个 service -- postgres 和 airflow. Airflow 将使用 airflow standalone
启动,所它涵盖了以上 1 - 5 的功能。
Apache Airflow 镜像是从 Python:3.12-slim-bookworm 构建的。
准备目录结构
首先在当前目录中创建下面结构,所创建的空目录是为了映射到容器当中去的。
1 2 3 4 5 6 7 8 9 10 11 |
mkdir ./{config,dags,logs,plugins,postgres-data} touch Dockerfile docker-compose.yml . ├── config ├── dags ├── docker-compose.yml ├── Dockerfile ├── logs ├── plugins └── postgres-data |
Dockerfile 文件内容
Dockerfile 是用来构建 Airflow 服务的
1 2 3 4 5 6 7 8 9 10 11 12 |
FROM python:3.12-bookworm ENV AIRFLOW_HOME=/opt/airflow WORKDIR ${AIRFLOW_HOME} ARG CONSTRAINT_URL=https://raw.githubusercontent.com/apache/airflow/constraints-3.0.0/constraints-3.12.txt RUN pip install "apache-airflow[postgres]==3.0.0" --constraint ${CONSTRAINT_URL} && \ pip install flask_appbuilder ENTRYPOINT ["bash] |
在 DAG 中将要用到的 Python 组件用 pip install 安装即可,比如我们的任务将与 AWS 交互,那么就是在 RUN
中加上一个
pip install boto3
Airflow 将使用 PostgreSQL 数据库,所以选择了 apache-airflow[postgres]
。ENTRYPOINT
无所谓,因为我们会在后面用 docker-compose.yml 来指定如何启动 airflow 服务。
编写 docker-compose.yml 文件
docker-compose.yml 定义了如何启动容器,airflow 需要用构建 Dockerfile 构建的容器,postgres 是从 docker hub 中拉取的。
该文件内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
services: airflow: build: context: . dockerfile: Dockerfile ports: - "8080:8080" depends_on: postgres: condition: service_healthy healthcheck: test: ["CMD", "curl", "--fail", "http://localhost:8080/api/v2/version"] restart: always volumes: - ./dags:/opt/airflow/dags - ./logs:/opt/airflow/logs - ./config:/opt/airflow/config - ./plugins:/opt/airflow/plugins environment: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow AIRFLOW__CORE__LOAD_EXAMPLES: false AIRFLOW__WEBSERVER__EXPOSE_CONFIG: true entrypoint: ["airflow", "standalone"] postgres: image: postgres:16-bookworm environment: POSTGRES_USER: airflow POSTGRES_PASSWORD: airflow POSTGRES_DB: airflow volumes: - ./postgres-data:/var/lib/postgresql/data healthcheck: test: ["CMD", "pg_isready", "-U", "airflow"] restart: always |
前面说过,这里只定义了两个服务,airflow 和 postgres。两个容器都加入了 healthcheck, airflow 依赖于 postgres, 只有 postgres 数据库就绪后地会开始启动 airflow 服务。
留意其中的卷映射,以及数据库的名称,用户名及密码配置,在 airflow 服务中通过配置环境变量
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
使用 postgres 容器中的数据库
docker-compose 中定义的服务会使用相同的网络,所以 postgres 不需要暴露 5432 数据库端口在 airflow 服务中同样能访问到。由于我们要在外部访问 airflow 的 web 服务,因此在 airflow 映射了端口号 8080:8080
启动 airflow 和 postgres 服务
下面一些操作将要到到 docker-compose 命令, 它与 docker 命令有些类型, docker-compose 是比 Kubernetes 更简单的容器编排工具。Docker Swarm + Docker Compose 也能够类似于 Kubernetes 那样在集群环境中编排容器服务,当然不具有 Kubernetes 的复杂度和完备的功能,中小型的应用还是能就会的。
启动 docker-compose.yml 中定义的服务的命令是
docker-compose up
如果 docker-compose up
发现没有构建好 airflow 镜像,会首先构建相应的镜像,就像是事先执行了
docker-compose build
命令一样
一切正常的话,数据库准备,并且在启动 airflow 服务的 airflow standalone
命令时会自动连接 postgres 数据库并初始化数据库 Schema
现在我们可以打开浏览器访问 http://localhost:8080,被要求输入用户名和密码,要到 airflow 容器中去寻找
docker ps 或 docker-compose ps 命令列出所有启动的容器
1 2 3 4 |
docker-compose ps NAME IMAGE COMMAND SERVICE CREATED STATUS PORTS airflow-test-airflow-1 airflow-test-airflow "airflow standalone" airflow 6 minutes ago Up 5 minutes (healthy) 0.0.0.0:8080->8080/tcp airflow-test-postgres-1 postgres:16-bookworm "docker-entrypoint.s…" postgres 6 minutes ago Up 6 minutes (healthy) 5432/tcp |
为什么会有个 airflow-test
前缀,因为我的工作目录是 airflow-test
输入命令
docker-compose exec -it airflow bash
进到 airflow 容器
1 2 3 4 5 |
airflow-test docker-compose exec -it airflow bash root@9f40597439df:/opt/airflow# ls airflow.cfg config dags logs plugins simple_auth_manager_passwords.json.generated root@9f40597439df:/opt/airflow# cat simple_auth_manager_passwords.json.generated {"admin": "qC2adgwMqY4mZdZt"} |
有的用户名和密码,登陆 http://localhost:8080 后就是
界面与 Apache Airflow 2 比起来还是清爽了许多。
写一个最简单的 DAG 文件 hello_dag.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
from airflow.decorators import dag, task from datetime import datetime @dag( schedule="@daily", # run once daily start_date=datetime(2024, 1, 1), # start date for DAG catchup=False, # don't backfill tags=["example"], ) def hello_dag(): @task def hello(): print("👋 Hello from Airflow 3.0!") hello() dag = hello_dag() |
放置在 ./dags
目录,过最多 5 分钟,就能在 http://localhost:8080/dags 页面中刷出它来。
在上图中的 hello_dag,我手工触发执行了它一次。
其他 Apache Airflow 3.0 新功能,诸如 Assets, Admin 中的 Variables, Pools, Providers, Plugins, Connections, Config 功能都值得以后就深究。
由于在 docker-compose.yml 配置了 AIRFLOW__CORE__LOAD_EXAMPLES: false 环境变量,如果最初没有该环境变量变量或为 true 的话,我们就会看到许多的示例 Dags 和 Assets。或者直接访问 airflow/airflow-core/src/airflow/example_dags/ 查看官方提供的所有示例
看到这些,我们是否可以借助于 S3 Event 做些什么事情。
其他相关内容
airflow standalone 不建议使用,但个人认为对于只需 Airflow 做简单的事情的应用未尝不可,比如只需要定时调用某个 HTTP 服务,体力活由对应的 HTTP 服务去处理,即纯粹的任务调度。
如果我们查看 airflow standalone
启动了哪些服务,用命令 docker-compose exec airflow ps -ef
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
$ docker-compose exec airflow ps -ef UID PID PPID C STIME TTY TIME CMD root 1 0 0 03:27 ? 00:00:02 /usr/local/bin/python3.12 /usr/local/bin/airflow standalone root 9 1 3 03:27 ? 00:00:25 /usr/local/bin/python3.12 /usr/local/bin/airflow scheduler root 11 1 7 03:27 ? 00:00:53 /usr/local/bin/python3.12 /usr/local/bin/airflow dag-processor root 13 1 0 03:27 ? 00:00:04 /usr/local/bin/python3.12 /usr/local/bin/airflow api-server root 15 1 2 03:27 ? 00:00:16 /usr/local/bin/python3.12 /usr/local/bin/airflow triggerer root 20 13 0 03:27 ? 00:00:00 /usr/local/bin/python3.12 -c from multiprocessing.resource_tracker import main;main(10) root 21 13 1 03:27 ? 00:00:09 /usr/local/bin/python3.12 -c from multiprocessing.spawn import spawn_main; spawn_main(tracke root 22 13 1 03:27 ? 00:00:10 /usr/local/bin/python3.12 -c from multiprocessing.spawn import spawn_main; spawn_main(tracke root 23 13 1 03:27 ? 00:00:09 /usr/local/bin/python3.12 -c from multiprocessing.spawn import spawn_main; spawn_main(tracke root 24 13 1 03:27 ? 00:00:10 /usr/local/bin/python3.12 -c from multiprocessing.spawn import spawn_main; spawn_main(tracke root 25 15 0 03:27 ? 00:00:00 /usr/local/bin/python3.12 /usr/local/bin/airflow triggerer root 26 9 0 03:27 ? 00:00:00 /usr/local/bin/python3.12 /usr/local/bin/airflow scheduler root 27 26 0 03:27 ? 00:00:00 /usr/local/bin/python3.12 /usr/local/bin/airflow scheduler root 28 25 0 03:27 ? 00:00:00 /usr/local/bin/python3.12 /usr/local/bin/airflow triggerer root 29 25 0 03:27 ? 00:00:00 /usr/local/bin/python3.12 /usr/local/bin/airflow triggerer root 30 26 0 03:27 ? 00:00:00 /usr/local/bin/python3.12 /usr/local/bin/airflow scheduler root 31 15 0 03:27 ? 00:00:05 /usr/local/bin/python3.12 /usr/local/bin/airflow triggerer |
所有服务一应具权,精细点控制可以 scheduler, api-server, triggerer 等服务逐个启动,并指定 Replica 个数。基本的 Apache Airflow 都可以从 airflow standalone 开始,运行后根据实际状况对不同部件进行分布式部署,或者引入 Redis + Celery 使用分布式的 Worker -- CeleryExecutor.
docker-compose 的相关操作
docker-compose.yml 中定义的服务是可以单独启动的,如
docker-compose up postgres
docker-compose run --entrypoint bash -p 8080:8080 airflow
airflow db migrate
airflow standalone
airflow db migrate 相当于以前版本的 airflow db init 初始化数据库,兼具从旧 Airflow 数据库升级到新版本。
在 airflow 容器中执行了 airflow standalone 之后会生成 /opt/airflow/airflow.cfg 文件
airflow standalone 默认使用 SimpleAuthManager 方式验证用户,从前面已知它会在启动时生成 admin 的密码,记录在 ${AIRFLOW_HOME}/simple_auth_manager_passwords.json.generated 中。
老式的 airflow users create...
命令已不可用,除非安装了 apache-airflow-providers-fab
pip install apache-airflow-providers-fab
并且在 airflow.cfg 中把
auth_manager = airflow.api_fastapi.auth.managers.simple.simple_auth_manager.SimpleAuthManager
改成了
auth_manager = airflow.providers.fab.auth_manager.fab_auth_manager.FabAuthManager
如此才能用老的方式创建用户
airflow users create --role Admin --username yanbin --email yabqiu@gmail.com --firstname Yanbin --lastname Qiu --password my-password-123
但用户创建了也未必能通过界面验证通过,还等在 airflow.cfg 中配置
1 2 3 |
[webserver] authenticate = True auth_backend = airflow.www.security.FernetAuthBackend # or FabAuthBackend |
或者用环境变量
1 2 |
export AIRFLOW__WEBSERVER__AUTHENTICATE=True export AIRFLOW__WEBSERVER__AUTH_BACKEND=airflow.www.security.FernetAuthBackend |
既然 Apache Airflow 不建议那么使用,还是用新型的方式进行验证吧,比如通过 SSO 登陆,验证 JWT token 而进入到 Airflow 等。
指定 Airflow(webserver 的 context-path),可配置 airflow.cfg
, 在其中加上
1 2 |
[webserver] base_url = http://localhost:8080/airflow |
或者用环境变量
AIRFLOW__API__BASE_URL=http://localhost:8080/airflow
这里 base_url 配置不一定非要明确哪个 IP 地址,简单的指定 http://localhost:8080/airflow, 我们依然可以用实际 IP 地址或域名访问,如 http://192.168.0.100:8080 来访问,相当于只规定的 context-path 为 /airflow。因为 Airflow 的 api-server 服务是启动在 0.0.0.0:8080 上的
api-server | INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
如何不可访问就明确 IP 地址
AIRFLOW__API__BASE_URL=http://192.168.0.100:8080/airflow
或者直接配置为最终使用的域名,如
AIRFLOW__API__BASE_URL=http://example.com/airflow
似乎从中我们需要只是 /airflow 这一部分。
这样才便于进行负载均衡或反向代理的配置,例如要在 Nginx 中配置返回代理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
server { listen 443 ssl; server_name example.com; ssl_certificate <path-to>/fullchain.cer; ssl_certificate_key <path-to>/example.com.key; location / { proxy_pass http://192.168.0.2/; } location /airflow { proxy_pass http://192.168.0.100:8080/airflow/; } } |
对所有 /airflow 的访问代理到 http://192.168.0.100:8080/airflow, 否则代理到 http://192.168.0.2/,如果未能指定 Airflow 的 Context Path, 只是配置
1 2 3 |
location /airflow { proxy_pass http://192.168.0.100:8080/; } |
那第一次访问 http://example.com/airflow 没有问题,但一登陆就回到了 http://example.com/ 或者 dags 的 URL 是 http://example.com/dags, 这样就无法再通过 /airflow 上下文回到 /airflow -> http://192.168.0.100:8080 了.
不用 context-path, 前端使用了 nginx 作反向代理的话,如果响应中有绝对的路径表示法,如
<img src="/abc.img"/>
<a href="https://yanbin.blog/xyz.html/>
就需要用 sub_filter 去替换,肯定会影响一些性能
1 2 3 4 5 6 |
location /airflow { proxy_pass http://192.168.0.100:8080/ sub_filter '<a href="https://yanbin.blog/' '<a href="https://yanbin.blog/airflow'; sub_filter '<img src="/' '<img src"/airflow'; sub_filter_once off; } |
sub_filter 需要用正则表达式考虑到所有情况,一个或多个空格,标签属性的顺序等,所以最好是要用 context-path, 或者用子域名更优。
最后,过程中有什么问题,最快速的办法就是 docker-compose exec -it airflow bash 进到容器中去查,比如用 env 命令看看环境变量是否设置得当,卷映射是否成功。
链接: