首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >气流: DAG运行的监控解决方案

气流: DAG运行的监控解决方案
EN

Stack Overflow用户
提问于 2021-04-13 11:39:17
回答 1查看 674关注 0票数 1

我目前正在尝试设置一个气流监测,理想情况下,当DAG被执行时,它会发送一封电子邮件,在邮件中包含一些有关所有包含的任务的信息,如任务的最终状态、运行时等。

我目前未能解决的问题是:

  • 如何获得与DAG运行关联的所有任务实例的状态?
  • 在DAG中将邮件作为组件发送有意义吗?
  • 如果是这样的话,我怎样才能简单地保证这个任务会在所有其他任务之后运行呢?

除此之外,我还有一个额外的问题,我所寻找的解决方案在某种意义上必须很简单,它要么应该是2-3行代码,要么可以泛化成Python函数,因为我经验较少的同事必须能够理解和再现其他DAG上的步骤。

关于如何建立电子邮件发送的更聪明的想法是非常欢迎的。

谢谢您提前提出的建议!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-04-13 23:17:20

在DAG中将邮件作为组件发送有意义吗?如果是这样的话,我怎样才能简单地保证这个任务会在所有其他任务之后运行呢?

我认为这是实现你想要的目标的一种方式。您可以创建一个任务,将所有“叶子”(没有下游依赖项的任务)连接到具有其他任务状态的DAG状态(在此场景中仍在运行)的最终任务。

代码语言:javascript
复制
    def send_task_summary_t(**context):
        tis = context['dag_run'].get_task_instances()
        for ti in tis:
            print(ti.__dict__)

    dag = DAG(...)

    job_status = PythonOperator(
        task_id='_job_status',
        python_callable=send_task_summary,
        provide_context=True,
        trigger_rule=TriggerRule.ALL_DONE,
        dag=dag
    )

    leaves = [task for task in dag.tasks if not task.downstream_list]
    exclude = ['_job_status']
    for l in leaves:
        if l.task_id not in exclude:
            job_status.set_upstream(l)

如何获得与DAG运行关联的所有任务实例的状态?

与EmailOperator不同,我建议使用PythonOperator,因为您需要包含获取任务状态所需信息的上下文。在上面的片段的基础上,我利用send_email实用工具发送了一封电子邮件。

代码语言:javascript
复制
from airflow.utils.email import send_email

def send_task_summary_t(**context):
    ti = context['task']
    dr = context['dag_run']
    body = ti.render_template(None, "path/to/template", context)
    send_email(to="alan@example.com", subject=f"{dr} summary", html_content=body)

您也可以使用Jinja模板构建您的电子邮件。

代码语言:javascript
复制
<html>
    <body>
       <div>
          <table>
                {% for ti in dag_run.get_task_instances(): -%}
                    <tr>
                        <td class='{{ti.state}}' >
                            <a href='{{ host_server }}/admin/airflow/log?execution_date={{ts}}&task_id={{ti.task_id}}&dag_id={{dag.dag_id}}'>{{ti.state}}</a></td>
                        <td class="{{ti.operator}}">
                            <a href='{{ host_server }}/admin/airflow/graph?root={{ti.task_id}}&dag_id={{dag.dag_id}}&execution_date={{ts}}'>{{ti.task_id}}</a></td>
                        <td><a href='{{ host_server }}/admin/airflow/tree?base_date={{ts}}&num_runs=50&root={{ti.task_id}}&dag_id={{dag.dag_id}}'>{{ti.start_date}}</a></td>
                        <td><a href='{{ host_server }}/admin/airflow/gantt?root={{ti.task_id}}&dag_id={{dag.dag_id}}&execution_date={{ts}}'>{{ti.end_date}}</a></td>
                        <td><a href='{{ host_server }}/admin/airflow/duration?root={{ti.task_id}}&base_date={{ts}}&days=9999&dag_id={{dag.dag_id}}'>{{ti.duration}}</a></td>
                    </tr>
                {% endfor -%}
            </table>
        </div>
    </body>
</html>

另一种可以这样做的方法是为DAG对象利用回调

代码语言:javascript
复制
from airflow.models import DAG
from datetime import datetime

def send_task_summary(context):
    tis = context['dag_run'].get_task_instances()
    for ti in tis:
        print(ti.__dict__)

dag = DAG(
        dag_id='my_dag',
        schedule_interval='@once',
        start_date=datetime(2020, 1, 1),
        on_failure_callback=send_task_summary
) 
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/67074146

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档