首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >flatMap如何管理线程?

flatMap如何管理线程?
EN

Stack Overflow用户
提问于 2018-02-17 11:09:22
回答 1查看 1K关注 0票数 2
代码语言:javascript
复制
Flux.just("a", "b", "c")
        .log(null, Level.INFO,true) // line 18
        .flatMap(value -> Mono.just(value.toUpperCase())
                              .publishOn(Schedulers.elastic()), 2)
        .log(null, Level.INFO,true) // line 21
        .subscribe();

其中一些产出:

代码语言:javascript
复制
13:03:46 [main] INFO  - | request(2)    Flux.log(App.java:18)
13:03:46 [main] INFO  - | onNext(a)     Flux.log(App.java:18)
13:03:46 [main] INFO  - | onNext(b)     Flux.log(App.java:18)
13:03:46 [elastic-2] INFO  - onNext(A)  Flux.log(App.java:21)
13:03:46 [elastic-2] INFO  - | request(1)   Flux.log(App.java:18)
13:03:46 [main] INFO  - | onNext(c)     Flux.log(App.java:18)
13:03:46 [elastic-3] INFO  - onNext(B)  Flux.log(App.java:21)
13:03:46 [elastic-3] INFO  - | request(1)   Flux.log(App.java:18)
13:03:46 [elastic-2] INFO  - onNext(C)  Flux.log(App.java:21)
13:03:46 [elastic-2] INFO  - | request(1)   Flux.log(App.java:18)
13:03:46 [main] INFO  - | onComplete()  Flux.log(App.java:18)
13:03:46 [main] INFO  - onComplete()    Flux.log(App.java:21)

问题:

  1. 为什么flatMapmain线程中请求2个元素,然后从其他线程中请求更多的元素?
  2. 为什么subscribe不被main线程处理?
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-02-17 12:03:29

为什么2请求从主线程?

第一个Subscription.request数量取决于指定的并发级别,即2。由于您在主线程中调用了.subscribe,因此将在该线程上调用第一个prefetch请求。

让我们来看看下一个模式:

.subscribe()[Thread main] -> FluxLog.source.subscribe()[Tread Main] -> FluxFlatMap.source.subscribe()[ThreadMain] -> FluxJust.subscriber.onSubscribe() -> FluxFlatMap.subscription.request(concurrency)[Thread Main]

接下来会发生什么?

那么,从这一点开始,将是硬核:)。因为您的内流将被FlatMapInner订阅,它将在Scheduler.elastic上观察所有信号(onNext、onError、onComplete) (因为您的.publishOn)。反过来,当内部流完成后,其FlatMapInnner上的onComplete将通知主FlatMapMain,后者是整个flatMap机制的驱动程序。FlatMapInnerFlatMapMain之间的交互是通过FlatMapMain.innerComplete实现的。因为从FlatMapMain的角度来看,内部FlatMapInner扮演着Queue的角色,所以所有元素都是drained。保持冷静,如果你不知道这里到底发生了什么,不要惊慌。该方法的基本思想是将数据从内部流中排出,并将其发送到下游,然后向上游请求新的数据部分。您应该记住的是,innerComplete是从FlatMapInner.onComplete调用的,后者被移动到另一个Scheduler,因此这意味着下一个Subscription.request将从Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic())中指定的线程调用。

因此,从原理图上看,该流程如下所示:

FluxFlatMap.FlatMapMain.onNext [Thread Main] -> Publisher m = mapper(...) -> m.subscribe(new FluxFlatMap.FlatMapInner()) -> FluxFlatMap.FlatMapInner.onNext("a") [Thread Elastic N] -> LambdaSubscriber.onNext("c") [Thread Elastic N] -> FluxFlatMap.FlatMapInner.onComplete() [Thread Elastic N] -> FluxFlatMap.FlatMapMain.drainLoop() [Thread Elastic N] -> FluxFlatMap.FlatMapMain.drainLoop() [Thread Elastic N] { ... subscription.request(amountOfCompletedInners) -> FlatMap.FlatMapMain.onNext() [Thread Elastic N] -> .LambdaSubscriber.onNext("c") [Thread Elastic N] ->。

因此,您将在main上看到第一个请求(2),然后从弹性中看到请求(1)(因为一个内部已经完成,所以FlatMap将从上游请求另一个元素以满足并发性的需求)。

票数 3
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48840650

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档