我正在学习气流,我看了一个随气流(example_branch_python_dop_operator_3.py)运送的示例DAG。
在本例中,如果执行日期时间的分钟为偶数,则DAG分支到一个分支,如果分钟为奇数,则为另一个分支。此外,DAG将depends_on_past设置为True作为所有任务的默认值。完整的代码是:
args = {
'owner': 'Airflow',
'start_date': airflow.utils.dates.days_ago(2),
'depends_on_past': True,
}
# BranchPython operator that depends on past
# and where tasks may run or be skipped on
# alternating runs
dag = DAG(
dag_id='example_branch_dop_operator_v3',
schedule_interval='*/1 * * * *',
default_args=args,
)
def should_run(**kwargs):
print('------------- exec dttm = {} and minute = {}'.
format(kwargs['execution_date'], kwargs['execution_date'].minute))
if kwargs['execution_date'].minute % 2 == 0:
return "dummy_task_1"
else:
return "dummy_task_2"
cond = BranchPythonOperator(
task_id='condition',
provide_context=True,
python_callable=should_run,
dag=dag,
)
dummy_task_1 = DummyOperator(task_id='dummy_task_1', dag=dag)
dummy_task_2 = DummyOperator(task_id='dummy_task_2', dag=dag)
cond >> [dummy_task_1, dummy_task_2]由于depends_on_past是真的,我本来可以预料到,在第一个DAG运行之后,任务就不能再开始了。每个任务将查看前一个任务的状态,并看到它是skipped,这是不成功的,本质上没有状态挂起。
然而,事实并非如此。以下是树视图中的结果:

如您所见,所有选定的任务都在每个DAG运行中运行。为什么会发生这种情况?我是不是误解了depends_on_past的意思?我认为每个任务都查看了在上一次DAG运行中具有相同task_id的任务的状态。
为了让它运行,我只需在主接口中打开DAG,所以我相信这些都是预定的运行。
发布于 2020-01-12 02:19:47
更改版气流1.7.1 2016-05-19
- Treat SKIPPED and SUCCESS the same way when evaluating depends_on_past=True这里似乎检查了条件:
airflow/ti_deps/deps/prev_dagrun_dep.py (master brunch)
line 75: if previous_ti.state not in {State.SKIPPED, State.SUCCESS}:https://stackoverflow.com/questions/59617464
复制相似问题