首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Rxjava Scheduler.trampoline与连接映射

Rxjava Scheduler.trampoline与连接映射
EN

Stack Overflow用户
提问于 2019-12-25 21:47:25
回答 2查看 115关注 0票数 0

似乎基于文档,Scheduler.trampoline确保元素发出的是先入先出(即按顺序)。concat map的要点似乎也是为了确保所有内容都被适当地排列起来,然后发出。所以我想知道,应用subscribeOn./.observeOn(Scheduler.trampoline()),然后进行连接映射运算符,而不是常规的映射操作,是否有任何意义。

EN

回答 2

Stack Overflow用户

回答已采纳

发布于 2019-12-26 13:43:14

是的,这是有道理的。举个例子:

代码语言:javascript
复制
Observable.just(1, 2, 3, 4, 5)
    .subscribeOn(Schedulers.trampoline())
    .flatMap(
        a -> {
          if (a < 3) {
            return Observable.just(a).delay(3, TimeUnit.SECONDS);
          } else {
            return Observable.just(a);
          }
        })
    .doOnNext(
        a -> System.out.println("Element: " + a + ", on: " + Thread.currentThread().getName()))
    .subscribe();

下面是输出:

代码语言:javascript
复制
Element: 3, on: main
Element: 4, on: main
Element: 5, on: main
Element: 1, on: RxComputationScheduler-1
Element: 2, on: RxComputationScheduler-2

这里发生的情况是1和2依次到达flatMap运算符。但是现在,这些元素的内部流被延迟了3秒。请注意,flatMap急切地subscribes到内部流。也就是说,它不会在subscribing到下一个内部流(这就是concatMap所做的事情)之前等待一个流完成(使用onComplete)。

因此,1和2的内部流延迟了3秒。您可以说这是一个需要花费一些时间的外部I/O调用。同时,接下来的3个元素(3,4,5)进入flatMap,它们的流立即结束。这就是为什么您在输出中看到保持序列的原因。

然后3秒过去,元素1和2被发出。请注意,不能保证1会在2之前。

现在用concatMap替换flatMap,您将看到序列保持不变:

代码语言:javascript
复制
Element: 1, on: RxComputationScheduler-1
Element: 2, on: RxComputationScheduler-2
Element: 3, on: RxComputationScheduler-2
Element: 4, on: RxComputationScheduler-2
Element: 5, on: RxComputationScheduler-2

为什么?因为这就是concatMap的工作方式。元素1出现,并在I/O调用中使用。内部流对应的内部流需要3秒的时间才会发出onComplete。在第一个流发出onComplete之前,concatMap不订阅与其余元素对应的内部流。一旦完成,下一个流(Observable.just(2).delay(3, TimeUnit.SECONDS))就是subscribed to,依此类推。因此,您可以看到顺序是如何维护的。

关于这两个运算符,你需要记住的是:当元素到达时,flatMap急切地subscribes到内部流。另一方面,在subscribes到下一个流之前,concatMap会等待一个流完成。这就是为什么不能用concatMap进行并行调用的原因。

票数 1
EN

Stack Overflow用户

发布于 2019-12-28 19:57:15

不怎么有意思。trampoline本质上是在一个线程上执行工作,这些线程以先进先出的顺序调用它的Worker.schedule方法。

Observable.subscribeOn(Schedulers.trampoline())的情况下,它将是线程订阅,因此应用它没有实际效果。

Observable.observeOn(Schedulers.trampoline())的情况下,它将是线程信令项,因此也没有实际效果。

concatMap在发送上游项信号的线程或内部Observable完成的线程上执行映射器函数。操作员本质上已经有一个内置的弹床,所以上游的项目和下游的完成不会重叠。在3.x中,将有一个overload接受SchedulerSchedulers.trampoline()对此也没有实际效果。

Schedulers.trampoline()最好的用例是在不需要异步的单元测试中。因此,您可以参数化您的subscribeOn/observeOn使用,或者使用调度程序挂钩并替换标准的调度程序:

代码语言:javascript
复制
 RxJavaPlugins.setComputationSchedulerHandler(s -> Schedulers.trampoline());
 RxJavaPlugins.setIoSchedulerHandler(s -> Schedulers.trampoline());
 RxJavaPlugins.setNewThreadSchedulerHandler(s -> Schedulers.trampoline());

一旦你完成了,

代码语言:javascript
复制
 RxJavaPlugins.reset();
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59478996

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档