首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何指定在Composer上通过数据融合操作符触发管道时使用哪个GCP项目

如何指定在Composer上通过数据融合操作符触发管道时使用哪个GCP项目
EN

Stack Overflow用户
提问于 2021-12-09 07:38:35
回答 1查看 249关注 0票数 1

我需要通过数据融合操作符( Data,CloudDataFusionStartPipelineOperator)触发位于名为myDataFusionProject的GCP项目上的数据融合管道,该数据融合操作符的Composer实例位于另一个名为myCloudComposerProject的项目上。

我使用正式文件源代码编写了与下面的代码片段大致类似的代码:

代码语言:javascript
复制
LOCATION = "someLocation"
PIPELINE_NAME = "myDataFusionPipeline"
INSTANCE_NAME = "myDataFusionInstance"
RUNTIME_ARGS = {"output.instance":"someOutputInstance", "input.dataset":"someInputDataset", "input.project":"someInputProject"}

start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    runtime_args=RUNTIME_ARGS,
    task_id="start_pipeline",
)

我的问题是,每次触发DAG时,Composer都会在myCloudComposerProject中查找myCloudComposerProject而不是myDataFusionProject,这会产生类似于此的错误:

代码语言:javascript
复制
googleapiclient.errors.HttpError: <HttpError 404 when requesting https://datafusion.googleapis.com/v1beta1/projects/myCloudComposerProject/locations/someLocation/instances/myDataFusionInstance?alt=json returned "Resource 'projects/myCloudComposerProject/locations/someLocation/instances/myDataFusionInstance' was not found". Details: "[{'@type': 'type.googleapis.com/google.rpc.ResourceInfo', 'resourceName': 'projects/myCloudComposerProject/locations/someLocation/instances/myDataFusionInstance'}]"

所以问题是:如何迫使我的操作符使用数据融合项目而不是Composer项目?我怀疑我可以通过添加一个新的运行时参数来实现这一点,但我不确定如何做到这一点。

最后一条信息:数据融合管道只需从BigQuery源提取数据并将所有信息发送到BigTable接收器。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-12-09 13:19:45

作为开发气流操作人员的建议,我们应该检查实现操作符的类,因为由于版本控制,文档可能缺少一些信息。

正如注释所指出的,如果您检查CloudDataFusionStartPipelineOperator,您会发现它使用了一个在project-id上获取实例基础的钩子。这个项目-id是可选的,所以您可以添加您自己的project-id

代码语言:javascript
复制
class CloudDataFusionStartPipelineOperator(BaseOperator):
 ...

    def __init__(
       ...
        project_id: Optional[str] = None,   ### NOT MENTION IN THE DOCUMENTATION 
        ...
    ) -> None:
        ...
        self.project_id = project_id 
        ...

    def execute(self, context: dict) -> str:
        ...
        instance = hook.get_instance(
            instance_name=self.instance_name,
            location=self.location,
            project_id=self.project_id, ### defaults your project-id
        )
        api_url = instance["apiEndpoint"]
        ... 

将参数添加到运算符调用应该可以解决问题。

代码语言:javascript
复制
start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    runtime_args=RUNTIME_ARGS,
    project_id=PROJECT_ID,
    task_id="start_pipeline",
)

最后,除了正式文件网站之外,您还可以在github上查看airflow的文件。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70286201

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档