我想动态地从另一个dag中调用DAG,例如,当操作符将触发这个父DAG时,它应该会生成新的DAG。
以下代码运行时没有错误,但DAG未创建。
我看不出原因。
def generate_dag():
def do_something():
print('something')
default_args = {
'schedule_interval': '@hourly',
'start_date': datetime(2022, 1, 1),
'is_paused_upon_creation': True
}
dag = DAG("generated_dag",
default_args=default_args,
catchup=False
)
with dag:
do_something = PythonOperator(
task_id="do_something",
python_callable=do_something,
dag=dag
)
do_something
return dag
default_args = {
'owner': 'airflow',
'max_active_runs': 1,
'retries': 0
}
dag = DAG(
'createme',
default_args=default_args,
schedule_interval=None,
start_date=datetime(2019,4,29),
catchup=False,
is_paused_upon_creation=False,
)
def create_dag():
dag = generate_dag()
globals()['generated_dag'] = dag
create = PythonOperator(
task_id="create",
python_callable=create_dag,
dag=dag
)
create我不想在dags文件夹中生成文件。
发布于 2022-03-25 02:37:54
您不需要createme DAG,只需运行create_dag。天文学家有一篇很好的文章,举例说明了https://www.astronomer.io/guides/dynamically-generating-dags/。
from datetime import datetime
from airflow.operators.python import PythonOperator
from airflow import DAG
def generate_dag():
def do_something():
print('something')
default_args = {
'schedule_interval': '@hourly',
'start_date': datetime(2022, 1, 1),
'is_paused_upon_creation': True
}
dag = DAG("generated_dag",
default_args=default_args,
catchup=False
)
with dag:
do_something = PythonOperator(
task_id="do_something",
python_callable=do_something,
dag=dag
)
do_something
return dag
def create_dag():
dag = generate_dag()
globals()['generated_dag'] = dag
create_dag()https://stackoverflow.com/questions/71605031
复制相似问题