首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在solid输出上的Dagster循环

在solid输出上的Dagster循环
EN

Stack Overflow用户
提问于 2021-03-12 21:50:14
回答 1查看 614关注 0票数 2

我有一个由两个实体组成的Dagster管道(下面是可重现的例子)。第一个(return_some_list)输出一些对象的列表。第二个实体(print_num)接受第一个列表(而不是整个列表)中的元素,并对该元素进行一些处理。

对于第一个实体返回的列表中的每个元素,我应该如何调用第二个实体?请解释任何最好的实践。

我不确定这是否是最好的方法(让我知道),但是我想为第一个实体输出的每个元素生成一个不同的print_num实体实例。这将帮助我在将来并行化solid,并更好地处理长/计算密集型solid。

代码语言:javascript
复制
from dagster import execute_pipeline, pipeline, solid

@solid
def return_some_list(context):
    return [1,2,3,4,5]

@solid
def print_num(context, some_num: int):
    print(some_num)
    return some_num


@pipeline
def some_pipeline():
    output_list = return_some_list()
    for some_num in output_list:
        print_num(some_num)

if __name__ == "__main__":
    result = execute_pipeline(some_pipeline)
EN

回答 1

Stack Overflow用户

发布于 2021-03-14 03:38:40

事实证明,有一个实验性的特性(希望将成为正式的),它允许基于可迭代输出的元素创建任务。工作代码如下:

代码语言:javascript
复制
from dagster import execute_pipeline, pipeline, solid, Output, OutputDefinition
from dagster.experimental import DynamicOutput, DynamicOutputDefinition
from typing import List


@solid
def return_some_list(context):
    return [1, 2, 3, 4, 5]


@solid(output_defs=[DynamicOutputDefinition(int)])
def generate_subtasks(context, nums: List[int]):
    context.log.info(str(nums))
    for num in nums:
        yield DynamicOutput(num, mapping_key=f'subtask_{num}')


@solid
def print_num(context, some_num: int):
    context.log.info(str(some_num))
    return some_num


@pipeline
def some_pipeline():
    output_list = return_some_list()
    generate_subtasks(output_list).map(print_num)


if __name__ == "__main__":
    result = execute_pipeline(some_pipeline)

在这里,return_some_list返回一个可迭代的。我们想要为这个迭代器的每个元素运行一个实体。我们在实体generate_subtasks中执行此操作,它将生成一个包含元素和将为其生成子任务的名称的DynamicOutputDynamicOutput的类型信息在solid规范中的DynamicOutputDefinition中给出。

为了连接这些实体,我们首先通过return_some_list获取列表。然后调用generate_subtasks,这是一个生成器,并将print_num函数map到它的每个输出。

运行整个流水线应该会为generate_subtasks生成的每个子任务打印大量信息,如下所示(只显示了输出的一部分):

代码语言:javascript
复制
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - HANDLED_OUTPUT - Handled output "result" using output manager "io_manager"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_4] - STEP_SUCCESS - Finished execution of step "print_num[subtask_4]" in 2.1ms.
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_START - Started execution of step "print_num[subtask_5]".
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - LOADED_INPUT - Loaded input "some_num" using input manager "io_manager", from output "result" of step "test"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_INPUT - Got input "some_num" of type "Int". (Type check passed).
2021-03-13 21:27:53 - dagster - INFO - system - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - print_num[subtask_5] - 5
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_OUTPUT - Yielded output "result" of type "Any". (Type check passed).
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - HANDLED_OUTPUT - Handled output "result" using output manager "io_manager"
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - print_num[subtask_5] - STEP_SUCCESS - Finished execution of step "print_num[subtask_5]" in 1.98ms.
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - ENGINE_EVENT - Finished steps in process (pid: 33738) in 44ms
2021-03-13 21:27:53 - dagster - DEBUG - some_pipeline - fa5d6a93-0c05-4663-aa5b-25c95852a9a4 - 33738 - PIPELINE_SUCCESS - Finished execution of pipeline "some_pipeline".

哦,还有一件很酷的事情: Dagster执行类型检查,如果你给它一个输入错误的参数,它很快就会失败。所以,如果我们给map函数提供print_str,它甚至会拒绝运行。

票数 5
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66601150

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档