首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Cloud Composer无法在dag服务器UI中呈现动态DAG "DAG似乎缺失“

Cloud Composer无法在dag服务器UI中呈现动态DAG "DAG似乎缺失“
EN

Stack Overflow用户
提问于 2021-01-12 00:03:45
回答 1查看 66关注 0票数 0

我正在尝试创建一个dag,它有两个操作符,它们是动态创建的,这取决于json配置文件拥有的“管道”的数量。该文件存储在变量dag_datafusion_args中。然后我有一个标准的bash操作符,我在最后有一个名为success的任务,它向slack发送一条消息,告诉slack已经结束。另外两个任务是python运算符,它们是动态生成并并行运行的。我使用的是编写器,当我将dag放入存储桶中时,它会出现在when服务器ui上,但当我单击查看dag时,会出现以下消息:“dag "dag_lucas4”似乎缺失。“',如果我在kubernetes集群上通过CLI直接测试任务,它可以工作!但是我似乎不能让web用户界面出现。我试着按照这里的一些人的建议,通过安装python包来重启but服务器,我尝试了3x,但没有成功。有人知道它会是什么吗?

代码语言:javascript
复制
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator
from aux_py_files.med.med_airflow_functions import *
from google.cloud import storage
from datetime import timedelta
​
TEMPLATE_SEARCH_PATH = '/home/airflow/gcs/plugins/'
INDEX=1
default_args = {
'owner':'lucas',
'start_date': '2021-01-10',
'email': ['xxxx'],
'email_on_failure': False,
'email_on_success': False,
'retries': 3,
'retry_delay': timedelta(minutes=2),
'on_failure_callback': post_message_fail_to_slack
}
​
dag_datafusion_args=return_datafusion_config_file('med')
​
with DAG('dag_lucas4', default_args = default_args, schedule_interval="30 23 * * *", template_searchpath = [TEMPLATE_SEARCH_PATH]) as dag:
​
    extract_ftp_csv_files_load_in_gcs = BashOperator(
        task_id='extract_ftp_csv_files_load_in_gcs',
        bash_command='aux_sh_files/med/script.sh' 
    )
​
    success = PythonOperator(
                task_id='success',
                python_callable=post_message_success_to_slack,
                op_kwargs={'dag_name':'dag_lucas2'}
    )
​
    for pipeline,args in dag_datafusion_args.items():
​
​
        configure_pipeline=PythonOperator(
                task_id=f'configure_pipeline{str(INDEX)}',
                python_callable=setPipelineArguments,
                op_kwargs={'dag_name':'med', 'pipeline_name':pipeline},
                provide_context=True       
        )
            
        start_pipeline = PythonOperator(
                task_id= f'start_pipeline{str(INDEX)}',
                python_callable=start_pipeline_wrapper,
                op_kwargs={'configure_pipeline_task':f'configure_pipeline{str(INDEX)}'},
                retries=3,
                provide_context=True  
        )
​
        [extract_ftp_csv_files_load_in_gcs,configure_pipeline] >> start_pipeline >> success
        INDEX += 1
EN

回答 1

Stack Overflow用户

发布于 2021-01-13 01:57:31

Cloud Composer中的Airflow-Webserver在租户项目中运行,worker和scheduler在客户项目中运行。租户项目只不过是它的google侧管理环境,用于某些部分的气流组件。因此,the服务器UI不能完全访问您的项目资源。因为它不能在你的项目环境下运行。这样我就可以用return_datafusion_config_file读取我的配置json文件。最好的方法是使用该文件创建一个ENV变量。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65670362

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档