我有一个像这样的Flyte任务函数:
@task
def do_stuff(framework_obj):
framework_obj.get_outputs() # This calls Types.Blob.fetch(some_uri)尝试使用flytekit.sdk.types.Types.Blob.fetch加载blob URI,但收到此错误:
ERROR:flytekit: Exception when executing No temporary file system is present. Either call this method from within the context of a task or surround with a 'with LocalTestFileSystem():' block. Or specify a path when calling this function. Note: Cleanup is not automatic when a path is specified.我可以确认我可以在测试中使用with LocalTestFileSystem()加载blob,但是当实际尝试运行工作流时,我不确定为什么会出现这个错误,因为调用blob处理的函数是用@task修饰的,所以它绝对是一个Flyte任务。我还确认了Flyte web控制台上存在该任务节点。
错误引用的路径是什么,以及如何适当地调用此函数?
使用Flyte版本0.16.2
发布于 2021-04-06 02:52:56
您能提供更多关于代码的信息吗?这是flytekit版本0.15.x吗?我有点困惑,因为这个版本不应该有@task装饰器。它应该只有@python_task,这是一个较旧的接口。如果您想使用新的python本机类型API,则应该安装flytekit==0.17.0。
另外,你能指出你正在看的文档吗?我们最近更新了相当多的文档,可能有一些混淆。These是值得一看的例子。还有两个新的Python类,FlyteFile和FlyteDirectory,它们已经取代了flytekit中的Blob类(尽管这仍然是IDL type的名称)。
(我会留下这篇文章作为评论,但我现在还没有这个名气。)
帮助获取输出和从文件输出中读取的一些代码
@task
def task_file_reader():
client = SynchronousFlyteClient("flyteadmin.flyte.svc.cluster.local:81", insecure=True)
exec_id = WorkflowExecutionIdentifier(
domain="development",
project="flytesnacks",
name="iaok0qy6k1",
)
data = client.get_execution_data(exec_id)
lit = data.full_outputs.literals["o0"]
ctx = FlyteContext.current_context()
ff = TypeEngine.to_python_value(ctx, lv=lit,
expected_python_type=FlyteFile)
with open(ff, 'rb') as fh:
print(fh.readlines())https://stackoverflow.com/questions/66957164
复制相似问题