我试图创建一个流,其中一个通量释放出10个条目,每个条目并行,每个条目休眠1s。因为每一项都是在一个单独的线程上发布的,所以我希望整个过程使用1s。但是日志显示它只需要10s。
我试着把subscribeOn改成publishOn,映射到doOnNext。但它们似乎都不起作用。
我对反应堆是新手,我想弄清楚我哪里出了问题。任何帮助都将不胜感激。谢谢
public void whenIsANewThreadCreated() {
Flux.range(1,10)
.publishOn(Schedulers.elastic())
.map(count -> {
logger.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return count;
})
.blockLast();
}2020-03-30 16:17:29.799 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 1
2020-03-30 16:17:30.802 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 2
2020-03-30 16:17:31.804 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 3
2020-03-30 16:17:32.805 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 4
2020-03-30 16:17:33.806 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 5
2020-03-30 16:17:34.808 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 6
2020-03-30 16:17:35.809 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 7
2020-03-30 16:17:36.811 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 8
2020-03-30 16:17:37.812 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 9
2020-03-30 16:17:38.814 INFO 15744 --- [ elastic-2] com.example.demo.DemoApplicationTests : elastic-2 - Sleeping for 1s with count: 10发布于 2020-03-30 20:52:56
首先必须通过调用parallel方法来创建并行流,并且必须使用runOn来实现并行性。
Flux.range(1,10)
.parallel()
.runOn(Schedulers.elastic())
.map(count -> {
System.out.println(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
try {
Thread.sleep(1_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return count;
}).subscribe();discouraged
parallel Schedulers.boundedElastic(),因为使用Scheduler.elastic()在默认情况下将创建基于CPU核心的线程。如果你想要更多的线程使用parallel(10) -我想这就是你想看到的.发布于 2020-04-02 09:23:26
该规范要求对onNext事件进行串行调用。您的map有效地将输入onNext事件转换为阻塞1秒的onNext事件。根据规范,10个传入的onNext导致一系列的10个输出onNext,每个块用于1s => 10s阻塞。
如果您想在10条并行rails上分发阻塞工作负载,那么绝对100%必须使用parallel(10).runOn(Scheduler.elastic())。( Scheduler for runOn也可以是Schedulers.boundedElastic()或Schedulers.newParallel(10))。
发布于 2021-09-21 06:48:46
参考资料:https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking
您可以在后台启动这些进程并获得多线程。这不是平行的。您应该在执行CPU密集型任务时使用并行调度程序,在I/O或阻塞操作时使用弹性调度程序。
public void whenIsANewThreadCreated() {
Flux.range(1,10)
.subscribeOn(Schedulers.boundedElastic()) // if not, main calling thread will be used
.flatMap(count -> {
log.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
return Mono.fromCallable(() -> method5(count)).subscribeOn(Schedulers.boundedElastic());
})
.blockLast();
}
Mono<Integer> method5(int count) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return Mono.just(count);
}你会得到这样的东西
23:42:33.289 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 1
23:42:33.342 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 2
23:42:33.342 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 3
23:42:33.342 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 4
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 5
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 6
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 7
23:42:33.343 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 8
23:42:33.344 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 9
23:42:33.344 [boundedElastic-1] INFO com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 10https://stackoverflow.com/questions/60938922
复制相似问题