首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >整个DAG的Airflow depends_on_past

整个DAG的Airflow depends_on_past
EN

Stack Overflow用户
提问于 2017-11-28 22:24:13
回答 3查看 16.1K关注 0票数 13

在airflow中,有没有一种方法可以将depends_on_past用于整个DagRun,而不仅仅是应用于任务?

我每天都有一个DAG,周五的DagRun在第四个任务上出错了,但是周六和周日的DagRuns仍然按计划运行。使用depends_on_past = True会在相同的第4个任务上暂停DagRun,但是前3个任务仍然会运行。

我可以在DagRun DB表中看到一个state列,其中包含星期五DagRun的failed。我想要的是一种方法,配置一个DagRun不启动,如果前一个DagRun失败,不启动和运行,直到找到一个以前失败的任务。

有没有人知道这是否可能?

EN

回答 3

Stack Overflow用户

发布于 2018-04-16 16:32:16

在您的第一个任务中,设置depends_on_past=Truewait_for_downstream=True,只有在最后一次运行成功时,组合才会导致当前的dag-run运行。

因为通过将第一个任务设置为当前dag-run将等待前一个任务(depends_on_past)和所有任务(wait_for_downstream)成功

票数 13
EN

Stack Overflow用户

发布于 2020-05-07 04:02:46

这个问题有点老了,但事实证明,它是谷歌搜索的第一个结果,评分最高的答案显然具有误导性(这让我有点纠结),所以它肯定需要一个合适的答案。虽然第二个被评价的答案应该可以工作,但有一种更干净的方法可以做到这一点,我个人认为使用xcom很丑陋。

Airflow有一个特殊的操作员类,用于监控来自其他dag运行或其他dag的整体任务的状态。因此,我们需要做的是在我们的dag中的所有任务之前添加一个任务,检查上一次运行是否成功。

代码语言:javascript
复制
from airflow.sensors.external_task_sensor import ExternalTaskSensor


previous_dag_run_sensor = ExternalTaskSensor(
    task_id = 'previous_dag_run_sensor',
    dag = our_dag,
    external_dag_id = our_dag.dag_id,
    execution_delta = our_dag.schedule_interval
)

previous_dag_run_sensor.set_downstream(vertices_of_indegree_zero_from_our_dag)
票数 6
EN

Stack Overflow用户

发布于 2017-11-29 05:45:58

一种可能的解决方案是使用xcom

  1. 将2个PythonOperators start_taskend_task添加到DAG。
  2. 使所有其他任务依赖于start_task
  3. 使end_task依赖于所有其他任务(set_upstream)。
  4. end_task将始终将变量last_success = context['execution_date']推送到xcom (xcom_push)。(需要PythonOperators中的provide_context = True )。

xcom的示例用法:

https://github.com/apache/incubator-airflow/blob/master/airflow/example_dags/example_xcom.py

票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47533903

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档