我在相同的flux上使用publishOn和subscribeOn,如下所示:
System.out.println("*********Calling Concurrency************");
List<Integer> elements = new ArrayList<>();
Flux.just(1, 2, 3, 4)
.map(i -> i * 2)
.log()
.publishOn(Schedulers.elastic())
.subscribeOn(Schedulers.parallel())
.subscribe(elements::add);
System.out.println("-------------------------------------");不过,当我同时使用这两种方法时,日志中不会打印任何内容。但是当我只使用publishOn时,我得到了以下信息日志:
*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------是不是比subscribeOn更推荐使用publishOn?或者它比subscribeOn有更多的偏好?这两者之间有什么区别,什么时候使用哪一个?
发布于 2018-01-03 16:17:24
这是我得到的一个小文档:
publishOn的应用方式与任何其他运营商相同,位于用户链的中间。它从下游获取信号,并在上游重放这些信号,同时从关联的Scheduler对工作线程执行回调。因此,它会影响后续运算符的执行位置(直到链接了另一个publishOn )。
当构建后向链时,subscribeOn将应用于订阅过程。因此,无论您将subscribeOn放在链中的什么位置,它始终会影响源发射的上下文。但是,这不会影响后续调用publishOn的行为。它们仍然切换它们后面的链部分的执行上下文。
和
publishOn强制下一个运算符(以及下一个运算符之后的后续运算符)在不同的线程上运行。类似地,subscribeOn强制前一个运算符(以及可能在前一个运算符之前的运算符)在不同的线程上运行。
发布于 2019-12-26 03:26:40
我花了一些时间来理解它,也许是因为publishOn通常在subscribeOn之前被解释,这里有一个希望更简单的外行解释。
subscribeOn意味着在指定的调度器工作线程(其他线程)上运行初始源发射(例如subscribe(), onSubscribe() and request() ),对于任何后续操作(例如onNext/onError/onComplete, map etc )也是如此,并且无论subscribeOn()的位置如何,都会发生这种行为
如果你在流畅的调用中没有做任何publishOn,那么就是这样,所有的东西都会在这样的线程上运行。
但是,假设您在中间调用了publishOn(),那么任何后续的操作符调用都将在提供给此类publishOn()的调度程序worker上运行。
下面是一个例子
Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());
Flux.range(1, 5)
.doOnNext(consumer)
.map(i -> {
System.out.println("Inside map the thread is " + Thread.currentThread().getName());
return i * 10;
})
.publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
.doOnNext(consumer)
.publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
.doOnNext(consumer)
.subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
.subscribe();其结果将是
1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5正如您可以看到的,第一个doOnNext()和下面的map()在名为subscribeOn_thread的线程上运行,这种情况会发生,直到调用任何publishOn(),然后任何后续调用都会在提供的对该publishOn()的调度程序上运行,并且在任何人调用另一个publishOn()之前,任何后续调用都会发生这种情况。
发布于 2020-10-18 01:18:42
以下是优秀的博客文章https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers的摘录
publishOn
这是您想要跳转线程时所需的基本操作符。来自其源的传入信号在给定的Scheduler上发布,有效地将线程切换到该调度器的某个工作线程。
这对onNext、onComplete和onError信号有效。也就是说,从上游源流向下游订户的信号。
因此,本质上,出现在此操作符下面的每个处理步骤都将在新的调度程序s上执行,直到另一个操作符再次切换(例如,另一个publishOn)。
Flux.fromIterable(firstListOfUrls) //contains A, B and C
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
Flux.fromIterable(secondListOfUrls) //contains D and E
.publishOn(Schedulers.boundedElastic())
.map(url -> blockingWebClient.get(url))
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));输出
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got CsubscribeOn
此运算符更改执行subscribe方法的位置。由于subscribe信号向上流动,它直接影响源Flux订阅和开始生成数据的位置。
因此,它似乎作用于向上和向下的操作符反应链的一部分(只要在混合中没有抛出publishOn ):
final Flux<String> fetchUrls(List<String> urls) {
return Flux.fromIterable(urls)
.map(url -> blockingWebClient.get(url));
}
// sample code:
fetchUrls(A, B, C)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));
fetchUrls(D, E)
.subscribeOn(Schedulers.boundedElastic())
.subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));输出
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got Chttps://stackoverflow.com/questions/48073315
复制相似问题