首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >项目反应堆3中的publishOn与subscribeOn

项目反应堆3中的publishOn与subscribeOn
EN

Stack Overflow用户
提问于 2018-01-03 15:54:59
回答 3查看 14.4K关注 0票数 16

我在相同的flux上使用publishOn和subscribeOn,如下所示:

代码语言:javascript
复制
    System.out.println("*********Calling Concurrency************");
    List<Integer> elements = new ArrayList<>();
    Flux.just(1, 2, 3, 4)
      .map(i -> i * 2)
      .log()
      .publishOn(Schedulers.elastic())
      .subscribeOn(Schedulers.parallel())
      .subscribe(elements::add);
    System.out.println("-------------------------------------");

不过,当我同时使用这两种方法时,日志中不会打印任何内容。但是当我只使用publishOn时,我得到了以下信息日志:

代码语言:javascript
复制
*********Calling Concurrency************
[info] | onSubscribe([Synchronous Fuseable] FluxArray.ArraySubscription)
[info] | request(256)
[info] | onNext(1)
[info] | onNext(2)
[info] | onNext(3)
[info] | onNext(4)
[info] | onComplete()
-------------------------------------

是不是比subscribeOn更推荐使用publishOn?或者它比subscribeOn有更多的偏好?这两者之间有什么区别,什么时候使用哪一个?

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2018-01-03 16:17:24

这是我得到的一个小文档:

publishOn的应用方式与任何其他运营商相同,位于用户链的中间。它从下游获取信号,并在上游重放这些信号,同时从关联的Scheduler对工作线程执行回调。因此,它会影响后续运算符的执行位置(直到链接了另一个publishOn )。

当构建后向链时,subscribeOn将应用于订阅过程。因此,无论您将subscribeOn放在链中的什么位置,它始终会影响源发射的上下文。但是,这不会影响后续调用publishOn的行为。它们仍然切换它们后面的链部分的执行上下文。

publishOn强制下一个运算符(以及下一个运算符之后的后续运算符)在不同的线程上运行。类似地,subscribeOn强制前一个运算符(以及可能在前一个运算符之前的运算符)在不同的线程上运行。

票数 11
EN

Stack Overflow用户

发布于 2019-12-26 03:26:40

我花了一些时间来理解它,也许是因为publishOn通常在subscribeOn之前被解释,这里有一个希望更简单的外行解释。

subscribeOn意味着在指定的调度器工作线程(其他线程)上运行初始源发射(例如subscribe(), onSubscribe() and request() ),对于任何后续操作(例如onNext/onError/onComplete, map etc )也是如此,并且无论subscribeOn()的位置如何,都会发生这种行为

如果你在流畅的调用中没有做任何publishOn,那么就是这样,所有的东西都会在这样的线程上运行。

但是,假设您在中间调用了publishOn(),那么任何后续的操作符调用都将在提供给此类publishOn()的调度程序worker上运行。

下面是一个例子

代码语言:javascript
复制
Consumer<Integer> consumer = s -> System.out.println(s + " : " + Thread.currentThread().getName());

Flux.range(1, 5)
        .doOnNext(consumer)
        .map(i -> {
          System.out.println("Inside map the thread is " + Thread.currentThread().getName());
          return i * 10;
        })
        .publishOn(Schedulers.newElastic("First_PublishOn()_thread"))
        .doOnNext(consumer)
        .publishOn(Schedulers.newElastic("Second_PublishOn()_thread"))
        .doOnNext(consumer)
        .subscribeOn(Schedulers.newElastic("subscribeOn_thread"))
        .subscribe();

其结果将是

代码语言:javascript
复制
1 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
2 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
10 : First_PublishOn()_thread-6
3 : subscribeOn_thread-4
Inside map the thread is subscribeOn_thread-4
20 : First_PublishOn()_thread-6
4 : subscribeOn_thread-4
10 : Second_PublishOn()_thread-5
30 : First_PublishOn()_thread-6
20 : Second_PublishOn()_thread-5
Inside map the thread is subscribeOn_thread-4
30 : Second_PublishOn()_thread-5
5 : subscribeOn_thread-4
40 : First_PublishOn()_thread-6
Inside map the thread is subscribeOn_thread-4
40 : Second_PublishOn()_thread-5
50 : First_PublishOn()_thread-6
50 : Second_PublishOn()_thread-5

正如您可以看到的,第一个doOnNext()和下面的map()在名为subscribeOn_thread的线程上运行,这种情况会发生,直到调用任何publishOn(),然后任何后续调用都会在提供的对该publishOn()的调度程序上运行,并且在任何人调用另一个publishOn()之前,任何后续调用都会发生这种情况。

票数 18
EN

Stack Overflow用户

发布于 2020-10-18 01:18:42

以下是优秀的博客文章https://spring.io/blog/2019/12/13/flight-of-the-flux-3-hopping-threads-and-schedulers的摘录

publishOn

这是您想要跳转线程时所需的基本操作符。来自其源的传入信号在给定的Scheduler上发布,有效地将线程切换到该调度器的某个工作线程。

这对onNextonCompleteonError信号有效。也就是说,从上游源流向下游订户的信号。

因此,本质上,出现在此操作符下面的每个处理步骤都将在新的调度程序s上执行,直到另一个操作符再次切换(例如,另一个publishOn)。

代码语言:javascript
复制
Flux.fromIterable(firstListOfUrls) //contains A, B and C
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

Flux.fromIterable(secondListOfUrls) //contains D and E
    .publishOn(Schedulers.boundedElastic())
    .map(url -> blockingWebClient.get(url))
    .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

输出

代码语言:javascript
复制
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C

subscribeOn

此运算符更改执行subscribe方法的位置。由于subscribe信号向上流动,它直接影响源Flux订阅和开始生成数据的位置。

因此,它似乎作用于向上和向下的操作符反应链的一部分(只要在混合中没有抛出publishOn ):

代码语言:javascript
复制
final Flux<String> fetchUrls(List<String> urls) {
  return Flux.fromIterable(urls)
           .map(url -> blockingWebClient.get(url));
}

// sample code:
fetchUrls(A, B, C)
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.println(Thread.currentThread().getName + " from first list, got " + body));

fetchUrls(D, E)
  .subscribeOn(Schedulers.boundedElastic())
  .subscribe(body -> System.out.prinln(Thread.currentThread().getName + " from second list, got " + body));

输出

代码语言:javascript
复制
boundedElastic-1 from first list, got A
boundedElastic-2 from second list, got D
boundedElastic-1 from first list, got B
boundedElastic-2 from second list, got E
boundedElastic-1 from first list, got C
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/48073315

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档