Airflow on_failure_callback

Stack Overflow user
Asked 2021-01-30 11:42:27
Answers: 1 · Views: 17.5K · Followers: 0 · Votes: 11

I have an Airflow DAG with two tasks:

  • read_csv
  • process_file

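Each works fine on its own. I deliberately introduced an error into the pandas DataFrame code to understand how on_failure_callback works and to see whether it gets triggered. Judging from the logs, it does not: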

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/airflow/models/taskinstance.py", line 1197, in handle_failure
    task.on_failure_callback(context)
TypeError: on_failure_callback() takes 0 positional arguments but 1 was given

Why isn't on_failure_callback working?

[Figure: graph view of the DAG, read_csv → process_file]

Here is the code:

try:

    from datetime import timedelta
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from datetime import datetime
    import pandas as pd

    # Setting up Triggers
    from airflow.utils.trigger_rule import TriggerRule

    # For getting Variables from Airflow
    from airflow.models import Variable

    print("All Dag modules are ok ......")
except Exception as e:
    print("Error  {} ".format(e))


def read_csv(**context):
    data = [{"name":"Soumil","title":"Full Stack Software Engineer"}, { "name":"Nitin","title":"Full Stack Software Engineer"},]
    df = pd.DataFramee(data=data)  # deliberate typo (DataFramee) so this task fails

    dag_config = Variable.get("VAR1")
    print("VAR 1 is : {} ".format(dag_config))
    context['ti'].xcom_push(key='mykey', value=df)


def process_file(**context):
    instance = context.get("ti").xcom_pull(key='mykey')
    print(instance.head(2))
    return "Process complete "


def on_failure_callback(**context):
    print("Fail works  !  ")



with DAG(dag_id="invoices_dag",
         schedule_interval="@once",
         default_args={
             "owner": "airflow",
             "start_date": datetime(2020, 11, 1),
             "retries": 1,
             "retry_delay": timedelta(minutes=1),
             'on_failure_callback': on_failure_callback,
         },
         catchup=False) as dag:

    read_csv = PythonOperator(
        task_id="read_csv",
        python_callable=read_csv,
        op_kwargs={'filename': "Soumil.csv"},
        provide_context=True
    )

    process_file = PythonOperator(
        task_id="process_file",
        python_callable=process_file,
        provide_context=True
    )




read_csv >> process_file





# ====================================Notes====================================

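# all_success           -> (default) triggers when all upstream tasks have succeeded
# one_success           -> triggers as soon as at least one upstream task has succeeded
# all_done              -> triggers when all upstream tasks are done, whatever their outcome
# all_failed            -> triggers when all upstream tasks have failed (or are upstream_failed)
# one_failed            -> triggers as soon as at least one upstream task has failed
# none_failed           -> triggers when no upstream task has failed (all succeeded or were skipped)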

# ==============================================================================



# ============================== Executor====================================

# There are three main types of executor:
# -> SequentialExecutor: runs a single task at a time, with no parallelism (the default; meant for development)
# -> LocalExecutor: runs each task in a separate process on one machine
# -> CeleryExecutor: runs tasks on worker nodes in a multi-node architecture (the most scalable)

# ===========================================================================
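The trigger-rule notes in the code above can be made concrete with a small plain-Python model. This is a simplified, hypothetical sketch that only mirrors those comments; it is not how Airflow's scheduler actually evaluates trigger rules, and the state strings and the should_trigger helper are assumptions for illustration.

```python
# Simplified, hypothetical model of the trigger rules listed in the notes.
# NOT Airflow's implementation; it only encodes the comment descriptions.
from typing import List

SUCCESS = "success"
FAILED = "failed"
UPSTREAM_FAILED = "upstream_failed"
SKIPPED = "skipped"
RUNNING = "running"

# States in which an upstream task counts as "done".
FINISHED = {SUCCESS, FAILED, UPSTREAM_FAILED, SKIPPED}


def should_trigger(rule: str, upstream_states: List[str]) -> bool:
    """Return True if a task with `rule` may start, given its upstream states."""
    all_done = all(s in FINISHED for s in upstream_states)
    failed = [s for s in upstream_states if s in (FAILED, UPSTREAM_FAILED)]
    if rule == "all_success":
        return all(s == SUCCESS for s in upstream_states)
    if rule == "one_success":
        return any(s == SUCCESS for s in upstream_states)
    if rule == "all_done":
        return all_done
    if rule == "all_failed":
        return len(failed) == len(upstream_states)
    if rule == "one_failed":
        return len(failed) > 0
    if rule == "none_failed":
        # Everything finished and nothing failed: all succeeded or were skipped.
        return all_done and not failed
    raise ValueError(f"unknown trigger rule: {rule}")


# In the question's DAG, process_file keeps the default rule (all_success),
# so once read_csv fails it is not triggered:
print(should_trigger("all_success", ["failed"]))  # False
```

With the default all_success, Airflow marks process_file as upstream_failed instead of running it, which matches the model returning False above.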

1 Answer

Stack Overflow user

Accepted answer

Answered 2021-01-30 13:14:04

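You need to give your function a parameter that can receive the context. This is because of how Airflow triggers callbacks: as the traceback shows, it calls task.on_failure_callback(context), passing the context as a single positional argument, which a function declared with only **context cannot accept.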

def on_failure_callback(context):
    print("Fail works  !  ")
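To see the mismatch in isolation, here is a minimal plain-Python sketch that needs no Airflow. The helper fake_handle_failure is hypothetical; it only mimics the single call task.on_failure_callback(context) visible in the question's traceback, i.e. one positional argument.

```python
# Plain-Python reproduction of the TypeError; no Airflow required.
# fake_handle_failure is a hypothetical stand-in for the traceback line
#     task.on_failure_callback(context)


def on_failure_callback_kwargs_only(**context):
    # The question's signature: accepts keyword arguments only.
    print("Fail works  !  ")


def on_failure_callback(context):
    # The answer's signature: one positional parameter receives the context dict.
    print("Fail works  !  ")


def fake_handle_failure(callback, context):
    """Call `callback` the way the traceback shows Airflow calling it."""
    try:
        callback(context)
        return "callback ran"
    except TypeError as exc:
        return f"TypeError: {exc}"


context = {"task_instance": None}
print(fake_handle_failure(on_failure_callback_kwargs_only, context))
print(fake_handle_failure(on_failure_callback, context))
```

The first call reports the same "takes 0 positional arguments but 1 was given" error as the question's log, because **context only collects keyword arguments; the second call succeeds because the positional context dict binds to the named parameter.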

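Note that with your implementation you cannot tell from the message which task failed, so you may want to add the following task details to the error message: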

def on_failure_callback(context):
    ti = context['task_instance']
    print(f"task {ti.task_id } failed in dag { ti.dag_id } ")
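A slightly fuller sketch of the same idea builds the message once, prints it, and returns it so it can be checked outside Airflow. It assumes, as the answer does, that the TaskInstance is under context["task_instance"]. Reading context["exception"] is an assumption about the Airflow version in use (hence .get()), and the SimpleNamespace object is a hypothetical stand-in for a real TaskInstance.

```python
from types import SimpleNamespace


def on_failure_callback(context):
    """Build, print and return a failure message for the failed task.

    Assumes context["task_instance"] holds the TaskInstance (as in the answer).
    context["exception"] is an assumption about the Airflow version in use,
    so it is read with .get(): if the key is missing, the message omits it.
    """
    ti = context["task_instance"]
    message = f"task {ti.task_id} failed in dag {ti.dag_id}"
    exc = context.get("exception")
    if exc is not None:
        message += f" ({type(exc).__name__}: {exc})"
    print(message)
    # Returning the message only makes the function easy to test outside
    # Airflow; Airflow does not use the callback's return value.
    return message


# Usage outside Airflow, with a hypothetical stand-in for the TaskInstance.
fake_ti = SimpleNamespace(task_id="read_csv", dag_id="invoices_dag")
on_failure_callback({"task_instance": fake_ti})
```

In the DAG itself the function is wired up exactly as in the question, through 'on_failure_callback' in default_args.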
Votes: 14
Original page content provided by Stack Overflow; translation support provided by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://stackoverflow.com/questions/65967548
