我有两个Dag Dag 1和Dag 2
任务A、B和C在Dag 1中任务D、E和F在Dag 2中
如何实现第三个Dag来利用这些任务?
我想让DAG3使用任务A,然后是任务E,然后是任务C,而不需要重写函数
发布于 2021-08-25 11:12:23
创建一个返回运算符的函数,并在需要的任何DAG中重用它。
示例:
def create_my_opeartor(task_id=None, **kwargs):
#Replace with your actual operator configuration
return MyOperator(task_id=task_id, **kwargs)
def create_my_opeartor_2(task_id=None, **kwargs):
#Replace with your actual operator configuration
return BashOperator(task_id=task_id, bash_command='echo "hello world", **kwargs)
with DAG(
dag_id='DAG1',
default_args=default_args,
schedule_interval=None,
start_date=datetime(2021, 8, 24),
) as dag1:
a = create_my_opeartor(task_id='task_A')
with DAG(
dag_id='DAG2',
default_args=default_args,
schedule_interval=None,
start_date=datetime(2021, 8, 24),
) as dag2:
e = create_my_opeartor_2(task_id='task_E')
with DAG(
dag_id='DAG3',
default_args=default_args,
schedule_interval=None,
start_date=datetime(2021, 8, 24),
) as dag3:
a = create_my_opeartor(task_id='task_A')
e = create_my_opeartor2(task_id='task_E')
a >> ehttps://stackoverflow.com/questions/68921644
复制相似问题