嗨,我需要定义一个带有任务的DAG,任务必须每天调用4次。但是,当我发布到DAG时,我得到了一个airflow.exceptions.DuplicationTaskIdFound错误,请参见下面的DAG定义:
import sys
import time
from airflow.models import DAG,Variable
from airflow.operators.bash import BashOperator
from datetime import datetime,timedelta
from airflow.operators.dummy import DummyOperator
from airflow.providers.ssh.hooks.ssh import SSHHook
from airflow.providers.ssh.operators.ssh import SSHOperator
from airflow.operators.python import PythonOperator
from airflow.sensors.external_task import ExternalTaskMarker, ExternalTaskSensor
from airflow.utils.dates import days_ago
import pendulum
sys.path.append("../..")
from common.airflow_dep_coordinator import airfow_coordinator
from common import airflow_utli
from common import airflow_config
default_args = airflow_utli.default_args
default_args['owner'] = 'bi'
default_args['sla'] = timedelta(hours=6)
default_args['retries'] = 3
default_args['start_date'] = datetime(2022,4,29, tzinfo=airflow_utli.local_tz) #TODO please override start date of this dag
airflow_config.EMAIL_RECIEVER_LIST=Variable.get("bi_email_reciever_list",deserialize_json=True , default_var=None)
# ---------------------------------------------------------------------------
#Get data string
# @beforeOfDay, the day before current day
# @with_dash, whether the result string contains dash, default False
# ---------------------------------------------------------------------------
def getdate(beforeOfDay, with_dash=False):
today = datetime.now()
offset = timedelta(days=-beforeOfDay)
str_date_format ='%Y%m%d'
if(with_dash):
str_date_format ='%Y-%m-%d'
date_str = (today + offset).strftime(str_date_format)
return date_str
# ---------------------------------------------------------------------------
#Get etl task, return SSHOperator
# ---------------------------------------------------------------------------
def batch_etl_task(dag,target_table_name, param=None,ssh_conn_id=SSHHOOK_NAME):
sh_command = COMMAND_BASE+ COMMAND.format(target_table_name=target_table_name, param=param)
print(sh_command)
task = SSHOperator(
dag=dag, task_id=target_table_name, ssh_conn_id=ssh_conn_id, command=sh_command
)
return task
# =========================================================================
# DAG definition
# =========================================================================
dag=DAG(
dag_id='lakehouse_dws_otp_app_user_daily',
default_args=default_args,
tags=['bi','dws','otp','app'],
schedule_interval='00 04 * * *',
dagrun_timeout=timedelta(hours=6),
concurrency=12,
catchup=False,
params={"pday": getdate(2),"fday":getdate(1),
"pdaym1": getdate(3),"fdaym1":getdate(2),
"pdaym7": getdate(9),"fdaym7":getdate(8),
"pdaym30": getdate(32),"fdaym30":getdate(31)
},
sla_miss_callback=airflow_utli.default_sla_callback
)
#etl task
task_dws_fact_com_otp_app_visitor_snp_t=batch_etl_task(dag=dag,target_table_name='AnotherTargetTable',param='{{ params.pday }} {{params.fday}}')
task_dws_fact_com_otp_app_active_retention_snp_t=batch_etl_task(dag=dag,target_table_name='target_table',param='{{ params.pday }} {{params.fday}}')
task_dws_fact_com_otp_app_active_retention_snp_t_m1=batch_etl_task(dag=dag,target_table_name='target_table',param='{{ params.pdaym1 }} {{params.fdaym1}}')
task_dws_fact_com_otp_app_active_retention_snp_t_m7=batch_etl_task(dag=dag,target_table_name='target_table',param='{{ params.pdaym7 }} {{params.fdaym7}}')
task_dws_fact_com_otp_app_active_retention_snp_t_m30=batch_etl_task(dag=dag,target_table_name='target_table',param='{{ params.pdaym30 }} {{params.fdaym30}}')
### End
end_dws_otp_register_daily=DummyOperator(
task_id='end_dws_otp_register_daily',
dag=dag)
### Dependence, TODO please setup the dependency tree
[task_dws_fact_com_otp_app_new_visitor_t,task_dws_fact_com_otp_app_active_au_snp_t]>>task_dws_fact_com_otp_app_active_retention_snp_t>>task_dws_fact_com_otp_app_active_retention_snp_t_m1正如您所看到的,我需要调用etl来用不同的参数更新4次target_table,但这不是工作,似乎DAG不允许我用同一个目标表4次定义任务,那么在一个DAG中实现这个目的有什么最佳实践吗?
发布于 2022-06-18 16:16:31
您已经实现了batch_etl_task,以将SSHOperator的task_id设置为target_table_name的名称。
必须传递task_id的唯一名称才能注册DagNode。
https://stackoverflow.com/questions/72658702
复制相似问题