我正在创建一个使用反应堆3创建数据处理工作流的库,每个任务都有一个输入流量和一个输出流量。输入流量由用户提供。输出流量由库创建。可以将任务链接起来形成DAG。就像这样:(在科特林)
val base64 = task<String, String>("base64") {
input { Flux.just("a", "b", "c", "d", "e") }
outputFn { ... get the output values ... }
scriptFn { ... do some stuff ... }
}
val step2 = task<List<String>, String>("step2") {
input { base64.output.buffer(3) }
outputFn { ... }
scriptFn { ... }
}我需要限制整个工作流的并发性。一次只能处理配置好的输入数。在上面的示例(限制为3)中,这意味着任务base64将首先运行输入"a“、"b”和"c“,然后等待每个任务在处理"d”、"e“和"step2”任务之前完成。
在从输入流创建输出流时,如何应用这些限制?TopicProcessor能被应用吗?也许是某种定制的调度程序或处理器?背压是怎么回事?我需要担心创建缓冲区吗?
发布于 2018-10-23 09:13:03
背压从最后的支撑向上传播,贯穿整个链。但是链中的操作符可以预先要求数据(预取),甚至可以“重写”请求。例如,在buffer(3)的情况下,如果该操作符接收到一个request(1),它将在上游执行一个request(3) (“1个缓冲区== max 3元素,以便我可以请求足够的资源来填充请求的1个缓冲区”)。
如果输入总是由用户提供,这将很难抽象化.
没有一种简单的方法可以对一个给定管道(一个Flux)的多个管道甚至多个订阅进行限制源的评级。
在多个Scheduler中使用共享的publishOn将无法工作,因为publishOn选择了一个Worker线程并将其粘住。
但是,如果您的问题更具体地说是关于base64任务是有限的,也许可以从flatMap的并发参数中获得效果?
input.flatMap(someString -> asyncProcess(someString), 3, 1);这最多允许运行3次asyncProcess,每次终止时都从input的下一个值开始一个新的事件。
https://stackoverflow.com/questions/52838601
复制相似问题