我正在尝试设置一个简单的KubeFlow管道,但在以一种适用于KubeFlow的方式打包依赖项时遇到了麻烦。
代码只是下载一个配置文件并对其进行解析,然后传回解析后的配置。
但是,为了解析配置文件,它需要能够访问另一个内部python包。
我在同一项目中的存储桶上有一个包的.tar.gz存档,并添加了包的URL作为依赖项,但我收到了一条错误消息tarfile.ReadError: not a gzip file。
我知道这个文件很好,所以这是在存储桶上托管或kubeflow安装依赖项的方式的一些中间问题。
下面是一个最小的例子:
from kfp import compiler
from kfp import dsl
from kfp.components import func_to_container_op
from google.protobuf import text_format
from google.cloud import storage
import training_reader
def get_training_config(working_bucket: str,
working_directoy: str,
config_file: str) -> training_reader.TrainEvalPipelineConfig:
download_file(working_bucket, os.path.join(working_directoy, config_file), "ssd.config")
pipeline_config = training_reader.TrainEvalPipelineConfig()
with open("ssd.config", 'r') as f:
text_format.Merge(f.read(), pipeline_config)
return pipeline_config
config_op_packages = ["https://storage.cloud.google.com/my_bucket/packages/training-reader-0.1.tar.gz",
"google-cloud-storage",
"protobuf"
]
training_config_op = func_to_container_op(get_training_config,
base_image="tensorflow/tensorflow:1.15.2-py3",
packages_to_install=config_op_packages)
def output_config(config: training_reader.TrainEvalPipelineConfig) -> None:
print(config)
output_config_op = func_to_container_op(output_config)
@dsl.pipeline(
name='Post Training Processing',
description='Building the post-processing pipeline'
)
def ssd_postprocessing_pipeline(
working_bucket: str,
working_directory: str,
config_file:str):
config = training_config_op(working_bucket, working_directory, config_file)
output_config_op(config.output)
pipeline_name = ssd_postprocessing_pipeline.__name__ + '.zip'
compiler.Compiler().compile(ssd_postprocessing_pipeline, pipeline_name)发布于 2020-06-20 07:55:03
https://storage.cloud.google.com/my_bucket/packages/training-reader-0.1.tar.gz IRL需要身份验证。尝试在隐身模式下下载它,你会看到登录页面而不是文件。将URL更改为https://storage.googleapis.com/my_bucket/packages/training-reader-0.1.tar.gz对公共对象有效,但您的对象不是公共对象。
您唯一可以做的事情(如果您不能公开包)是使用google.cloud.storage库或gsutil程序从存储桶中下载文件,然后使用subprocess.run([sys.executable, '-m', 'pip', 'install', ...])手动安装它
你从哪里下载数据?
其目的是什么?
pipeline_config = training_reader.TrainEvalPipelineConfig()
with open("ssd.config", 'r') as f:
text_format.Merge(f.read(), pipeline_config)
return pipeline_config为什么不简单地执行以下操作:
def get_training_config(
working_bucket: str,
working_directory: str,
config_file: str,
output_config_path: OutputFile('TrainEvalPipelineConfig'),
):
download_file(working_bucket, os.path.join(working_directoy, config_file), output_config_path)以kubeflow安装依赖项的方式进行
。
将您的组件导出到可加载的component.yaml,您将看到KFP轻量级组件如何安装依赖项:
training_config_op = func_to_container_op(
get_training_config,
base_image="tensorflow/tensorflow:1.15.2-py3",
packages_to_install=config_op_packages,
output_component_file='component.yaml',
)附言:一些小信息:
@dsl.pipeline(
除非要使用dsl-compile命令行程序,否则不需要
pipeline_name = ssd_postprocessing_pipeline.name + '.zip‘compiler.Compiler().compile(ssd_postprocessing_pipeline,pipeline_name)
你知道你可以直接用kfp.Client(host=...).create_run_from_pipeline_func(ssd_postprocessing_pipeline, arguments={})命令来运行管道吗?
https://stackoverflow.com/questions/62196569
复制相似问题