I need to trigger a Data Fusion pipeline located in a GCP project named myDataFusionProject through the CloudDataFusionStartPipelineOperator; the Composer instance that runs this operator lives in another project named myCloudComposerProject.
Following the official documentation and the source code, I wrote something roughly like the snippet below:
LOCATION = "someLocation"
PIPELINE_NAME = "myDataFusionPipeline"
INSTANCE_NAME = "myDataFusionInstance"
RUNTIME_ARGS = {"output.instance": "someOutputInstance", "input.dataset": "someInputDataset", "input.project": "someInputProject"}
start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    runtime_args=RUNTIME_ARGS,
    task_id="start_pipeline",
)
My problem is that every time the DAG is triggered, Composer looks for the instance in myCloudComposerProject instead of myDataFusionProject, which produces an error similar to this:
googleapiclient.errors.HttpError: <HttpError 404 when requesting https://datafusion.googleapis.com/v1beta1/projects/myCloudComposerProject/locations/someLocation/instances/myDataFusionInstance?alt=json returned "Resource 'projects/myCloudComposerProject/locations/someLocation/instances/myDataFusionInstance' was not found". Details: "[{'@type': 'type.googleapis.com/google.rpc.ResourceInfo', 'resourceName': 'projects/myCloudComposerProject/locations/someLocation/instances/myDataFusionInstance'}]"
So the question is: how can I force my operator to use the Data Fusion project instead of the Composer project? I suspect I could achieve this by adding a new runtime argument, but I'm not sure how to do that.
One last piece of information: the Data Fusion pipeline simply extracts data from a BigQuery source and sends everything to a Bigtable sink.
Posted on 2021-12-09 13:19:45
As general advice when developing Airflow operators, we should check the class that implements the operator, because the documentation may be missing some information due to versioning.
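For instance, a quick way to check an operator's real constructor from a Python shell is the standard inspect module. The class below is a hypothetical stand-in, not the real operator; it only illustrates how an undocumented parameter shows up in the signature:

```python
import inspect

# Hypothetical stand-in for an operator whose docs lag behind the
# source (the class name and parameters here are assumptions).
class SomeOperator:
    def __init__(self, location, instance_name, project_id=None):
        self.location = location
        self.instance_name = instance_name
        self.project_id = project_id

# Listing the constructor parameters reveals ones the docs may omit.
params = list(inspect.signature(SomeOperator.__init__).parameters)
print(params)
```

Running this prints every accepted parameter, including project_id, even if the published documentation never mentions it.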
As the comments point out, if you check CloudDataFusionStartPipelineOperator, you will find that it uses a hook that fetches the instance based on a project_id. This project_id is optional, so you can supply your own.
class CloudDataFusionStartPipelineOperator(BaseOperator):
    ...
    def __init__(
        ...
        project_id: Optional[str] = None,  ### NOT MENTIONED IN THE DOCUMENTATION
        ...
    ) -> None:
        ...
        self.project_id = project_id
        ...
    def execute(self, context: dict) -> str:
        ...
        instance = hook.get_instance(
            instance_name=self.instance_name,
            location=self.location,
            project_id=self.project_id,  ### defaults to the connection's project_id
        )
        api_url = instance["apiEndpoint"]
        ...
Adding the parameter to the operator call should solve the problem.
start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    runtime_args=RUNTIME_ARGS,
    project_id=PROJECT_ID,
    task_id="start_pipeline",
)
https://stackoverflow.com/questions/70286201
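For context, the reason the wrong project was used in the first place can be sketched as a simple fallback (a plain-Python illustration of the provider hook's behavior, not its actual code): when project_id is not passed, the hook falls back to the default project of the Airflow connection, which in a Composer environment is the Composer project itself.

```python
def resolve_project_id(explicit_project_id, connection_default):
    # An explicit project_id wins; otherwise fall back to the
    # connection's default project (in Composer, the Composer
    # project itself), which is what produced the 404 above.
    return explicit_project_id or connection_default

# Without project_id, the Composer project is targeted.
print(resolve_project_id(None, "myCloudComposerProject"))
# With project_id, the Data Fusion project is targeted.
print(resolve_project_id("myDataFusionProject", "myCloudComposerProject"))
```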