Flux.just("a", "b", "c")
.log(null, Level.INFO,true) // line 18
.flatMap(value -> Mono.just(value.toUpperCase())
.publishOn(Schedulers.elastic()), 2)
.log(null, Level.INFO,true) // line 21
.subscribe();其中一些产出:
13:03:46 [main] INFO - | request(2) Flux.log(App.java:18)
13:03:46 [main] INFO - | onNext(a) Flux.log(App.java:18)
13:03:46 [main] INFO - | onNext(b) Flux.log(App.java:18)
13:03:46 [elastic-2] INFO - onNext(A) Flux.log(App.java:21)
13:03:46 [elastic-2] INFO - | request(1) Flux.log(App.java:18)
13:03:46 [main] INFO - | onNext(c) Flux.log(App.java:18)
13:03:46 [elastic-3] INFO - onNext(B) Flux.log(App.java:21)
13:03:46 [elastic-3] INFO - | request(1) Flux.log(App.java:18)
13:03:46 [elastic-2] INFO - onNext(C) Flux.log(App.java:21)
13:03:46 [elastic-2] INFO - | request(1) Flux.log(App.java:18)
13:03:46 [main] INFO - | onComplete() Flux.log(App.java:18)
13:03:46 [main] INFO - onComplete() Flux.log(App.java:21)问题:
flatMap从main线程中请求2个元素,然后从其他线程中请求更多的元素?subscribe不被main线程处理?发布于 2018-02-17 12:03:29
为什么2请求从主线程?
第一个Subscription.request数量取决于指定的并发级别,即2。由于您在主线程中调用了.subscribe,因此将在该线程上调用第一个prefetch请求。
让我们来看看下一个模式:
.subscribe()[Thread main] -> FluxLog.source.subscribe()[Tread Main] -> FluxFlatMap.source.subscribe()[ThreadMain] -> FluxJust.subscriber.onSubscribe() -> FluxFlatMap.subscription.request(concurrency)[Thread Main]
接下来会发生什么?
那么,从这一点开始,将是硬核:)。因为您的内流将被FlatMapInner订阅,它将在Scheduler.elastic上观察所有信号(onNext、onError、onComplete) (因为您的.publishOn)。反过来,当内部流完成后,其FlatMapInnner上的onComplete将通知主FlatMapMain,后者是整个flatMap机制的驱动程序。FlatMapInner和FlatMapMain之间的交互是通过FlatMapMain.innerComplete实现的。因为从FlatMapMain的角度来看,内部FlatMapInner扮演着Queue的角色,所以所有元素都是drained。保持冷静,如果你不知道这里到底发生了什么,不要惊慌。该方法的基本思想是将数据从内部流中排出,并将其发送到下游,然后向上游请求新的数据部分。您应该记住的是,innerComplete是从FlatMapInner.onComplete调用的,后者被移动到另一个Scheduler,因此这意味着下一个Subscription.request将从Mono.just(value.toUpperCase()).publishOn(Schedulers.elastic())中指定的线程调用。
因此,从原理图上看,该流程如下所示:
FluxFlatMap.FlatMapMain.onNext [Thread Main] -> Publisher m = mapper(...) -> m.subscribe(new FluxFlatMap.FlatMapInner()) -> FluxFlatMap.FlatMapInner.onNext("a") [Thread Elastic N] -> LambdaSubscriber.onNext("c") [Thread Elastic N] -> FluxFlatMap.FlatMapInner.onComplete() [Thread Elastic N] -> FluxFlatMap.FlatMapMain.drainLoop() [Thread Elastic N] -> FluxFlatMap.FlatMapMain.drainLoop() [Thread Elastic N] { ... subscription.request(amountOfCompletedInners) -> FlatMap.FlatMapMain.onNext() [Thread Elastic N] -> .LambdaSubscriber.onNext("c") [Thread Elastic N] ->。
因此,您将在main上看到第一个请求(2),然后从弹性中看到请求(1)(因为一个内部已经完成,所以FlatMap将从上游请求另一个元素以满足并发性的需求)。
https://stackoverflow.com/questions/48840650
复制相似问题