下面是测试代码
final Flowable<Integer> f1 = Flowable.fromPublisher(s -> {
s.onNext(Integer.valueOf(1));
s.onComplete();
});
final Flowable<Integer> f2 = Flowable.fromPublisher(s -> {
s.onNext(Integer.valueOf(2));
s.onComplete();
});
Flowable.zip(f1, f2, (i1, i2) -> "" + i1 + i2)
.blockingSubscribe(System.out::println);它会变得
Exception in thread "main" java.lang.NullPointerException
at io.reactivex.internal.operators.flowable.FlowableZip$ZipSubscriber.onNext(FlowableZip.java:386)我不明白为什么?
如果我像这样更新代码
final Flowable<Integer> f1 = Flowable.<Integer>fromPublisher(s -> {
s.onNext(Integer.valueOf(1));
s.onComplete();
}).onErrorResumeNext(Flowable.empty());
final Flowable<Integer> f2 = Flowable.<Integer>fromPublisher(s -> {
s.onNext(Integer.valueOf(2));
s.onComplete();
}).onErrorResumeNext(Flowable.empty());
Flowable.zip(f1, f2, (i1, i2) -> "" + i1 + i2)
.blockingSubscribe(System.out::println);它将像预期的那样打印12。但是为什么呢?这没有意义。
发布于 2016-12-10 10:41:36
问题是,您使用fromPublisher违反了Publisher<T>的约定。
发布者需要按照Reactive Streams合同中指定的非常具体的方式进行操作。这种行为包括在拨打任何其他电话之前先呼叫Subscriber.onSubscribe(),并尊重该用户面临的压力。
因为您没有调用onSubscribe,所以内部queue变量永远不会初始化,并且在其onNext方法中调用queue.offer会导致NPE。
大概是通过使用onErrorResumeNext,实现确保了所有东西都被正确调用,“修复”了无效状态。
要解决您的问题,有两种可能:
Flowable.fromPublisher。它旨在与Reactive Streams宣言的其他实现建立桥梁,并且没有任何保障措施。相反,使用正确处理初始化的Flowable.create和backpressure.Observable,因为你的用例似乎并不关心反压力。为安全起见,再次使用Observable.create方法。https://stackoverflow.com/questions/41071413
复制相似问题