首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用BackpressureStrategy的RxJava2 Flowable.create blockingSubscribe语义

使用BackpressureStrategy的RxJava2 Flowable.create blockingSubscribe语义
EN

Stack Overflow用户
提问于 2017-03-14 00:13:19
回答 1查看 1.1K关注 0票数 0

我很难理解Flowable BackpressureStrategy是如何与blockingSubscribe方法一起工作的--或者这对我来说似乎是意想不到的,如果有人能给我解释一下,我会很感激的。

我在当前主干的FlowableTests文件中测试了这段代码。

代码语言:javascript
复制
@Test
public void testCreateBackpressureDrop() {
    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> e) throws Exception {
            e.onNext(1);
            e.onNext(3);
            e.onNext(4);
            e.onComplete();
        }
    }, BackpressureStrategy.DROP).blockingSubscribe(w);

    verify(w, times(1)).onNext(1);
    verify(w, times(1)).onNext(3);
    verify(w, times(1)).onNext(4);
    verify(w, times(1)).onComplete();
}

如果我使用subscribe(w)BackpressureStragegy.DROPBackpressure.BUFFER,测试就会通过。但是,如果我使用blockingSubscribe(w)Backpressure.BUFFER会通过,但Backpressure.DROP会失败,并指出从未调用过onNext(1)

谢谢!

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-03-14 03:48:59

这是使用Mockito模拟Subscriber的一个典型问题:您必须在其onSubscribe中调用request(N)

代码语言:javascript
复制
@SuppressWarnings("unchecked")
public static <T> Subscriber<T> mockSubscriber() {
    Subscriber<T> w = mock(Subscriber.class);

    Mockito.doAnswer(new Answer<Object>() {
        @Override
        public Object answer(InvocationOnMock a) throws Throwable {
            Subscription s = a.getArgument(0);
            s.request(Long.MAX_VALUE);
            return null;
        }
    }).when(w).onSubscribe((Subscription)any());

    return w;
}

blockingSubscribe的复杂之处在于,它在FlowableOnSubscribe运行之后执行上面的w.onSubscribe

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42768400

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档