来到稍微复杂一点的流程,虽说 DAG 不能有循环但分支还是可以有的。比如下面的分支流程
start >> [task1, task2] >> end
在 Airflow UI 中展示出来就是
Airflow 默认下游 Task 的
trigger_rule
是 all_success
, 即要求上游的所有 Task 都必须成功才会执行,否则跟随着失败或跳过,这对于并行处理然后汇集结果的应用是合理的。
本文将要使用到的是分支(BranchPythonOperator), 比如共用一个 DAG, 周中与周末执行不同的分支,或根据条件从不同的数据源采集数据,下流的任务则需在任何一个分支成功即可触发。对上面的流程加上辅助任务,使其表达性更强
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 |
import pendulum from airflow.sdk import DAG from airflow.providers.standard.operators.python import BranchPythonOperator from airflow.providers.standard.operators.empty import EmptyOperator def determine_branch(params): return params.get("branch", "task1") with DAG( dag_id="branch_dag", schedule="@daily", start_date=pendulum.today(), ): start = EmptyOperator( task_id="start" ) task1 = EmptyOperator( task_id="task1" ) task2 = EmptyOperator( task_id="task2" ) task3 = EmptyOperator( task_id="task3" ) join = EmptyOperator( task_id="join" ) end = EmptyOperator( task_id="end" ) branch = BranchPythonOperator( task_id="choose_one_branch", python_callable=determine_branch ) start >> branch >> [task1, task2, task3] >> join >> end |
(这里埋下一个 Bug, 这个流程永远都不会成功,以后将修复它)
在 determine_branch 中对触发 DAG 时传入的 params 参数进行判断以选择合适的分支,以此我们可对流程进行多样的测试。
上面流程反应在 Airflow UI 中是
(Apache Airflow 用来从简单的 Python 代码生成流程图也不错)
手工触发不执行,不传入额外的 params, 然我们看到运行的效果是
无论我们的 params 选择为何, 选择 task1, task2 或是 task3, 最后 join 和 end 总是被 skipped
. 当然如果被选择的分支 task1, task2, 或 task3 执行失败的话,join 和 end 也会被标记为失败 -- 还要是 task1, task2, 或 task3 不是 EmptyOperator 的情况下。
原因是由于 join 任务要求它所有的上流任务 task1, task2, 和 task3 都要成功才会执行,这种全都要与 branch 的多选一的语义是相违背的。Airflow 不会自动感知它的上上游是否分支来更改 trigger_rule 的默认规则,而是要我们自己进行配置,所以我们得对 join
任务进行修改
1 2 3 4 |
join = EmptyOperator( task_id="join", trigger_rule="none_failed_min_one_success" ) |
即只要它上游没有失败的有一个成功就行,这正好符合分支的要求。或者也可以用另一个规则 none_failed
, 只要没有失败的上游任务就执行,在上游为分支的情形下与 non_failed_min_one_success
行为是一致的。
Airflow 可选的 trigger_rule 如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
class TriggerRule(str, Enum): """Class with task's trigger rules.""" ALL_SUCCESS = "all_success" ALL_FAILED = "all_failed" ALL_DONE = "all_done" ALL_DONE_SETUP_SUCCESS = "all_done_setup_success" ONE_SUCCESS = "one_success" ONE_FAILED = "one_failed" ONE_DONE = "one_done" NONE_FAILED = "none_failed" NONE_SKIPPED = "none_skipped" ALWAYS = "always" NONE_FAILED_MIN_ONE_SUCCESS = "none_failed_min_one_success" ALL_SKIPPED = "all_skipped" |
从字面意义都很好理解,只要我们酌情选择就是。
好了,回到我们增加上 trigger_rule="none_failed_main_one_sucees"
的 join 任务后,再手工无条件触发执行
再来一个测试,我们在触发 DAG 时输入 Configuration JSON
1 2 3 |
{ "branch": "task2" } |
那么根据 determine_branch 中的逻辑就会选择 "task2", 看看运行的效果
没有问题,符合我们的预期。
determine_branch 是一个 Python 函数,所以如果赋予它 *context 参数
determine_branch(**context)
则可以根据 context 中的任何条件动态决定取用哪个分支,比如根据上游往 XComs 中放入的数据决定分支
# 默认 key 为 "return_value"
context["task_instance"].xcom_pull(task_ids="taskX") # 或 ti.xcom_pull(task_ids="taskX"), 声明了 ti 参数时
注:XComs 是 Airflow 用来跨任务交互数据的机制(cross-communication), 实际的存储在数据库中序列化后的数据,任务的返回值以 task_ids="<task_id>", key 为 "return_value" 自动存储(但必须是可序列化的),或者任务中手工调用 xcom_push 存储
context["task_instance"].xcom_push(key="key", value="value")
# 取数据
context["task_instance"].xcom_pull(tasks_ids="taskX", key="key")
另外, Airflow 重载的 >>
操作符还是很形像的,而且写法很自由,例如上面的
1 |
start >> branch >> [task1, task2, task3] >> join >> end |
可以分解成
1 2 3 4 |
start >> branch >> [task1, task2, task3] task1 >> join >> end task2 >> join >> end task3 >> join >> end |
或者
1 2 |
start >> branch >> [task1, task2, task3] [task1, task2, task3] >> join >> end |
除了分支 BranchPythonOperator, 还有 ShortCircuitOperator 也可算作某种形式的分支。
条件流程
一个任务可根据上游任务的执行状况决定自己是 skipped, failed, upstream_failed, 在此处之所以称作条件流程,倒不如说是任务状态更为合适。一个任务抛出了未知异常会导致任务的失败,如果按照任务的 trigger_rule 未达成它的触发条件会有 skipped 的状态。下面是在任务中主动抛出 AirflowSkipException, AirflowFailException 用以跳过或失败当前任务, 为体验,我们创建如下 DAG
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
import pendulum from airflow.exceptions import AirflowSkipException, AirflowFailException from airflow.providers.standard.operators.bash import BashOperator from airflow.providers.standard.operators.empty import EmptyOperator from airflow.providers.standard.operators.python import PythonOperator from airflow.sdk import DAG def _task3(params: dict): if params.get("fail", False): raise AirflowFailException("Simulated failure in task3") if params.get("skip", False): raise AirflowSkipException("Simulated skip in task3") with DAG( dag_id="conditional_dag", start_date=pendulum.today("America/Chicago"), ) as dag: task1 = EmptyOperator(task_id="task1", task_display_name="Task 1") task2 = BashOperator( task_id="task2", bash_command="ls -l", ) task3 = PythonOperator( task_id="task3", python_callable=_task3 ) task4 = EmptyOperator( task_id="task4", ) task1 >> [task2]>> task4 task3 >> task4 |
如果执行时一切正常,是这个样子
要是触发是配置 Configuration JSON 为
1 2 3 |
{ "fail": true } |
就是这样
1 2 3 |
{ "skip": true } |
下游跟随着 skipped
假如有兴趣的话还可以对任务抛出 Airflow 的 AirflowTaskTimeout 或 AirflowTaskTerminated 时进行测试,它们与 AirflowFailException 产生的 UI 效果一样,都是显示 failed, upstream_failed. 基本上也就是 AirflowSkipException 能产生特殊的效果,导致当前任务被跳过而非失败。
从这里我们还学到了,由于下游任务的默认 trigger_rule 是 all_success, 所以控制某个任务是否应被执行,可为它挂载一个悬空的上游任务,并在其中判断条件来抛出 AirflowFailExcepiton 或 AirflowSkipException.
本文链接 https://yanbin.blog/apache-airflow-branch-and-condition/, 来自 隔叶黄莺 Yanbin Blog
[版权声明] 本文采用 署名-非商业性使用-相同方式共享 4.0 国际 (CC BY-NC-SA 4.0) 进行许可。