我很难理解Flowable BackpressureStrategy是如何与blockingSubscribe方法一起工作的--或者这对我来说似乎是意想不到的,如果有人能给我解释一下,我会很感激的。
我在当前主干的FlowableTests文件中测试了这段代码。
@Test
public void testCreateBackpressureDrop() {
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(3);
e.onNext(4);
e.onComplete();
}
}, BackpressureStrategy.DROP).blockingSubscribe(w);
verify(w, times(1)).onNext(1);
verify(w, times(1)).onNext(3);
verify(w, times(1)).onNext(4);
verify(w, times(1)).onComplete();
}如果我使用subscribe(w)和BackpressureStragegy.DROP或Backpressure.BUFFER,测试就会通过。但是,如果我使用blockingSubscribe(w),Backpressure.BUFFER会通过,但Backpressure.DROP会失败,并指出从未调用过onNext(1)。
谢谢!
发布于 2017-03-14 03:48:59
这是使用Mockito模拟Subscriber的一个典型问题:您必须在其onSubscribe中调用request(N)
@SuppressWarnings("unchecked")
public static <T> Subscriber<T> mockSubscriber() {
Subscriber<T> w = mock(Subscriber.class);
Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock a) throws Throwable {
Subscription s = a.getArgument(0);
s.request(Long.MAX_VALUE);
return null;
}
}).when(w).onSubscribe((Subscription)any());
return w;
}blockingSubscribe的复杂之处在于,它在FlowableOnSubscribe运行之后执行上面的w.onSubscribe。
https://stackoverflow.com/questions/42768400
复制相似问题