首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >sla_miss_callback发送关于Apache气流中缺少的SLA任务的电子邮件

sla_miss_callback发送关于Apache气流中缺少的SLA任务的电子邮件
EN

Stack Overflow用户
提问于 2021-06-16 07:57:37
回答 1查看 1K关注 0票数 1

我有一个DAG,它是由父DAG .触发的,所以DAG没有在它中定义任何调度间隔,

1.我想在DAG中的一个任务上设置一个sla_miss_callback。

2.当任务错过SLA时,我想得到一个电子邮件通知。

我尝试过谷歌和堆栈溢出中可用的方法。电子邮件没有按预期被触发。

共享用于测试的示例代码。

代码语言:javascript
复制
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import timedelta, datetime
import logging

def print_sla_miss(**kwargs):
    logging.info("SLA missed")


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email': 'sample@xxx.com',
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0
}

with DAG('sla_test', schedule_interval=None, max_active_runs=1, catchup=False,sla_miss_callback=print_sla_miss, default_args=default_args) as dag:

    sleep = BashOperator(
        task_id='timeout',
        sla=timedelta(seconds=5),
        bash_command='sleep 15',
        retries=0,
        dag=dag,
    )

提前谢谢。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-08-11 14:44:38

为了实现我的要求,我已经创建了一个独立的DAG,它每5分钟观察一次任务运行状态,并根据运行状态通过电子邮件通知,就像below.To这样做的一样,我将我的主DAG的执行日期发送到一个气流变量。

代码语言:javascript
复制
#importing operators and modules
from airflow import DAG
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.python_operator import PythonOperator
from airflow.operators.email_operator import EmailOperator
from airflow.api.common.experimental.get_task_instance import get_task_instance
from airflow.models import Variable
from datetime import datetime,timedelta,timezone
import dateutil

#setting default arguments
default_args = {
    'owner': 'test',
    'depends_on_past': False,
    'start_date': datetime(2021, 1, 1),
    'email': ['abc@example.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0
}

#getting current status of task in main DAG
exec_date = dateutil.parser.parse(Variable.get('main_dag_execution_date'))
ti = get_task_instance('main_dag', 'task_to_check', exec_date)
state = ti.current_state()
start_date = ti.start_date
end_date = ti.end_date
print("start_date",start_date," end_date",end_date, " execution_date",exec_date)

#deciding the action based on status of the task
def check_task_status(**kwargs):

    if state == 'running' and datetime.now(timezone.utc) > start_date + timedelta(minutes = 10):
        breach_mail = 'breach_mail'
        return breach_mail
    elif state == 'failed':
        failure_mail = 'failure_mail'
        return failure_mail
    else:
        other_state = 'other_state'
        return other_state

#print statement when status is not in breached or failed state
def print_current_state(**context):
    if start_date is None:
        print("task is in wait state")
    else:
        print("task is in " + state + " state")


with DAG('sla_check', schedule_interval='0-59/5 9-23 * * *', max_active_runs=1, catchup=False,default_args=default_args) as dag:

    check_task_status = BranchPythonOperator(task_id='check_task_status', python_callable=check_task_status,
                                    provide_context=True,
                                    dag=dag)

    breach_mail = EmailOperator(task_id='breach_mail', to='Abc@example.com',
                                      subject='SLA for task breached',
                                      html_content="<p>Hi,<br><br>task running belyond SLA<br>", dag=dag)

    failure_mail = EmailOperator(task_id='failure_mail', to='Abc@example.com',
                                      subject='task failed',
                                      html_content="<p>Hi,<br><br>task failed. Please check.<br>", dag=dag)

    other_state = PythonOperator(task_id='other_state', python_callable=print_current_state,
                                    provide_context=True,
                                    dag=dag)

    check_task_status >> breach_mail
    check_task_status >> failure_mail
    check_task_status >> other_state
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67998601

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档