我正在尝试创建一个dag,它有两个操作符,它们是动态创建的,这取决于json配置文件拥有的“管道”的数量。该文件存储在变量dag_datafusion_args中。然后我有一个标准的bash操作符,我在最后有一个名为success的任务,它向slack发送一条消息,告诉slack已经结束。另外两个任务是python运算符,它们是动态生成并并行运行的。我使用的是编写器,当我将dag放入存储桶中时,它会出现在when服务器ui上,但当我单击查看dag时,会出现以下消息:“dag "dag_lucas4”似乎缺失。“',如果我在kubernetes集群上通过CLI直接测试任务,它可以工作!但是我似乎不能让web用户界面出现。我试着按照这里的一些人的建议,通过安装python包来重启but服务器,我尝试了3x,但没有成功。有人知道它会是什么吗?
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from aux_py_files.med.med_airflow_functions import *
from google.cloud import storage
from datetime import timedelta
TEMPLATE_SEARCH_PATH = '/home/airflow/gcs/plugins/'
INDEX=1
default_args = {
'owner':'lucas',
'start_date': '2021-01-10',
'email': ['xxxx'],
'email_on_failure': False,
'email_on_success': False,
'retries': 3,
'retry_delay': timedelta(minutes=2),
'on_failure_callback': post_message_fail_to_slack
}
dag_datafusion_args=return_datafusion_config_file('med')
with DAG('dag_lucas4', default_args = default_args, schedule_interval="30 23 * * *", template_searchpath = [TEMPLATE_SEARCH_PATH]) as dag:
extract_ftp_csv_files_load_in_gcs = BashOperator(
task_id='extract_ftp_csv_files_load_in_gcs',
bash_command='aux_sh_files/med/script.sh'
)
success = PythonOperator(
task_id='success',
python_callable=post_message_success_to_slack,
op_kwargs={'dag_name':'dag_lucas2'}
)
for pipeline,args in dag_datafusion_args.items():
configure_pipeline=PythonOperator(
task_id=f'configure_pipeline{str(INDEX)}',
python_callable=setPipelineArguments,
op_kwargs={'dag_name':'med', 'pipeline_name':pipeline},
provide_context=True
)
start_pipeline = PythonOperator(
task_id= f'start_pipeline{str(INDEX)}',
python_callable=start_pipeline_wrapper,
op_kwargs={'configure_pipeline_task':f'configure_pipeline{str(INDEX)}'},
retries=3,
provide_context=True
)
[extract_ftp_csv_files_load_in_gcs,configure_pipeline] >> start_pipeline >> success
INDEX += 1发布于 2021-01-13 01:57:31
Cloud Composer中的Airflow-Webserver在租户项目中运行,worker和scheduler在客户项目中运行。租户项目只不过是它的google侧管理环境,用于某些部分的气流组件。因此,the服务器UI不能完全访问您的项目资源。因为它不能在你的项目环境下运行。这样我就可以用return_datafusion_config_file读取我的配置json文件。最好的方法是使用该文件创建一个ENV变量。
https://stackoverflow.com/questions/65670362
复制相似问题