我正在运行一个使用TriggerDagRunOperator使用不同有效负载触发另一个进程两次的守护进程。
第一个开始运行,但第二个总是失败地说:
sqlalchemy.exc.IntegrityError: (_mysql_exceptions.IntegrityError) (1062, "Duplicate entry 'external_dag-2021-11-01 00:00:00.000000' for key 'dag_id'"两个TriggerDagRunOperator都有不同的execution_dates和"reset_dag_run“设置为true:
x = TriggerDagRunOperator(
task_id="x_external_dag",
trigger_dag_id="external_dag",
python_callable= pass_args_for_x,
execution_date="{{ execution_date }}",
reset_dag_run = True
)
y = TriggerDagRunOperator(
task_id="y_external_dag",
trigger_dag_id="external_dag",
python_callable=pass_args_for_y,
execution_date="{{ ds }}",
reset_dag_run = True
)关于如何解决这个问题,我已经没有什么想法了。任何帮助都将不胜感激。致以问候!
发布于 2021-11-01 23:05:15
我认为这个错误与两个DagRuns都是用同一个execution_date触发的事实有关。"{{ ds }}"和"{{ execution_date }}"是同一值、不同类型、str和DateTime的不同表示形式。还请注意,{{ execution_date }}变量将很快成为已弃用。
要解决错误,请在任务定义期间不要定义execution_date。Docstring没有指定它,但是默认值实际上是None。查看一下TriggerDagRunOperator,的execute()方法,您会发现当未定义时,正在触发的DAG的execution_date被设置为timezone.utcnow()。
按照这条路径,我认为您也不需要设置reset_dag_run = True。我还没有亲自测试过,但应该是:
x = TriggerDagRunOperator(
task_id="x_external_dag",
trigger_dag_id="external_dag",
python_callable= pass_args_for_x
)
y = TriggerDagRunOperator(
task_id="y_external_dag",
trigger_dag_id="external_dag",
python_callable=pass_args_for_y
)如果对你有用就告诉我。
https://stackoverflow.com/questions/69796402
复制相似问题