我使用的是RxJava 2.0.0-RC4。
在我的应用程序中,我收到了一个Flowable<ListenableFuture<List<Integer>>。(ListenableFuture来自番石榴。)
我怎样才能把这个可流动的流压成一个Flowable<Integer>
我试过这个:
Flowable<ListenableFuture<List<Integer>>> futures = ...
Flowable<Integer> ints = futures.flatMap(future -> Flowable.fromIterable(future.get()));但这涉及到阻塞调用(Future.get())。理想情况下,我希望以一种完全非阻塞的方式创建可流动的,这样它就会在基础期货完成后立即发出项(我不需要在结果的可流动中保留元素的顺序)。
发布于 2016-10-25 15:23:15
你可以试试这样的方法:
@Test
public void name() throws Exception {
ListenableFuture<List<Integer>> listListenableFuture = Futures.immediateFuture(Arrays.asList(1, 2, 3));
Flowable<List<Integer>> futures = Flowable.fromCallable(() -> listListenableFuture.get());
Flowable<Integer> integerFlowable = futures
.flatMap(integers -> Flowable.fromIterable(integers));
TestSubscriber<Integer> test = integerFlowable.test();
test.assertValues(1, 2, 3);
}如果引入订阅If / observeOn,则测试将失败。我将通过挂在生命周期事件中重新包装ListenableFuture。
包装看起来如下所示:
@Test
public void name() throws Exception {
ListenableFuture<List<Integer>> listListenableFuture = Futures.immediateFuture(Arrays.asList(1, 2, 3));
Flowable<List<Integer>> listFlowable = Flowable.create(e -> {
if (e.isCancelled()) {
return;
}
try {
FutureCallback<List<Integer>> futureCallback = new FutureCallback<List<Integer>>() {
@Override
public void onSuccess(List<Integer> result) {
if (!e.isCancelled()) {
printCurrentThread("inOnSuccess");
e.onNext(result);
e.onComplete();
}
}
@Override
public void onFailure(Throwable t) {
if (!e.isCancelled()) {
e.onError(t);
}
}
};
Futures.addCallback(listListenableFuture, futureCallback);
e.setDisposable(Disposables.fromFuture(listListenableFuture, true));
} catch (Exception ex) {
e.onError(ex);
}
}, BackpressureStrategy.BUFFER);
Flowable<Integer> integerFlowable = listFlowable
.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.newThread())
.doOnNext(integers -> printCurrentThread("afterObserveOn"))
.flatMap(Flowable::fromIterable);
TestSubscriber<Integer> test = integerFlowable.test();
Thread.sleep(200);
test.assertResult(1, 2, 3);
}
private void printCurrentThread(String additional) {
System.out.println(additional + "_" + Thread.currentThread());
}https://stackoverflow.com/questions/40242599
复制相似问题