作为一个气流新手,我在看example_branch_operator
"""Example DAG demonstrating the usage of the BranchPythonOperator."""
import random
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.dates import days_ago
args = {
'owner': 'airflow',
}
with DAG(
dag_id='example_branch_operator',
default_args=args,
start_date=days_ago(2),
schedule_interval="@daily",
tags=['example', 'example2'],
) as dag:
run_this_first = DummyOperator(
task_id='run_this_first',
)
options = ['branch_a', 'branch_b', 'branch_c', 'branch_d']
branching = BranchPythonOperator(
task_id='branching',
python_callable=lambda: random.choice(options),
)
run_this_first >> branching
join = DummyOperator(
task_id='join',
trigger_rule='none_failed_or_skipped',
)
for option in options:
t = DummyOperator(
task_id=option,
)
dummy_follow = DummyOperator(
task_id='follow_' + option,
)
branching >> t >> dummy_follow >> join看看join操作符,我希望它收集所有的分支,但是它只是在每个分支的末尾发生的另一个任务。如果执行多个分支,join将多次运行。
(是的,是的,应该是幂等的,但这不是问题的重点)

这是一个错误,一个名不详的任务,还是我遗漏了什么?
发布于 2021-05-19 21:05:09
树视图显示每个DAG根节点的完整分支。聚合在单个任务上的多个分支将被多次显示,但它们只执行一次。请查看此DAG的图形视图:

https://stackoverflow.com/questions/67610636
复制相似问题