
Apache Airflow - DAG registers as SUCCESS even when critical tasks fail

Stack Overflow user
Asked on 2018-08-06 21:04:17
2 answers · Viewed 2.6K · 0 following · Score 1

I'm new to Apache Airflow, and I want to write a DAG that moves some data from a set of tables in a source database into a set of tables in a target database. I'm trying to design the DAG so that someone can simply write the `create table` and `insert into` scripts for a new source-table --> target-table process and drop them into a folder. Then, on the next DAG run, the DAG would pick the scripts up from the folder and run the new tasks. I set up my DAG like this:

source_data_check_task_1 (Check Operator or ValueCheckOperator)
source_data_check_task_2 (Check Operator or ValueCheckOperator, Trigger on ALL_SUCCESS)
source_data_check_task_3 (Check Operator or ValueCheckOperator, Trigger on ALL_SUCCESS)

source_data_check_task_1 >> source_data_check_task_2 >> source_data_check_task_3

for tbl_name in tbl_name_list:
    tbl_exists_check (Check Operator, trigger on ALL_SUCCESS): check if `new_tbl` exists in database by querying `information_schema`
        tbl_create_task (SQL Operator, trigger on ALL_FAILED): run the `create table` SQL script
    tbl_insert_task (SQL Operator ,trigger on ONE_SUCCESS): run the `insert into` SQL script

    source_data_check_task_3 >> tbl_exists_check
    tbl_exists_check >> tbl_create_task
    tbl_exists_check >> tbl_insert_task
    tbl_create_task >> tbl_insert_task
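The folder-driven part of this design (a new script pair dropped into a folder becomes a new task pair on the next parse) can be sketched in plain Python. The `create_<tbl>.sql` / `insert_<tbl>.sql` naming convention here is an assumption for illustration, not something from the post:

```python
import os

def discover_tables(sql_dir):
    """Return table names that have both a create and an insert script
    in sql_dir, using a hypothetical create_<tbl>.sql / insert_<tbl>.sql
    naming convention."""
    files = set(os.listdir(sql_dir))
    tables = []
    for f in files:
        if f.startswith('create_') and f.endswith('.sql'):
            tbl = f[len('create_'):-len('.sql')]
            # only pick up tables whose insert script also exists
            if 'insert_{}.sql'.format(tbl) in files:
                tables.append(tbl)
    return sorted(tables)
```

The per-table loop below could then iterate over `discover_tables(sql_path)` instead of a hard-coded list, so new script pairs generate new tasks without touching the DAG file.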

I've run into two problems with this setup: (1) if any of the data quality check tasks fails, tbl_create_task still kicks off, because it triggers on ALL_FAILED; and (2) no matter which tasks fail, the DAG reports the run as SUCCESS. That's fine when tbl_exists_check fails, since it's expected to fail at least once, but it's not ideal when a critical task fails (like any of the data quality check tasks).
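Problem (2) follows from how Airflow derives a DAG run's state: the run state is taken from the *leaf* tasks, so a mid-graph failure that a downstream trigger rule absorbs never surfaces. A toy simulation of that rule (a sketch of the behavior as I understand it, not Airflow's actual code):

```python
def dag_run_state(leaf_states):
    """A DAG run is 'failed' only if some leaf task failed; upstream
    failures absorbed by trigger rules like ALL_FAILED don't count."""
    return 'failed' if 'failed' in leaf_states else 'success'

# A check task fails mid-graph, tbl_create fires on that failure, and
# tbl_insert (the leaf) succeeds -- the run still reports success.
```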

Is there a way to set up my DAG differently to address these problems?

The actual code is below:

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.check_operator import ValueCheckOperator, CheckOperator
from airflow.operators.bash_operator import BashOperator
from airflow.models import Variable
from datetime import datetime, timedelta
from airflow.utils.trigger_rule import TriggerRule

sql_path = Variable.get('sql_path')

default_args = {
    'owner': 'enmyj',
    'depends_on_past':True,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

dag = DAG(
    'test', 
    default_args=default_args, 
    schedule_interval=None,
    template_searchpath=sql_path
)

# check number of weeks in bill pay (made up example)
check_one = CheckOperator(
    task_id='check_one',
    conn_id='conn_name',
    sql="""select count(distinct field) from dbo.table having count(distinct field) >= 4 """,
    dag=dag
)

check_two = CheckOperator(
    task_id='check_two',
    conn_id='conn_name',
    sql="""select count(distinct field) from dbo.table having count(distinct field) <= 100""",
    dag=dag
)

check_one >> check_two

ls = ['foo','bar','baz','quz','apple']
for tbl_name in ls:
    exists = CheckOperator(
        task_id='tbl_exists_{}'.format(tbl_name),
        conn_id='conn_name',
        sql =""" select count(*) from information_schema.tables where table_schema = 'test' and table_name = '{}' """.format(tbl_name),
        trigger_rule=TriggerRule.ALL_SUCCESS,
        depends_on_past=True,
        dag = dag
    )

    create = PostgresOperator(
        task_id='tbl_create_{}'.format(tbl_name),
        postgres_conn_id='conn_name',
        database='triforcedb',
        sql = 'create table test.{} (like dbo.source)'.format(tbl_name), # will be read from SQL file
        trigger_rule=TriggerRule.ONE_FAILED,
        depends_on_past=True,
        dag = dag
    )

    insert = PostgresOperator(
        task_id='tbl_insert_{}'.format(tbl_name),
        postgres_conn_id='conn_name',
        database='triforcedb',
        sql = 'insert into test.{} (select * from dbo.source limit 10)'.format(tbl_name), # will be read from SQL file
        trigger_rule=TriggerRule.ONE_SUCCESS,
        depends_on_past=True,
        dag = dag
    )

    check_two >> exists
    exists >> create
    create >> insert
    exists >> insert

2 Answers

Stack Overflow user

Answer accepted

Posted on 2018-08-06 21:39:32

You have a perfect use case for the BranchPythonOperator: it lets you run the check for whether the table exists and then branch to creating the table before inserting into it, without having to worry about TRIGGER_RULES, and it makes your DAG's logic much clearer in the UI.
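A minimal sketch of the branching decision, written as a plain function so the logic is visible; in the DAG it would be passed as `python_callable` to a `BranchPythonOperator`, whose return value names the task to follow (the task_ids match the question's naming):

```python
def choose_next(table_exists, tbl_name):
    """Return the task_id the BranchPythonOperator should follow:
    create the table if it's missing, otherwise skip straight to the
    insert via a per-table dummy task."""
    if table_exists:
        return 'dummy_{}'.format(tbl_name)       # table is there: skip creation
    return 'tbl_create_{}'.format(tbl_name)      # table missing: create first
```

The unchosen branch is marked *skipped* rather than failed, which is why no trigger-rule gymnastics are needed on the create path.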

Score 3

Stack Overflow user

Posted on 2018-08-10 03:37:56

Below is the code I ended up using. This solution fixes both of my problems above: 1. it doesn't trigger the tbl_create task if an upstream task fails, and 2. the DAG registers as FAILED if any of the check tasks fail. I feel this solution is a bit messy, so I'd welcome suggestions for improving it or ways to make it more "airflowy".

from airflow.models import DAG
from airflow.models import Variable
from airflow.operators.postgres_operator import PostgresOperator
from airflow.operators.check_operator import ValueCheckOperator, CheckOperator
from airflow.operators.python_operator import BranchPythonOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.utils.trigger_rule import TriggerRule
from datetime import datetime, timedelta
from airflow.hooks.postgres_hook import PostgresHook

sql_path = Variable.get('sql_path')

default_args = {
    'owner': 'enmyj',
    'depends_on_past':False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0
}

dag = DAG(
    'test', 
    default_args=default_args, 
    schedule_interval=None,
    template_searchpath=sql_path
)

# check number of weeks in bill pay (made up example)
check_one = CheckOperator(
    task_id='check_one',
    conn_id='conn_id',
    sql="""select count(distinct field) from dbo.table having count(distinct field) >= 4 """,
    dag=dag
)

def check_two_func():
    p = PostgresHook('conn_id')  # `Hook` was undefined; use the imported PostgresHook
    sql = """select count(distinct field) from dbo.table having count(distinct field) <= 100"""
    records = p.get_records(sql)
    # the HAVING clause returns zero rows when the check fails,
    # so guard against an empty result before indexing
    if not records or records[0][0] == 0:
        return 'dummy_fail'
    else:
        return 'dummy_success'


check_two = BranchPythonOperator(
    task_id = 'check_two',
    python_callable = check_two_func,
    dag=dag
)

dummy_fail = DummyOperator(task_id='dummy_fail',dag=dag)
dummy_success = DummyOperator(task_id='dummy_success',dag=dag)
join = DummyOperator(task_id='join',dag=dag)

check_one >> check_two
check_two >> dummy_fail
check_two >> dummy_success

ls = ['foo','bar','baz','quz','apple']
for tbl_name in ls:
    def has_table(tbl_name=tbl_name):
        p = PostgresHook('conn_id')
        sql =""" select count(*) from information_schema.tables where table_schema = 'test' and table_name = '{}' """.format(tbl_name)
        count = p.get_records(sql)[0][0] #unpack the list/tuple

        # If the query didn't return rows, branch to create table
        # otherwise, branch to dummy
        if count == 0:
            return 'tbl_create_{}'.format(tbl_name)
        else:
            return 'dummy_{}'.format(tbl_name) 

    exists = BranchPythonOperator(
        task_id='tbl_exists_{}'.format(tbl_name),
        python_callable=has_table,
        depends_on_past=False,
        dag=dag
    )

    create = PostgresOperator(
        task_id='tbl_create_{}'.format(tbl_name),
        postgres_conn_id='conn_id',
        database='database_name',
        sql = 'create table test.{} (like dbo.source)'.format(tbl_name), # will be read from SQL file
        dag = dag
    )

    insert = PostgresOperator(
        task_id='tbl_insert_{}'.format(tbl_name),
        postgres_conn_id='conn_id',
        database='database_name',
        sql = 'insert into test.{} (select * from dbo.source limit 10)'.format(tbl_name), # will be read from SQL file
        trigger_rule=TriggerRule.ONE_SUCCESS,
        dag = dag
    )

    # each table needs its own skip-path dummy, matching the branch
    # target 'dummy_{}'.format(tbl_name) returned by has_table
    dummy = DummyOperator(task_id='dummy_{}'.format(tbl_name), dag=dag)

    dummy_success >> exists
    exists >> create >> insert
    exists >> dummy >> insert
    insert >> join
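One detail in the code above worth calling out: since the branch runs only one of the create / dummy paths and marks the other skipped, `insert` keeps `trigger_rule=ONE_SUCCESS` so it still fires. A toy illustration of that rule's semantics (my reading of the trigger rule, not Airflow's code):

```python
def one_success(upstream_states):
    """ONE_SUCCESS fires as soon as any upstream task succeeded,
    even when the other branch ended up skipped."""
    return 'success' in upstream_states

# create path taken, dummy skipped: insert still runs
# both paths skipped (e.g. the whole branch was skipped): insert does not run
```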
Score 1
Original content provided by Stack Overflow.
Original link: https://stackoverflow.com/questions/51708356