首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何动态删除DAG?

如何动态删除DAG?
EN

Stack Overflow用户
提问于 2021-02-20 04:37:14
回答 1查看 171关注 0票数 2

我按照Dynamically Generating DAGs in Airflow中的说明动态创建DAGs,通过变量k修改要创建的dags的数量

代码语言:javascript
复制
from datetime import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py,
            dag_number=dag_number)

    return dag


# build k dags
k = 5
for n in range(1, k + 1):
    dag_id = 'hello_world_{}'.format(str(n))

    default_args = {'owner': 'airflow',
                    'start_date': datetime(2018, 1, 1)
                    }

    schedule = '@daily'

    dag_number = n

    globals()[dag_id] = create_dag(dag_id,
                                  schedule,
                                  dag_number,
                                  default_args)

我可以使用UI和CLI检查创建的DAG。两者是同步的:

代码语言:javascript
复制
> airflow dags list
dag_id        | filepath       | owner   | paused
==============+================+=========+=======
hello_world_1 | hello_world.py | airflow | True
hello_world_2 | hello_world.py | airflow | True
hello_world_3 | hello_world.py | airflow | True
hello_world_4 | hello_world.py | airflow | True
hello_world_5 | hello_world.py | airflow | True

现在,如果我将k减少到3,则命令行界面只列出预期的3个dags。然而,UI仍然显示5个dags。

如何使UI与要创建的dags数量保持同步?如何在python中以编程方式删除DAG?我想删除DAG,就像我创建DAG一样简单。

EN

回答 1

Stack Overflow用户

发布于 2021-09-08 08:56:21

这是一种删除dag的方法。

代码语言:javascript
复制
from airflow.api.common.experimental.delete_dag import delete_dag
from airflow.utils.session import provide_session
from airflow.models import DagModel

enabled_dags = [] # dynamic enabled_dags

@provide_session
def get_all_dag_ids(session=None):
    all_objs = session.query(DagModel).all()
    return [i.dag_id for i in all_objs]

all_dag_ids = get_all_dag_ids() # all dag in database

for k in [gk for gk in all_dag_ids if not in enabled_dags]:
    delete_dag(k)
    del globals()[k]

airflow只是加载dag文件,总是添加到数据库中。如果delete dag with dag_id query all dag_id from db,则将其删除。

希望这能对你有所帮助。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66284680

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档