你好,我在达格斯特有下面的@job
@job
def job_extract_faces():
faces = op_extract_data(op_get_data_path())
r = op_process((faces, 'a'))
r = op_process((faces, 'b'))
r = op_process((faces, 'c'))
r = op_process((faces, 'd'))问题是达格斯特说op_process的输入应该是op_extrac_data的输出。
是否有添加参数而不是build 4函数?
西汉
发布于 2022-06-16 16:23:50
我想你是在找动态图。使用这种模式,您可以将参数作为DynamicOutputs从上游op发出,并映射到跨op_process的输出。一种选择是做这样的事情:
from dagster import op, job, DynamicOut, DynamicOutput
@op(config_schema={"param_list": [str]},
out=DynamicOut(str))
def param_generator(context):
for i, p in enumerate(context.op_config["param_list"]):
yield DynamicOutput(p, mapping_key=str(i))
@job
def job_extract_faces():
faces = op_extract_data(op_get_data_path())
param_generator().map(lambda p: op_process(faces, p))这类似于文档中演示如何执行带有附加参数的动态映射的映射示例。
https://stackoverflow.com/questions/72638268
复制相似问题