首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从另一个DAG生成DAG

从另一个DAG生成DAG
EN

Stack Overflow用户
提问于 2022-03-24 15:15:26
回答 1查看 237关注 0票数 0

我想动态地从另一个dag中调用DAG,例如,当操作符将触发这个父DAG时,它应该会生成新的DAG。

以下代码运行时没有错误,但DAG未创建。

我看不出原因。

代码语言:javascript
复制
def generate_dag():

    def do_something():
        print('something')

    default_args = {            
        'schedule_interval': '@hourly',
        'start_date': datetime(2022, 1, 1),
        'is_paused_upon_creation': True
    }

    dag = DAG("generated_dag",
              default_args=default_args,
              catchup=False
              )
    with dag:
        do_something = PythonOperator(
            task_id="do_something",
            python_callable=do_something,
            dag=dag
        )
        do_something

    return dag



default_args = {
    'owner': 'airflow',       
    'max_active_runs': 1,
    'retries': 0
}

dag = DAG(
    'createme',
    default_args=default_args,
    schedule_interval=None,
    start_date=datetime(2019,4,29),
    catchup=False,
    is_paused_upon_creation=False,
)

def create_dag():
    dag = generate_dag()
   
    globals()['generated_dag'] = dag

create = PythonOperator(
    task_id="create",
    python_callable=create_dag,    
    dag=dag
)    
create

我不想在dags文件夹中生成文件

EN

回答 1

Stack Overflow用户

发布于 2022-03-25 02:37:54

您不需要createme DAG,只需运行create_dag。天文学家有一篇很好的文章,举例说明了https://www.astronomer.io/guides/dynamically-generating-dags/

代码语言:javascript
复制
from datetime import datetime
from airflow.operators.python import PythonOperator
from airflow import DAG


def generate_dag():

    def do_something():
        print('something')

    default_args = {            
        'schedule_interval': '@hourly',
        'start_date': datetime(2022, 1, 1),
        'is_paused_upon_creation': True
    }

    dag = DAG("generated_dag",
              default_args=default_args,
              catchup=False
              )
    with dag:
        do_something = PythonOperator(
            task_id="do_something",
            python_callable=do_something,
            dag=dag
        )
        do_something

    return dag

def create_dag():
    dag = generate_dag()
   
    globals()['generated_dag'] = dag

create_dag()
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71605031

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档