试图设置一个简单的电报通知,dag立即在没有日志的情况下失败,如果我使用除air字节以外的任何操作员,一切都正常,如果我删除电报操作员仍然工作良好,下面的代码,有人知道原因可能是什么吗?
from airflow import DAG
from airflow.utils.dates import days_ago
from airflow.providers.airbyte.operators.airbyte import AirbyteTriggerSyncOperator
from airflow.providers.telegram.operators.telegram import TelegramOperator
def on_fail_callback(context):
send_message = TelegramOperator(
task_id='send_message_telegram',
telegram_conn_id='telegram_id',
chat_id='-1001426579030',
text=f"""
\ud83d\ude21 Task has failed.
Task: {context.get('task_instance').task_id}
DAG: {context.get('task_instance').dag_id}
Execution Time: {context.get('execution_date')}
Exception: {context.get('exception')}
""",
dag=dag)
return send_message.execute(context=context)
with DAG(dag_id='trigger_airbyte_exchangerates',
default_args={'owner': 'airflow'},
schedule_interval="30 0 * * *",
on_failure_callback=on_fail_callback,
start_date=days_ago(1)
) as dag:
exchangerates_to_bq = AirbyteTriggerSyncOperator(
task_id='airbyte_exchangerates',
airbyte_conn_id='Airbyte',
connection_id='3ab557d2-7a00-4c09-8437-003be6c0b274',
asynchronous=False,
timeout=3600,
wait_seconds=3
)与其他接线员一起,电报的讯息会立即发出。
发布于 2022-08-24 07:56:31
我不使用AirbyteTriggerSyncOperator。但是,如果我需要电报通知,当任务中断时,我使用:
send_telegram_message = TelegramOperator(
task_id='send_telegram_message',
telegram_conn_id='Telegram',
chat_id='',
text='',
dag=dag,
trigger_rule='one_failed'
)然后: list_all_my_task >> send_telegram_message
只有当最重要的任务之一中断时,才会发送电报信息。否则,将跳过电报通知。使用“触发规则”。
https://stackoverflow.com/questions/71574767
复制相似问题