我想要创建一个可观察的,它只在新值与前一个值不同的情况下,从底层的热可观测(从之前的-1开始)发出值。此外,我希望立即向新订阅者发出最新的值。我想出了以下代码:
PublishSubject<Integer> hotObservable = PublishSubject.create();
Observable<Integer> observable = hotObservable
.startWith(-1)
.distinctUntilChanged()
.replay(1)
.autoConnect(0);但是,当我不自动连接,但手动订阅时,在第一个值(始终是-1,无论hotObservable在订阅observable之前发出什么)发送到java.lang.IllegalStateException: more produced than requested的新订阅服务器之后,这会失败:
Observable<Integer> observable = hotObservable
.startWith(-1)
.distinctUntilChanged()
.replay(1)
.autoConnect();
observable.subscribe().unsubscribe();后续订阅服务器正常工作,接收最后的值,然后更新。
我不能让replay(1).autoConnect(0)工作,我觉得我错过了什么-为什么订阅和取消订阅工作,而autoConnect(0)不呢?怎样才能创造出这样的可观察性呢?
除非我使用autoConnect(); observable.subscribe().unsubscribe(),否则这里的测试方法会失败
Observable<Integer> observable = hotObservable
.startWith(-1)
.distinctUntilChanged()
.replay(1)
.autoConnect(); // With (0) it fails
observable.subscribe().unsubscribe(); // Needed if we don't auto connnect
hotObservable.onNext(1);
hotObservable.onNext(2);
hotObservable.onNext(3); // I want this value to be received by new subscriber
TestSubscriber<Integer> subscriber = TestSubscriber.create();
observable.subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertValues(3);发布于 2016-04-11 09:53:27
上面的代码在More produced than requested 1.1.3上没有得到RxJava错误。
断言失败的原因是,在任何订阅者实际请求之前,replay不会从上游请求任何东西。如果TestSubscriber是第一个订阅的,它将触发startWith发出-1,然后切换到不保留任何值的PublishSubject,这样就不会收到任何其他信息。
我相信您要寻找的是BehaviorSubject,它保留了最后的价值,并以新订阅者的值开头:
BehaviorSubject<Integer> hotObservable = BehaviorSubject.create(-1);
Observable<Integer> observable = hotObservable.distinctUntilChanged();
hotObservable.onNext(1);
hotObservable.onNext(2);
hotObservable.onNext(3);
TestSubscriber<Integer> subscriber = TestSubscriber.create();
observable.subscribe(subscriber);
subscriber.assertNoErrors();
subscriber.assertValue(3);https://stackoverflow.com/questions/36544843
复制相似问题