首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Schdulers.elastic没有在反应器中创建新线程

Schdulers.elastic没有在反应器中创建新线程
EN

Stack Overflow用户
提问于 2020-03-30 20:23:51
回答 3查看 586关注 0票数 1

我试图创建一个流,其中一个通量释放出10个条目,每个条目并行,每个条目休眠1s。因为每一项都是在一个单独的线程上发布的,所以我希望整个过程使用1s。但是日志显示它只需要10s。

我试着把subscribeOn改成publishOn,映射到doOnNext。但它们似乎都不起作用。

我对反应堆是新手,我想弄清楚我哪里出了问题。任何帮助都将不胜感激。谢谢

代码语言:javascript
复制
    public void whenIsANewThreadCreated() {
        Flux.range(1,10)
                .publishOn(Schedulers.elastic())
                .map(count -> {
                    logger.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
                    try {
                        Thread.sleep(1_000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    return count;
                })
        .blockLast();
    }
代码语言:javascript
复制
2020-03-30 16:17:29.799  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 1
2020-03-30 16:17:30.802  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 2
2020-03-30 16:17:31.804  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 3
2020-03-30 16:17:32.805  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 4
2020-03-30 16:17:33.806  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 5
2020-03-30 16:17:34.808  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 6
2020-03-30 16:17:35.809  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 7
2020-03-30 16:17:36.811  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 8
2020-03-30 16:17:37.812  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 9
2020-03-30 16:17:38.814  INFO 15744 --- [      elastic-2] com.example.demo.DemoApplicationTests    : elastic-2 - Sleeping for 1s with count: 10
EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2020-03-30 20:52:56

首先必须通过调用parallel方法来创建并行流,并且必须使用runOn来实现并行性。

代码语言:javascript
复制
Flux.range(1,10)
    .parallel()
    .runOn(Schedulers.elastic())
    .map(count -> {
        System.out.println(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
        try {
            Thread.sleep(1_000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return count;
    }).subscribe();

discouraged

  • parallel
  • 使用Schedulers.boundedElastic(),因为使用Scheduler.elastic()在默认情况下将创建基于CPU核心的线程。如果你想要更多的线程使用parallel(10) -我想这就是你想看到的.
票数 2
EN

Stack Overflow用户

发布于 2020-04-02 09:23:26

该规范要求对onNext事件进行串行调用。您的map有效地将输入onNext事件转换为阻塞1秒的onNext事件。根据规范,10个传入的onNext导致一系列的10个输出onNext,每个块用于1s => 10s阻塞。

如果您想在10条并行rails上分发阻塞工作负载,那么绝对100%必须使用parallel(10).runOn(Scheduler.elastic())。( Scheduler for runOn也可以是Schedulers.boundedElastic()Schedulers.newParallel(10))。

票数 1
EN

Stack Overflow用户

发布于 2021-09-21 06:48:46

参考资料:https://projectreactor.io/docs/core/release/reference/#faq.wrap-blocking

您可以在后台启动这些进程并获得多线程。这不是平行的。您应该在执行CPU密集型任务时使用并行调度程序,在I/O或阻塞操作时使用弹性调度程序。

代码语言:javascript
复制
public void whenIsANewThreadCreated() {
    Flux.range(1,10)
            .subscribeOn(Schedulers.boundedElastic()) // if not, main calling thread will be used
            .flatMap(count -> {
                log.info(Thread.currentThread().getName() + " - Sleeping for 1s" + " with count: " + count);
                return Mono.fromCallable(() -> method5(count)).subscribeOn(Schedulers.boundedElastic());
            })
            .blockLast();
}


Mono<Integer> method5(int count) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return Mono.just(count);
}

你会得到这样的东西

代码语言:javascript
复制
 23:42:33.289 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 1
 23:42:33.342 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 2
 23:42:33.342 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 3
 23:42:33.342 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 4
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 5
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 6
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 7
 23:42:33.343 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 8
 23:42:33.344 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 9
 23:42:33.344 [boundedElastic-1] INFO  com.personal.ReactiveTest - boundedElastic-1 - Sleeping for 1s with count: 10
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60938922

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档