Observable<Object> obs1 = Observable
.create(subscriber -> subscriber.onNext("obs 1 event"))
.doOnSubscribe(() -> System.out.println("obs1 sub"))
.doOnUnsubscribe(() -> System.out.println("obs1 unsub"));
Observable<Object> obs2 = Observable
.create(subscriber -> subscriber.onNext("obs 2 event"))
.doOnSubscribe(() -> System.out.println("obs2 sub"))
.doOnUnsubscribe(() -> System.out.println("obs2 unsub"));
Observable
.amb(obs1, obs2)
.subscribe(System.out::println);
Thread.sleep(500);应该调用obs2 doOn*方法,并且应该只发送两个事件中的一个。程序输出:
obs1 sub
obs 1 event不调用obs2的*订阅方法。
发布于 2018-07-12 07:43:26
默认情况下,RxJava源和操作符是同步的,除非它们使用调度程序引入异步。上面没有任何代码涉及到任何调度程序,所以执行将是同步的。amb没有尝试订阅第二个,因为第一个已经赢得了比赛。
当我用
just替换create时,它的工作原理就像我想要的那样
just产生不同结果的原因是背压,而您没有在不推荐的create使用中实现它。amb首先订阅源,然后从源请求,从而获得订阅的副作用。随着您的实现中断,第一个源立即推送其项,从而使amb进入其win状态,从而防止了第二个订阅的发生。
https://stackoverflow.com/questions/51280361
复制相似问题