在我的气流中有两种类型的DAG:
实现下一个逻辑的正确方法是什么?
每小时三次DT1s触发。一次DT2 -每天触发一次(~10或11 PM)。但是,只有在过去一小时所有三个DT1s都处于“成功”状态时,才能触发DT1s。
我宁愿只使用气流功能来实现它。
发布于 2022-06-13 20:34:44
ExternalTaskSensor就是为此而设计的。
from airflow.sensors.external_task import ExternalTaskSensor文档页面是这里,它显示了它的用法(指定执行日期、成功状态等)。
发布于 2022-06-14 08:28:48
您将使用外部任务会话,但主要的问题是如何使用这些DAG之间的时间安排,因为这些是主要的因素,而且如果我们希望动态交叉DAG依赖项,其中一个DAG将在12:00午夜开始,而另一个DAG将安排在每10分钟或每小时执行这一壮举,我们可以使用execution_delta。
execution_delta 1.Dag_B schedule_interval的关键点应该早于Dag_A,否则将无法工作。2.在执行timedelta对象时,我们可以定义时间和分钟(它是Dag_B和Dag_A调度间隔之间的差异)。3.时间差与以前的执行情况不同,默认的execution_date与当前任务相同。对于昨天,使用积极!datetime.timedelta(days=1)
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.sensors.external_task_sensor import ExternalTaskSensor
import airflow.utils.dates
from datetime import datetime, timedelta
default_args = {
"owner": "airflow",
"start_date": airflow.utils.dates.days_ago(1)
}
with DAG(dag_id="CrossDAG_DAG_A", default_args=default_args, schedule_interval="*/10 * * * *") as dag:
sensor = ExternalTaskSensor(
task_id= 'sensor',
external_dag_id= "CrossDAG_DAG_B",
external_task_id= 't1',
execution_delta= timedelta(minutes =5)
)
last_task = DummyOperator(task_id= "last_task")
sensor >> last_task ```
you can create the second dag according to your requiremnt but i have shared the main logic here. moreover i have shared the complete code in anohter post youc an see that for more explaination.
[answere to another post][1]
[1]: https://stackoverflow.com/questions/72597384/i-have-a-externaltasksensor-error-airflowsensortimeout/72601464?noredirect=1#comment128265090_72601464https://stackoverflow.com/questions/72608382
复制相似问题