I have two DAGs, dag_a and dag_b (dag_a -> dag_b). After dag_a runs, a TriggerDagRunOperator starts dag_b. The problem: when dag_b is switched off (paused), dag_a's TriggerDagRunOperator still creates runs for dag_b, and they keep queuing up for as long as dag_a keeps running. As soon as dag_b is switched back on, all the queued runs start executing. I'm looking for a conditionally_trigger-style solution for the TriggerDagRunOperator that skips the trigger task when dag_b is paused (OFF). How can I do this?
Posted on 2022-11-18 18:46:57
You can use a ShortCircuitOperator to execute or skip the downstream trigger for dag_b, and use the Airflow REST API (or the shell/CLI) to determine whether dag_b is paused.
dag_a = TriggerDagRunOperator(
    trigger_dag_id='dag_a',
    ...
)

pause_check = ShortCircuitOperator(
    task_id='pause_check',
    python_callable=is_dag_paused,
    op_kwargs={
        'dag_id': 'dag_b'
    }
)

dag_b = TriggerDagRunOperator(
    trigger_dag_id='dag_b',
    ...
)

dag_a >> pause_check >> dag_b

The is_dag_paused function could look like this (here I use the REST API):
def is_dag_paused(**kwargs):
    import requests
    from requests.auth import HTTPBasicAuth

    dag_id = kwargs['dag_id']
    res = requests.get(f'http://{airflow_host}/api/v1/dags/{dag_id}/details',
                       auth=HTTPBasicAuth('username', 'password'))  # The auth method could be different for you.
    if res.status_code == 200:
        rjson = res.json()
        # If you return True, the downstream tasks will be executed;
        # if False, they will be skipped.
        return not rjson['is_paused']
    else:
        print('Error: ', res)
        exit(1)

Posted on 2022-11-22 09:14:24
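The short-circuit decision boils down to reading the `is_paused` field from the API response. As a quick sketch of just that logic (the payloads below are made-up samples in the shape of an `/api/v1/dags/{dag_id}/details` response, not captured from a real server, and `should_trigger` is a hypothetical helper name):

```python
import json

def should_trigger(payload: str) -> bool:
    """Return True (run downstream tasks) when the target DAG is not paused."""
    return not json.loads(payload)["is_paused"]

# Hypothetical sample responses for a paused and an unpaused dag_b:
paused = '{"dag_id": "dag_b", "is_paused": true}'
active = '{"dag_id": "dag_b", "is_paused": false}'

print(should_trigger(paused))   # False -> ShortCircuitOperator skips the trigger
print(should_trigger(active))   # True  -> downstream TriggerDagRunOperator runs
```

Returning False from the ShortCircuitOperator's callable marks all downstream tasks as skipped, which is exactly the behavior the question asks for.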
import airflow.settings
from airflow.models import DagModel

def check_status_dag(*op_args):
    session = airflow.settings.Session()
    qry = session.query(DagModel).filter(DagModel.dag_id == op_args[0])
    if not qry.value(DagModel.is_paused):
        return op_args[1]
    else:
        return op_args[2]

Here check_status_dag is the callable that decides which branch to execute: op_args[0] is the dag_id whose paused state is checked, and op_args[1] and op_args[2] are the task_ids to follow, per BranchPythonOperator logic.
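The branching rule itself is independent of the database lookup. Stripped of the session query, it reduces to the sketch below (`choose_branch` is a hypothetical helper name, not part of the answer's code):

```python
def choose_branch(is_paused: bool, run_task: str, skip_task: str) -> str:
    """Mirror check_status_dag: return the task_id the BranchPythonOperator should follow."""
    return skip_task if is_paused else run_task

print(choose_branch(True, 'trigger_dag_B', 'skip_trigger_dag_B'))   # skip_trigger_dag_B
print(choose_branch(False, 'trigger_dag_B', 'skip_trigger_dag_B'))  # trigger_dag_B
```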
start = DummyOperator(
    task_id='start',
    dag=dag
)

check_dag_B = BranchPythonOperator(
    task_id='check_dag_B',
    python_callable=check_status_dag,
    op_args=['dag_B', 'trigger_dag_B', 'skip_trigger_dag_B'],
    trigger_rule='all_done',
    dag=dag
)

trigger_dag_B = TriggerDagRunOperator(
    task_id='trigger_dag_B',
    trigger_dag_id='dag_B',
    dag=dag
)

skip_trigger_dag_B = DummyOperator(
    task_id='skip_trigger_dag_B',
    dag=dag
)

finish = DummyOperator(
    task_id='finish',
    trigger_rule='all_done',
    dag=dag
)

start >> check_dag_B >> [trigger_dag_B, skip_trigger_dag_B] >> finish  # or continue working

https://stackoverflow.com/questions/74492876