首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >为什么rx-java邮政编码失败

为什么rx-java邮政编码失败
EN

Stack Overflow用户
提问于 2016-12-10 09:59:47
回答 1查看 161关注 0票数 0

下面是测试代码

代码语言:javascript
复制
    final Flowable<Integer> f1 = Flowable.fromPublisher(s -> {
        s.onNext(Integer.valueOf(1));
        s.onComplete();
    });


    final Flowable<Integer> f2 = Flowable.fromPublisher(s -> {
        s.onNext(Integer.valueOf(2));
        s.onComplete();
    });

    Flowable.zip(f1, f2, (i1, i2) -> "" + i1 + i2)
            .blockingSubscribe(System.out::println);

它会变得

代码语言:javascript
复制
Exception in thread "main" java.lang.NullPointerException
    at io.reactivex.internal.operators.flowable.FlowableZip$ZipSubscriber.onNext(FlowableZip.java:386)

我不明白为什么?

如果我像这样更新代码

代码语言:javascript
复制
    final Flowable<Integer> f1 = Flowable.<Integer>fromPublisher(s -> {
        s.onNext(Integer.valueOf(1));
        s.onComplete();
    }).onErrorResumeNext(Flowable.empty());


    final Flowable<Integer> f2 = Flowable.<Integer>fromPublisher(s -> {
        s.onNext(Integer.valueOf(2));
        s.onComplete();
    }).onErrorResumeNext(Flowable.empty());

    Flowable.zip(f1, f2, (i1, i2) -> "" + i1 + i2)
            .blockingSubscribe(System.out::println);

它将像预期的那样打印12。但是为什么呢?这没有意义。

EN

回答 1

Stack Overflow用户

发布于 2016-12-10 10:41:36

问题是,您使用fromPublisher违反了Publisher<T>的约定。

发布者需要按照Reactive Streams合同中指定的非常具体的方式进行操作。这种行为包括在拨打任何其他电话之前先呼叫Subscriber.onSubscribe(),并尊重该用户面临的压力。

因为您没有调用onSubscribe,所以内部queue变量永远不会初始化,并且在其onNext方法中调用queue.offer会导致NPE。

大概是通过使用onErrorResumeNext,实现确保了所有东西都被正确调用,“修复”了无效状态。

要解决您的问题,有两种可能:

  1. 不使用Flowable.fromPublisher。它旨在与Reactive Streams宣言的其他实现建立桥梁,并且没有任何保障措施。相反,使用正确处理初始化的Flowable.create和backpressure.
  2. Use非反压感知的Observable,因为你的用例似乎并不关心反压力。为安全起见,再次使用Observable.create方法。
票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/41071413

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档