
airflow.exceptions.DuplicateTaskIdFound exception

Stack Overflow user
Asked 2022-06-17 11:28:57

Hi, I need to define a DAG with a task that must be called four times each day. However, when I deploy the DAG I get an airflow.exceptions.DuplicateTaskIdFound error. See the DAG definition below:

import sys
import time
from airflow.models import DAG,Variable
from airflow.operators.bash import BashOperator
from datetime import datetime,timedelta
from airflow.operators.dummy import DummyOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
from airflow.utils.dates import days_ago
import pendulum
sys.path.append("../..")
from common.airflow_dep_coordinator import airfow_coordinator
from common import airflow_utli
from common import airflow_config

default_args = airflow_utli.default_args
default_args['owner'] = 'bi'
default_args['sla'] = timedelta(hours=6)
default_args['retries'] = 3
default_args['start_date'] = datetime(2022,4,29, tzinfo=airflow_utli.local_tz) #TODO please override start date of this dag

airflow_config.EMAIL_RECIEVER_LIST=Variable.get("bi_email_reciever_list",deserialize_json=True , default_var=None)

# ---------------------------------------------------------------------------
# Get a date string
# @beforeOfDay, number of days before the current day
# @with_dash, whether the result string contains dashes, default False
# ---------------------------------------------------------------------------
def getdate(beforeOfDay, with_dash=False):
    today = datetime.now()
    offset = timedelta(days=-beforeOfDay)
    str_date_format = '%Y-%m-%d' if with_dash else '%Y%m%d'
    date_str = (today + offset).strftime(str_date_format)
    return date_str

# ---------------------------------------------------------------------------
#Get etl task, return SSHOperator
# ---------------------------------------------------------------------------
def batch_etl_task(dag,target_table_name, param=None,ssh_conn_id=SSHHOOK_NAME):
    sh_command =  COMMAND_BASE+ COMMAND.format(target_table_name=target_table_name, param=param)
    print(sh_command)
    task = SSHOperator(
        dag=dag, task_id=target_table_name, ssh_conn_id=ssh_conn_id, command=sh_command
    )
    return task

# =========================================================================
# DAG definition
# =========================================================================
dag=DAG(
    dag_id='lakehouse_dws_otp_app_user_daily', 
    default_args=default_args,
    tags=['bi','dws','otp','app'], 
    schedule_interval='00 04 * * *', 
    dagrun_timeout=timedelta(hours=6),
    concurrency=12,
    catchup=False,
    params={"pday": getdate(2),"fday":getdate(1),
            "pdaym1": getdate(3),"fdaym1":getdate(2),
            "pdaym7": getdate(9),"fdaym7":getdate(8),
            "pdaym30": getdate(32),"fdaym30":getdate(31)
           },
    sla_miss_callback=airflow_utli.default_sla_callback
)

#etl task
task_dws_fact_com_otp_app_visitor_snp_t=batch_etl_task(dag=dag,target_table_name='AnotherTargetTable',param='{{ params.pday }} {{params.fday}}')

task_dws_fact_com_otp_app_active_retention_snp_t=batch_etl_task(dag=dag,target_table_name='target_table',param='{{ params.pday }} {{params.fday}}')
task_dws_fact_com_otp_app_active_retention_snp_t_m1=batch_etl_task(dag=dag,target_table_name='target_table',param='{{ params.pdaym1 }} {{params.fdaym1}}')
task_dws_fact_com_otp_app_active_retention_snp_t_m7=batch_etl_task(dag=dag,target_table_name='target_table',param='{{ params.pdaym7 }} {{params.fdaym7}}')
task_dws_fact_com_otp_app_active_retention_snp_t_m30=batch_etl_task(dag=dag,target_table_name='target_table',param='{{ params.pdaym30 }} {{params.fdaym30}}')

### End
end_dws_otp_register_daily = DummyOperator(
    task_id='end_dws_otp_register_daily',
    dag=dag,
)

### Dependence, TODO please setup the dependency tree
[task_dws_fact_com_otp_app_new_visitor_t,task_dws_fact_com_otp_app_active_au_snp_t]>>task_dws_fact_com_otp_app_active_retention_snp_t>>task_dws_fact_com_otp_app_active_retention_snp_t_m1

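As you can see, I need to call the ETL four times with different parameters to update target_table, but this does not work. It seems a DAG does not allow me to define four tasks with the same target table. Is there a best practice for achieving this within a single DAG?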


1 Answer

Stack Overflow user

Accepted answer

Answered 2022-06-18 16:16:31

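Your batch_etl_task implementation sets the SSHOperator's task_id to target_table_name, so the four calls that pass 'target_table' all produce the same task_id.

Every task_id within a DAG must be unique for the task to be registered as a DAG node, so each call needs its own distinct task_id.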
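
One way to do this, sketched here under assumptions, is to build the task_id from the table name plus a short suffix for each run. The helper `make_task_id`, the suffixes `m1`/`m7`/`m30`, and the `RUNS` list are hypothetical names chosen for illustration; they are not part of Airflow or of the original DAG. The sketch is plain Python so it can be checked without an Airflow installation:

```python
from typing import List, Optional, Tuple


def make_task_id(target_table_name: str, suffix: Optional[str] = None) -> str:
    """Return a task_id that stays unique when one table is loaded several times.

    Hypothetical helper: without a suffix the table name is used as-is,
    otherwise the suffix is appended after an underscore.
    """
    return target_table_name if not suffix else f"{target_table_name}_{suffix}"


# Hypothetical (suffix, param) pairs mirroring the four calls in the question.
RUNS: List[Tuple[Optional[str], str]] = [
    (None, "{{ params.pday }} {{ params.fday }}"),
    ("m1", "{{ params.pdaym1 }} {{ params.fdaym1 }}"),
    ("m7", "{{ params.pdaym7 }} {{ params.fdaym7 }}"),
    ("m30", "{{ params.pdaym30 }} {{ params.fdaym30 }}"),
]

task_ids = [make_task_id("target_table", suffix) for suffix, _ in RUNS]

# Each task_id must appear only once within a DAG.
assert len(task_ids) == len(set(task_ids))
print(task_ids)
```

In the DAG itself, batch_etl_task could take an extra task_id argument and pass it to SSHOperator as `task_id=...` in place of `task_id=target_table_name`; the command string would still be built from target_table_name and param as before.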

Original page content provided by Stack Overflow. Translation support provided by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/72658702
