下面的代码使用Project Reactor跨有限数量的工作线程分发阻塞I/O操作:
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;
...
List<Item> processItems(List<Item> items) {
final int parallelDegree = 10;
final Scheduler scheduler = Schedulers.newParallel("myScheduler", parallelDegree, true);
return Flux.fromIterable(items)
.parallel(parallelDegree)
.runOn(scheduler)
.map(this::doSomeBlockingIo)
.sequential()
.publishOn(Schedulers.immediate())
.collectList()
.block();
...
Item doSomeBlockingIo(Item item) {
// perform some non-deterministic, blocking I/O with side-effects
...
return someNewItem;
}代码看起来按原样运行良好。但是它是健壮的和惯用的吗?
注意:我已经检查过了,在项目反应堆文档(包括JavaDocs)中没有明确禁止此用例的内容。
找个朋友。
发布于 2020-01-29 19:17:20
它将工作得很好,它很健壮,但您使用并行调度器来阻塞IO工作的事实并不是最优的(并且不是特别惯用的;当有reactor经验的人看到并行调度器时,他们希望看到它运行非阻塞IO。)
这里更好的方法是将你的并行调度器换成一个你选择的上限(在你的例子中是10)的bounded elastic scheduler --这将会旋转并在必要时重用支持工作者,直到你的上限。
https://stackoverflow.com/questions/59960254
复制相似问题