首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在触发DAG后安排DAG任务?

如何在触发DAG后安排DAG任务?
EN

Stack Overflow用户
提问于 2019-06-11 21:18:42
回答 1查看 105关注 0票数 0

我正在使用TriggerDagRunOperator,以便一个控制器DAG可以触发一个目标DAG。但是,一旦控制器DAG触发了目标DAG,目标DAG就会切换到"running",但它的任何任务都不会被调度。我希望一旦控制器DAG触发目标DAG,就安排目标DAG的任务。

代码语言:javascript
复制
# Controller DAG's callable
def conditionally_trigger(context, dag_run_object):
   condition_param = context['params']['condition_param']
   if condition_param:
      return dag_run_obj
   return None

# Target DAG's callable
def say_hello():
   print("Hello")

# Controller DAG
controller_dag = DAG(
   dag_id="controller",
   default_args = {
      "owner":"Patrick Stump",
      "start_date":datetime.utcnow(),
   },
   schedule_interval='@once',
)

# Target DAG
target_dag = DAG(
   dag_id="target",
   default_args = {
      "owner":"Patrick Stump",
      "start_date":datetime.utcnow(),
   },
   schedule_interval=None,
)

# Controller DAG's task
controller_task = TriggerDagRunOperator(
   task_id="trigger_dag",
   trigger_dag_id="target",
   python_callable=conditionally_trigger,
   params={'condition_param':True},
   dag=controller_dag,
)

# Target DAG's task -- never scheduled!
target_task = PythonOperator(
   task_id="print_hello",
   python_callable=say_hello,
   dag=target_dag,
)

提前感谢:)

EN

回答 1

Stack Overflow用户

发布于 2019-06-12 21:55:15

问题可能是使用像这样的动态开始日期:"start_date":datetime.utcnow(),

我会重命名dags,并给它们一个开始日期,如2019-01-01,然后重试。

调度程序重复读取DAG,并且每次解析DAG时(utcnow()每次都将计算为新值)更改开始日期时,可能会发生意想不到的事情。

Here is some further reading on start_date.

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56544684

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档