我按照Dynamically Generating DAGs in Airflow中的说明动态创建DAGs,通过变量k修改要创建的dags的数量
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
def create_dag(dag_id,
schedule,
dag_number,
default_args):
def hello_world_py(*args):
print('Hello World')
print('This is DAG: {}'.format(str(dag_number)))
dag = DAG(dag_id,
schedule_interval=schedule,
default_args=default_args)
with dag:
t1 = PythonOperator(
task_id='hello_world',
python_callable=hello_world_py,
dag_number=dag_number)
return dag
# build k dags
k = 5
for n in range(1, k + 1):
dag_id = 'hello_world_{}'.format(str(n))
default_args = {'owner': 'airflow',
'start_date': datetime(2018, 1, 1)
}
schedule = '@daily'
dag_number = n
globals()[dag_id] = create_dag(dag_id,
schedule,
dag_number,
default_args)我可以使用UI和CLI检查创建的DAG。两者是同步的:
> airflow dags list
dag_id | filepath | owner | paused
==============+================+=========+=======
hello_world_1 | hello_world.py | airflow | True
hello_world_2 | hello_world.py | airflow | True
hello_world_3 | hello_world.py | airflow | True
hello_world_4 | hello_world.py | airflow | True
hello_world_5 | hello_world.py | airflow | True现在,如果我将k减少到3,则命令行界面只列出预期的3个dags。然而,UI仍然显示5个dags。
如何使UI与要创建的dags数量保持同步?如何在python中以编程方式删除DAG?我想删除DAG,就像我创建DAG一样简单。
发布于 2021-09-08 08:56:21
这是一种删除dag的方法。
from airflow.api.common.experimental.delete_dag import delete_dag
from airflow.utils.session import provide_session
from airflow.models import DagModel
enabled_dags = [] # dynamic enabled_dags
@provide_session
def get_all_dag_ids(session=None):
all_objs = session.query(DagModel).all()
return [i.dag_id for i in all_objs]
all_dag_ids = get_all_dag_ids() # all dag in database
for k in [gk for gk in all_dag_ids if not in enabled_dags]:
delete_dag(k)
del globals()[k]airflow只是加载dag文件,总是添加到数据库中。如果delete dag with dag_id query all dag_id from db,则将其删除。
希望这能对你有所帮助。
https://stackoverflow.com/questions/66284680
复制相似问题