首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在反应堆3中的多个发布者之间排队工作?

如何在反应堆3中的多个发布者之间排队工作?
EN

Stack Overflow用户
提问于 2018-10-16 15:07:23
回答 1查看 260关注 0票数 0

我正在创建一个使用反应堆3创建数据处理工作流的库,每个任务都有一个输入流量和一个输出流量。输入流量由用户提供。输出流量由库创建。可以将任务链接起来形成DAG。就像这样:(在科特林)

代码语言:javascript
复制
val base64 = task<String, String>("base64") {
    input { Flux.just("a", "b", "c", "d", "e") }
    outputFn { ... get the output values ... }
    scriptFn { ... do some stuff ... }
}

val step2 = task<List<String>, String>("step2") {
    input { base64.output.buffer(3) }
    outputFn { ... }
    scriptFn { ... }
}

我需要限制整个工作流的并发性。一次只能处理配置好的输入数。在上面的示例(限制为3)中,这意味着任务base64将首先运行输入"a“、"b”和"c“,然后等待每个任务在处理"d”、"e“和"step2”任务之前完成。

在从输入流创建输出流时,如何应用这些限制?TopicProcessor能被应用吗?也许是某种定制的调度程序或处理器?背压是怎么回事?我需要担心创建缓冲区吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-10-23 09:13:03

背压从最后的支撑向上传播,贯穿整个链。但是链中的操作符可以预先要求数据(预取),甚至可以“重写”请求。例如,在buffer(3)的情况下,如果该操作符接收到一个request(1),它将在上游执行一个request(3) (“1个缓冲区== max 3元素,以便我可以请求足够的资源来填充请求的1个缓冲区”)。

如果输入总是由用户提供,这将很难抽象化.

没有一种简单的方法可以对一个给定管道(一个Flux)的多个管道甚至多个订阅进行限制源的评级。

在多个Scheduler中使用共享的publishOn将无法工作,因为publishOn选择了一个Worker线程并将其粘住。

但是,如果您的问题更具体地说是关于base64任务是有限的,也许可以从flatMap的并发参数中获得效果?

代码语言:javascript
复制
input.flatMap(someString -> asyncProcess(someString), 3, 1);

这最多允许运行3次asyncProcess,每次终止时都从input的下一个值开始一个新的事件。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52838601

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档