Apache Airflow 分支与条件流程
来到稍微复杂一点的流程,虽说 DAG 不能有循环但分支还是可以有的。比如下面的分支流程
Airflow 默认下游 Task 的
本文将要使用到的是分支(BranchPythonOperator), 比如共用一个 DAG, 周中与周末执行不同的分支,或根据条件从不同的数据源采集数据,下流的任务则需在任何一个分支成功即可触发。对上面的流程加上辅助任务,使其表达性更强
(这里埋下一个 Bug, 这个流程永远都不会成功,以后将修复它)
在 determine_branch 中对触发 DAG 时传入的 params 参数进行判断以选择合适的分支,以此我们可对流程进行多样的测试。
上面流程反应在 Airflow UI 中是
(Apache Airflow 用来从简单的 Python 代码生成流程图也不错)
手工触发不执行,不传入额外的 params, 然我们看到运行的效果是
无论我们的 params 选择为何, 选择 task1, task2 或是 task3, 最后 join 和 end 总是被
原因是由于 join 任务要求它所有的上流任务 task1, task2, 和 task3 都要成功才会执行,这种全都要与 branch 的多选一的语义是相违背的。Airflow 不会自动感知它的上上游是否分支来更改 trigger_rule 的默认规则,而是要我们自己进行配置,所以我们得对
即只要它上游没有失败的有一个成功就行,这正好符合分支的要求。或者也可以用另一个规则
Airflow 可选的 trigger_rule 如下
从字面意义都很好理解,只要我们酌情选择就是。
好了,回到我们增加上
这样,只要贯通一个分支,最终的状态就能成功
再来一个测试,我们在触发 DAG 时输入 Configuration JSON
那么根据 determine_branch 中的逻辑就会选择 "task2", 看看运行的效果
没有问题,符合我们的预期。
determine_branch 是一个 Python 函数,所以如果赋予它 *context 参数
可以分解成
或者
除了分支 BranchPythonOperator, 还有 ShortCircuitOperator 也可算作某种形式的分支。
如果执行时一切正常,是这个样子
要是触发是配置 Configuration JSON 为
就是这样
进行触发时配置 Configuration JSON 为
下游跟随着 skipped
假如有兴趣的话还可以对任务抛出 Airflow 的 AirflowTaskTimeout 或 AirflowTaskTerminated 时进行测试,它们与 AirflowFailException 产生的 UI 效果一样,都是显示 failed, upstream_failed. 基本上也就是 AirflowSkipException 能产生特殊的效果,导致当前任务被跳过而非失败。
从这里我们还学到了,由于下游任务的默认 trigger_rule 是 all_success, 所以控制某个任务是否应被执行,可为它挂载一个悬空的上游任务,并在其中判断条件来抛出 AirflowFailExcepiton 或 AirflowSkipException. 永久链接 https://yanbin.blog/apache-airflow-branch-and-condition/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。
start >> [task1, task2] >> end在 Airflow UI 中展示出来就是
Airflow 默认下游 Task 的 trigger_rule 是 all_success, 即要求上游的所有 Task 都必须成功才会执行,否则跟随着失败或跳过,这对于并行处理然后汇集结果的应用是合理的。本文将要使用到的是分支(BranchPythonOperator), 比如共用一个 DAG, 周中与周末执行不同的分支,或根据条件从不同的数据源采集数据,下流的任务则需在任何一个分支成功即可触发。对上面的流程加上辅助任务,使其表达性更强
1import pendulum
2from airflow.sdk import DAG
3from airflow.providers.standard.operators.python import BranchPythonOperator
4from airflow.providers.standard.operators.empty import EmptyOperator
5
6def determine_branch(params):
7 return params.get("branch", "task1")
8
9with DAG(
10 dag_id="branch_dag",
11 schedule="@daily",
12 start_date=pendulum.today(),
13):
14
15 start = EmptyOperator(
16 task_id="start"
17 )
18
19 task1 = EmptyOperator(
20 task_id="task1"
21 )
22
23
24 task2 = EmptyOperator(
25 task_id="task2"
26 )
27
28 task3 = EmptyOperator(
29 task_id="task3"
30 )
31
32 join = EmptyOperator(
33 task_id="join"
34 )
35
36 end = EmptyOperator(
37 task_id="end"
38 )
39
40 branch = BranchPythonOperator(
41 task_id="choose_one_branch",
42 python_callable=determine_branch
43 )
44
45 start >> branch >> [task1, task2, task3] >> join >> end(这里埋下一个 Bug, 这个流程永远都不会成功,以后将修复它)
在 determine_branch 中对触发 DAG 时传入的 params 参数进行判断以选择合适的分支,以此我们可对流程进行多样的测试。
上面流程反应在 Airflow UI 中是
(Apache Airflow 用来从简单的 Python 代码生成流程图也不错)手工触发不执行,不传入额外的 params, 然我们看到运行的效果是
无论我们的 params 选择为何, 选择 task1, task2 或是 task3, 最后 join 和 end 总是被 skipped. 当然如果被选择的分支 task1, task2, 或 task3 执行失败的话,join 和 end 也会被标记为失败 -- 还要是 task1, task2, 或 task3 不是 EmptyOperator 的情况下。原因是由于 join 任务要求它所有的上流任务 task1, task2, 和 task3 都要成功才会执行,这种全都要与 branch 的多选一的语义是相违背的。Airflow 不会自动感知它的上上游是否分支来更改 trigger_rule 的默认规则,而是要我们自己进行配置,所以我们得对
join 任务进行修改1 join = EmptyOperator(
2 task_id="join",
3 trigger_rule="none_failed_min_one_success"
4 )即只要它上游没有失败的有一个成功就行,这正好符合分支的要求。或者也可以用另一个规则
none_failed, 只要没有失败的上游任务就执行,在上游为分支的情形下与 non_failed_min_one_success 行为是一致的。Airflow 可选的 trigger_rule 如下
1class TriggerRule(str, Enum):
2 """Class with task's trigger rules."""
3
4 ALL_SUCCESS = "all_success"
5 ALL_FAILED = "all_failed"
6 ALL_DONE = "all_done"
7 ALL_DONE_SETUP_SUCCESS = "all_done_setup_success"
8 ONE_SUCCESS = "one_success"
9 ONE_FAILED = "one_failed"
10 ONE_DONE = "one_done"
11 NONE_FAILED = "none_failed"
12 NONE_SKIPPED = "none_skipped"
13 ALWAYS = "always"
14 NONE_FAILED_MIN_ONE_SUCCESS = "none_failed_min_one_success"
15 ALL_SKIPPED = "all_skipped"从字面意义都很好理解,只要我们酌情选择就是。
好了,回到我们增加上
trigger_rule="none_failed_main_one_sucees" 的 join 任务后,再手工无条件触发执行
这样,只要贯通一个分支,最终的状态就能成功再来一个测试,我们在触发 DAG 时输入 Configuration JSON
1{
2 "branch": "task2"
3}
那么根据 determine_branch 中的逻辑就会选择 "task2", 看看运行的效果
没有问题,符合我们的预期。determine_branch 是一个 Python 函数,所以如果赋予它 *context 参数
determine_branch(**context)则可以根据 context 中的任何条件动态决定取用哪个分支,比如根据上游往 XComs 中放入的数据决定分支
# 默认 key 为 "return_value"注:XComs 是 Airflow 用来跨任务交互数据的机制(cross-communication), 实际的存储在数据库中序列化后的数据,任务的返回值以 task_ids="<task_id>", key 为 "return_value" 自动存储(但必须是可序列化的),或者任务中手工调用 xcom_push 存储
context["task_instance"].xcom_pull(task_ids="taskX") # 或 ti.xcom_pull(task_ids="taskX"), 声明了 ti 参数时
context["task_instance"].xcom_push(key="key", value="value") # 取数据另外, Airflow 重载的
context["task_instance"].xcom_pull(tasks_ids="taskX", key="key")
>> 操作符还是很形像的,而且写法很自由,例如上面的1start >> branch >> [task1, task2, task3] >> join >> end可以分解成
1start >> branch >> [task1, task2, task3]
2task1 >> join >> end
3task2 >> join >> end
4task3 >> join >> end或者
1start >> branch >> [task1, task2, task3]
2[task1, task2, task3] >> join >> end除了分支 BranchPythonOperator, 还有 ShortCircuitOperator 也可算作某种形式的分支。
条件流程
一个任务可根据上游任务的执行状况决定自己是 skipped, failed, upstream_failed, 在此处之所以称作条件流程,倒不如说是任务状态更为合适。一个任务抛出了未知异常会导致任务的失败,如果按照任务的 trigger_rule 未达成它的触发条件会有 skipped 的状态。下面是在任务中主动抛出 AirflowSkipException, AirflowFailException 用以跳过或失败当前任务, 为体验,我们创建如下 DAG 1import pendulum
2from airflow.exceptions import AirflowSkipException, AirflowFailException
3from airflow.providers.standard.operators.bash import BashOperator
4from airflow.providers.standard.operators.empty import EmptyOperator
5from airflow.providers.standard.operators.python import PythonOperator
6from airflow.sdk import DAG
7
8
9def _task3(params: dict):
10 if params.get("fail", False):
11 raise AirflowFailException("Simulated failure in task3")
12 if params.get("skip", False):
13 raise AirflowSkipException("Simulated skip in task3")
14
15with DAG(
16 dag_id="conditional_dag",
17 start_date=pendulum.today("America/Chicago"),
18) as dag:
19 task1 = EmptyOperator(task_id="task1", task_display_name="Task 1")
20 task2 = BashOperator(
21 task_id="task2",
22 bash_command="ls -l",
23 )
24 task3 = PythonOperator(
25 task_id="task3",
26 python_callable=_task3
27 )
28
29 task4 = EmptyOperator(
30 task_id="task4",
31 )
32
33 task1 >> [task2]>> task4
34 task3 >> task4如果执行时一切正常,是这个样子
要是触发是配置 Configuration JSON 为1{
2 "fail": true
3}就是这样
进行触发时配置 Configuration JSON 为1{
2 "skip": true
3}下游跟随着 skipped
假如有兴趣的话还可以对任务抛出 Airflow 的 AirflowTaskTimeout 或 AirflowTaskTerminated 时进行测试,它们与 AirflowFailException 产生的 UI 效果一样,都是显示 failed, upstream_failed. 基本上也就是 AirflowSkipException 能产生特殊的效果,导致当前任务被跳过而非失败。从这里我们还学到了,由于下游任务的默认 trigger_rule 是 all_success, 所以控制某个任务是否应被执行,可为它挂载一个悬空的上游任务,并在其中判断条件来抛出 AirflowFailExcepiton 或 AirflowSkipException. 永久链接 https://yanbin.blog/apache-airflow-branch-and-condition/, 来自 隔叶黄莺 Yanbin's Blog
[版权声明]
本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。