我正在尝试创建一个需要在多个任务之间传递数据的弗莱特工作流。我在文档中查看了其中一个例子,但是试图尽可能少地重新创建blob传递,我仍然无法让它工作。
下面是我的工作流定义(当然,我的实际用例生成了更多的数据):
from flytekit.sdk.tasks import python_task, outputs, inputs
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Output, Input
@inputs(the_text=Types.String)
@outputs(the_blob=Types.Blob)
@python_task
def create_blob(wf_params, the_text, the_blob):
fname = "a-file.txt"
with open(fname, "w") as f:
f.write(the_text)
the_blob.set(fname)
@inputs(the_blob=Types.Blob)
@outputs(the_text=Types.String)
@python_task
def read_blob(wf_params, the_blob, the_text):
the_blob.download()
with open(the_blob.local_path) as f:
the_text.set(f.read())
@workflow_class
class PassBlob:
input_text = Input(Types.String, required=True, help="The text to write to the file")
create = create_blob(the_text=input_text)
read = read_blob(the_blob=create.outputs.the_blob)
output_text = Output(read.outputs.the_text, sdk_type=Types.String, help="The text read from the file")此工作流成功部署,当我运行它时,会发生以下情况:
create任务成功运行,并说明了它的输出:
类型: /4u/fe0c7c6326294497dac9-create-0/9c684e85918080341a14478b5f013ee6 : the_blob:read任务在行the_blob.download()上失败,记录文件夹:
欢迎来到弗莱特!版本: 0.13.3 INFO:root:输入计时上下文:(/4u/fe0c7c6326294497dac9-create-0/9c684e85918080341a14478b5f013ee6 -> /tmp/task_dir__m4xve78/512f322cd94d40898b9478db9296b12c) INFO:root:Exiting context:(/4u/fe0c7c6326294497dac9-create-0/9c684e85918080341a14478b5f013ee6 -> /tmp/task_dir__m4xve78/512f322cd94d40898b9478db9296b12c) Wall Time: 6.86999992467463e-05s,进程时间:6.75669999999999061E-05s错误:root:!!开始错误捕获由弗莱特!错误:root:Traceback(最近一次调用):文件"/code/venv/lib/python3.6/site-packages/flytekit/common/exceptions/scopes.py",第155行,在system_entry_point返回包装(*args,**kwargs)文件"/code/venv/lib/python3.6/site-packages/flytekit/common/types/impl/blobs.py",第202行,在下载_data_proxy.Data.get_data(self.remote_location,self.local_path中,文件"/code/venv/lib/python3.6/site-packages/flytekit/interfaces/data/data_proxy.py",第136行,在get_data error_string=_six.text_type(ex)中,消息:无法从/4u/fe0c7c6326294497dac9-create-0/9c684e85918080341a14478b5f013ee6到/tmp/task_dir__m4xve78/512f322cd94d40898b9478db9296b12c (recursive=False)获取数据。原始异常: Errno 2没有这样的文件或目录:'/4u/fe0c7c6326294497dac9-create-0/9c684e85918080341a14478b5f013ee6‘用户错误。错误:根:!由Flyte捕获的结束错误!信息:根:输入定时上下文:写入(/tmp/engine_dir_5x9bt65w s3://my-s3-bucket/metadata/propeller/sprixie-on-flyte-development-fe0c7c6326294497dac9/read/data/0) -> INFO:root:命令''aws‘’--终结点-url‘,'http://minio.flyte:9000','s3','cp',’-递归‘,’-acl‘,’桶-所有者-完全控制‘,/tmp/engine_dir_5x9bt65w’,'s3://my-s3-bucket/metadata/propeller/sprixie-on-flyte-development-fe0c7c6326294497dac9/read/data/0'':b‘’Completed 907 Bytes/907 Bytes (84.2Kib/s),剩下一个文件\rupload:./tmp/engine_dir_5x9bt65w/error.pb到s3://my-s3-bucket/metadata/propeller/sprixie-on-flyte-development-fe0c7c6326294497dac9/read/data/0/error.pb\n‘信息:s3://my-s3-bucket/metadata/propeller/sprixie-on-flyte-development-fe0c7c6326294497dac9/read/data/0/error.pb\n’信息:退出时间上下文:写入(/tmp/engine_dir_5x9bt65w s3://my-s3-bucket/metadata/propeller/sprixie-on-flyte-development-fe0c7c6326294497dac9/read/data/0) ->时:1.5020931999897584,处理时间:0.0021405349999999999在任务之间传递Types.Blob的正确方法是什么?我该怎么做呢?
发布于 2020-10-22 05:18:30
我假设您是在本地沙箱环境中运行这个程序(您使用的是minio,它是我们在沙箱环境中部署的测试blob存储)。请您共享用于注册工作流的flytekit.config文件。
因此,Flyte根据配置方式自动将中间数据存储在桶(S3 / GCS)中。
前缀设置用于自动将数据上载到配置的桶和前缀https://github.com/lyft/flytesnacks/blob/b980963e48eac4ab7e4a9a3e58b353ad523cee47/cookbook/sandbox.config#L7。
在v0.7.0之前的版本-使用配置中的碎片格式化程序设置- https://github.com/lyft/flytesnacks/blob/b980963e48eac4ab7e4a9a3e58b353ad523cee47/cookbook/sandbox.config#L14-L17
也请告诉我们你运行的是什么版本的Flyte。请加入松懈的渠道,我可以帮你开始。为所有的麻烦感到抱歉
https://stackoverflow.com/questions/64467317
复制相似问题