我目前正在尝试设置一个气流监测,理想情况下,当DAG被执行时,它会发送一封电子邮件,在邮件中包含一些有关所有包含的任务的信息,如任务的最终状态、运行时等。
我目前未能解决的问题是:
除此之外,我还有一个额外的问题,我所寻找的解决方案在某种意义上必须很简单,它要么应该是2-3行代码,要么可以泛化成Python函数,因为我经验较少的同事必须能够理解和再现其他DAG上的步骤。
关于如何建立电子邮件发送的更聪明的想法是非常欢迎的。
谢谢您提前提出的建议!
发布于 2021-04-13 23:17:20
在DAG中将邮件作为组件发送有意义吗?如果是这样的话,我怎样才能简单地保证这个任务会在所有其他任务之后运行呢?
我认为这是实现你想要的目标的一种方式。您可以创建一个任务,将所有“叶子”(没有下游依赖项的任务)连接到具有其他任务状态的DAG状态(在此场景中仍在运行)的最终任务。
def send_task_summary_t(**context):
tis = context['dag_run'].get_task_instances()
for ti in tis:
print(ti.__dict__)
dag = DAG(...)
job_status = PythonOperator(
task_id='_job_status',
python_callable=send_task_summary,
provide_context=True,
trigger_rule=TriggerRule.ALL_DONE,
dag=dag
)
leaves = [task for task in dag.tasks if not task.downstream_list]
exclude = ['_job_status']
for l in leaves:
if l.task_id not in exclude:
job_status.set_upstream(l)如何获得与DAG运行关联的所有任务实例的状态?
与EmailOperator不同,我建议使用PythonOperator,因为您需要包含获取任务状态所需信息的上下文。在上面的片段的基础上,我利用send_email实用工具发送了一封电子邮件。
from airflow.utils.email import send_email
def send_task_summary_t(**context):
ti = context['task']
dr = context['dag_run']
body = ti.render_template(None, "path/to/template", context)
send_email(to="alan@example.com", subject=f"{dr} summary", html_content=body)您也可以使用Jinja模板构建您的电子邮件。
<html>
<body>
<div>
<table>
{% for ti in dag_run.get_task_instances(): -%}
<tr>
<td class='{{ti.state}}' >
<a href='{{ host_server }}/admin/airflow/log?execution_date={{ts}}&task_id={{ti.task_id}}&dag_id={{dag.dag_id}}'>{{ti.state}}</a></td>
<td class="{{ti.operator}}">
<a href='{{ host_server }}/admin/airflow/graph?root={{ti.task_id}}&dag_id={{dag.dag_id}}&execution_date={{ts}}'>{{ti.task_id}}</a></td>
<td><a href='{{ host_server }}/admin/airflow/tree?base_date={{ts}}&num_runs=50&root={{ti.task_id}}&dag_id={{dag.dag_id}}'>{{ti.start_date}}</a></td>
<td><a href='{{ host_server }}/admin/airflow/gantt?root={{ti.task_id}}&dag_id={{dag.dag_id}}&execution_date={{ts}}'>{{ti.end_date}}</a></td>
<td><a href='{{ host_server }}/admin/airflow/duration?root={{ti.task_id}}&base_date={{ts}}&days=9999&dag_id={{dag.dag_id}}'>{{ti.duration}}</a></td>
</tr>
{% endfor -%}
</table>
</div>
</body>
</html>另一种可以这样做的方法是为DAG对象利用回调。
from airflow.models import DAG
from datetime import datetime
def send_task_summary(context):
tis = context['dag_run'].get_task_instances()
for ti in tis:
print(ti.__dict__)
dag = DAG(
dag_id='my_dag',
schedule_interval='@once',
start_date=datetime(2020, 1, 1),
on_failure_callback=send_task_summary
) https://stackoverflow.com/questions/67074146
复制相似问题