如何在不进入日志的情况下获得操作符失败的原因。因为我想通过松弛发布原因作为通知吗?
谢了,习
发布于 2022-04-05 22:36:27
我可以想到这样做的一种方式如下。
https://www.astronomer.io/guides/error-notifications-in-airflow/
检查上面的SlackAPIPostOperator
发布于 2022-04-06 00:25:48
Exception=context.get(‘exception’)是给出故障的确切原因的函数
使用slack的on_failure_callback示例:
step_checker = EmrStepSensor(task_id='watch_step',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow',
key='return_value') }}",
step_id="{{task_instance.xcom_pull(task_ids='add_steps',key='return_value')[0] }}",
aws_conn_id='aws_default',
on_failure_callback=task_fail_slack_alert,)
def task_fail_slack_alert(context):
SLACK_CONN_ID = 'slack'
slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
slack_msg = """
:red_circle: Task Failed.
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
*Error*:{exception}
""".format(
task=context.get('task_instance').task_id,
dag=context.get('task_instance').dag_id,
exec_date=context.get('execution_date'),
log_url=context.get('task_instance').log_url,
exception=context.get('exception')
)
failed_alert = SlackWebhookOperator(
task_id='slack_test',
http_conn_id='slack',
webhook_token=slack_webhook_token,
message=slack_msg,
username='airflow',
dag=dag)
return failed_alert.execute(context=context)https://stackoverflow.com/questions/71746014
复制相似问题