
conditionally_trigger for TriggerDagRunOperator
Stack Overflow user
Asked 2022-11-18 16:45:36
2 answers, 23 views, 0 followers, 0 votes

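I have two DAGs, dag_a and dag_b (dag_a -> dag_b). After dag_a runs, a TriggerDagRunOperator is called, which starts dag_b. The problem is that when dag_b is off (paused), dag_a's TriggerDagRunOperator still creates scheduled runs in dag_b, and they queue up every time dag_a runs. Once dag_b is turned back on, the queued runs start executing. I am trying to find a solution for TriggerDagRunOperator, namely a conditionally_trigger function that skips the TriggerDagRunOperator task if dag_b is paused (OFF). How can I do this?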


2 Answers

Stack Overflow user

Accepted answer

Posted 2022-11-18 18:46:57

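You can use a ShortCircuitOperator to execute or skip the downstream task that triggers dag_b. Then use the Airflow REST API (or the shell/CLI) to determine whether dag_b is paused.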

dag_a = TriggerDagRunOperator(
    trigger_dag_id='dag_a',
    ...
)

pause_check = ShortCircuitOperator(
    task_id='pause_check',
    python_callable=is_dag_paused,
    op_kwargs={
        'dag_id': 'dag_b'
    }
)

dag_b = TriggerDagRunOperator(
    trigger_dag_id='dag_b',
    ...
)

dag_a >> pause_check >> dag_b

The is_dag_paused function could look like this. (Here I use the REST API.)

def is_dag_paused(**kwargs):
    import requests
    from requests.auth import HTTPBasicAuth

    dag_id = kwargs['dag_id']
    # airflow_host must be defined elsewhere (host[:port] of the Airflow webserver).
    res = requests.get(f'http://{airflow_host}/api/v1/dags/{dag_id}/details',
                       auth=HTTPBasicAuth('username', 'password'))  # The auth method could be different for you.

    if res.status_code == 200:
        rjson = res.json()
        # if you return True, the downstream tasks will be executed
        # if False, they will be skipped
        return not rjson['is_paused']
    else:
        # Raise instead of calling exit() so the task fails with a clear error.
        raise RuntimeError(f'Error: {res.status_code} {res.text}')
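
Note that the function above is named is_dag_paused but returns True when the DAG is not paused, which is what ShortCircuitOperator needs in order to continue. The following is a minimal sketch of the same check with the decision split from the HTTP call, so the logic can be exercised without a running Airflow instance. The names should_trigger, fetch_dag_details and pause_check are hypothetical, the base URL and credentials are placeholders, and it assumes basic auth is enabled for the REST API and that the DAG object carries an is_paused field, as in the answer above.

```python
import base64
import json
import urllib.request


def should_trigger(dag_details: dict) -> bool:
    """Return True only when the target DAG is known to be active."""
    # A missing field is treated like "paused", so nothing is queued by accident.
    return dag_details.get("is_paused") is False


def fetch_dag_details(base_url: str, dag_id: str, user: str, password: str) -> dict:
    """GET {base_url}/api/v1/dags/{dag_id} with basic auth (assumed auth backend)."""
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    req = urllib.request.Request(
        f"{base_url}/api/v1/dags/{dag_id}",
        headers={"Authorization": f"Basic {token}"},
    )
    # urlopen raises HTTPError on 4xx/5xx responses, which fails the task visibly.
    with urllib.request.urlopen(req, timeout=10) as resp:
        return json.load(resp)


def pause_check(dag_id, base_url, user, password, fetch=fetch_dag_details) -> bool:
    """Callable for ShortCircuitOperator: True continues, False skips downstream."""
    return should_trigger(fetch(base_url, dag_id, user, password))
```

With this shape, pause_check would be passed as python_callable, with dag_id, base_url, user and password supplied through op_kwargs; the fetch parameter exists only so a stub can be substituted in tests.
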
Votes: 1

Stack Overflow user

Posted 2022-11-22 09:14:24

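import airflow.settings
from airflow.models import DagModel

def check_status_dag(*op_args):
    # op_args: (dag_id to check, task_id if active, task_id if paused)
    session = airflow.settings.Session()
    try:
        qry = session.query(DagModel.is_paused).filter(DagModel.dag_id == op_args[0])
        is_paused = qry.scalar()  # None if no DAG with this dag_id exists
    finally:
        session.close()  # always release the metadata DB session
    if not is_paused:
        return op_args[1]
    return op_args[2]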

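Here check_status_dag is the method that decides which branch to execute: op_args[0] is the dag_id of the DAG whose paused state is checked, and op_args[1] and op_args[2] are the task names to follow, according to the BranchPythonOperator logic.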

start = DummyOperator(
    task_id = 'start',
    dag=dag
    )

check_dag_B = BranchPythonOperator(
    task_id = "check_dag_B",
    python_callable = check_status_dag,
    op_args = ['dag_B','trigger_dag_B','skip_trigger_dag_B'],
    trigger_rule = 'all_done',
    dag = dag
)

trigger_dag_B = TriggerDagRunOperator(
    task_id = 'trigger_dag_B',
    trigger_dag_id = 'dag_B',
    dag = dag
)

skip_trigger_dag_B = DummyOperator(
    task_id = 'skip_trigger_dag_B',
    dag = dag
)

finish = DummyOperator(
    task_id = 'finish',
    trigger_rule = 'all_done',
    dag=dag
)

start >> check_dag_B >> [trigger_dag_B, skip_trigger_dag_B] >> finish  # or continue working
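
One edge case in the branch callable is worth noting: when no row matches the dag_id, the query yields None, and `not None` is truthy, so the trigger branch is chosen for a DAG that does not exist. Below is a minimal sketch of a stricter decision, kept free of Airflow imports so it runs anywhere. The names choose_branch and lookup_is_paused are hypothetical; in a real DAG the lookup would be the DagModel query shown above, and fake_db is only a stand-in for the metadata database.

```python
from typing import Callable, Optional


def choose_branch(
    dag_id: str,
    if_active: str,
    if_paused: str,
    lookup_is_paused: Callable[[str], Optional[bool]],
) -> str:
    """Return the task_id that a BranchPythonOperator should follow."""
    paused = lookup_is_paused(dag_id)
    # Only an explicit False (DAG known and switched on) selects the trigger
    # branch; True or None (unknown DAG) both select the skip branch.
    return if_active if paused is False else if_paused


# Stand-in for the metadata database: dag_id -> is_paused.
fake_db = {"dag_B": False, "dag_C": True}

print(choose_branch("dag_B", "trigger_dag_B", "skip_trigger_dag_B", fake_db.get))
print(choose_branch("dag_C", "trigger_dag_C", "skip_trigger_dag_C", fake_db.get))
print(choose_branch("missing", "trigger", "skip", fake_db.get))
```

Whether an unknown DAG should skip or fail loudly is a design choice; raising an exception when the lookup returns None is an equally reasonable variant.
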
Votes: 0
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/74492876
