我有一个DAG,它是由父DAG触发的,因此该DAG本身没有定义任何调度间隔。
1.我想在DAG中的一个任务上设置一个sla_miss_callback。
2.当任务错过SLA时,我想得到一个电子邮件通知。
我尝试过在 Google 和 Stack Overflow 上能找到的各种方法,但电子邮件并没有按预期发送。
下面分享我用于测试的示例代码。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
import logging
def print_sla_miss(*args, **kwargs):
    """Log a message saying that an SLA was missed.

    Written to be used as a DAG-level ``sla_miss_callback``. Airflow invokes
    that callback with positional arguments (dag, task_list,
    blocking_task_list, slas, blocking_tis), so positional arguments must be
    accepted; a ``**kwargs``-only signature raises ``TypeError`` when called
    that way. All arguments are ignored.

    Args:
        *args: Positional arguments supplied by the caller (unused).
        **kwargs: Keyword arguments supplied by the caller (unused).

    Returns:
        None.
    """
    logging.info("SLA missed")
# Arguments shared by the tasks of the DAG; passed to the DAG constructor
# through its ``default_args`` parameter.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email': 'sample@xxx.com',
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
}
# Test DAG with a single task that sleeps for 15 seconds while declaring a
# 5 second SLA, so the task always runs longer than its SLA.
#
# NOTE(review): the Airflow documentation states that only scheduled runs are
# checked for SLA misses. This DAG has schedule_interval=None (it is triggered
# externally), which would explain why the callback / e-mail never fires --
# confirm against the Airflow version in use.
with DAG(
    'sla_test',
    schedule_interval=None,
    max_active_runs=1,
    catchup=False,
    sla_miss_callback=print_sla_miss,
    default_args=default_args,
) as dag:
    sleep = BashOperator(
        task_id='timeout',
        sla=timedelta(seconds=5),
        bash_command='sleep 15',
        retries=0,
        dag=dag,
    )

# (The question ends here with the closing remark: "Thanks in advance.")
发布于 2021-08-11 14:44:38
为了实现我的需求,我创建了一个独立的DAG,它每5分钟检查一次目标任务的运行状态,并根据运行状态通过电子邮件发出通知,如下所示。为此,我把主DAG的执行日期(execution date)写入了一个 Airflow 变量(Variable)。
#importing operators and modules
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.api.common.experimental.get_task_instance import get_task_instance
from airflow.models import Variable
from datetime import datetime,timedelta,timezone
import dateutil
# Default arguments for the monitoring DAG; passed to the DAG constructor
# through its ``default_args`` parameter.
default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email': ['abc@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
}
# Look up the current status of the monitored task in the main DAG.
#
# NOTE(review): these statements sit at module level, so they execute whenever
# this file is imported/parsed, not only when a task of this DAG runs. If the
# Variable or the task instance does not exist the import itself fails --
# confirm that this is acceptable for the deployment.

# ``import dateutil`` alone does not load the ``parser`` submodule on older
# python-dateutil releases; import it explicitly so the attribute access below
# does not depend on some other module having imported it first.
import dateutil.parser

# The main DAG stores its execution date as a string in this Airflow Variable.
exec_date = dateutil.parser.parse(Variable.get('main_dag_execution_date'))
ti = get_task_instance('main_dag', 'task_to_check', exec_date)
state = ti.current_state()
start_date = ti.start_date
end_date = ti.end_date
print("start_date", start_date, " end_date", end_date, " execution_date", exec_date)
# Decide which branch to follow based on the status of the monitored task.
def check_task_status(**kwargs):
    """Return the task_id of the downstream branch to execute.

    Reads the module-level ``state`` and ``start_date`` values.

    Args:
        **kwargs: Keyword arguments supplied by the caller (unused).

    Returns:
        ``'breach_mail'`` when the task is running and started more than
        10 minutes ago, ``'failure_mail'`` when the task has failed, and
        ``'other_state'`` in every other case.
    """
    if state == 'running':
        # Guard against a missing start time before doing datetime arithmetic.
        # NOTE(review): the comparison assumes start_date is timezone-aware;
        # confirm against the values returned by the task instance.
        if (start_date is not None
                and datetime.now(timezone.utc) > start_date + timedelta(minutes=10)):
            return 'breach_mail'
        return 'other_state'
    if state == 'failed':
        return 'failure_mail'
    return 'other_state'
# Report the task status when it is neither breaching the SLA nor failed.
def print_current_state(**context):
    """Print the state of the monitored task.

    Reads the module-level ``start_date`` and ``state`` values. When
    ``start_date`` is ``None`` a "wait state" message is printed; otherwise
    the value of ``state`` is printed.

    Args:
        **context: Keyword arguments supplied by the caller (unused).

    Returns:
        None.
    """
    if start_date is None:
        print("task is in wait state")
        return
    # Format instead of concatenating: ``state`` may be None even though a
    # start date is present, and ``str + None`` would raise TypeError.
    print("task is in {} state".format(state))
# Monitoring DAG: a branch task picks exactly one of three follow-up tasks
# (SLA-breach mail, failure mail, or a plain status print).
with DAG(
    'sla_check',
    schedule_interval='0-59/5 9-23 * * *',
    max_active_runs=1,
    catchup=False,
    default_args=default_args,
) as dag:
    # The name ``check_task_status`` is rebound from the callable to the
    # operator here; the right-hand side is evaluated first, so the operator
    # still receives the function.
    check_task_status = BranchPythonOperator(
        task_id='check_task_status',
        python_callable=check_task_status,
        provide_context=True,
        dag=dag,
    )
    breach_mail = EmailOperator(
        task_id='breach_mail',
        to='Abc@example.com',
        subject='SLA for task breached',
        html_content="<p>Hi,<br><br>task running beyond SLA<br>",
        dag=dag,
    )
    failure_mail = EmailOperator(
        task_id='failure_mail',
        to='Abc@example.com',
        subject='task failed',
        html_content="<p>Hi,<br><br>task failed. Please check.<br>",
        dag=dag,
    )
    other_state = PythonOperator(
        task_id='other_state',
        python_callable=print_current_state,
        provide_context=True,
        dag=dag,
    )

    # The task_ids returned by the branch callable match these three tasks.
    check_task_status >> breach_mail
    check_task_status >> failure_mail
    check_task_status >> other_state

# Source: https://stackoverflow.com/questions/67998601
复制相似问题