首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何异步地将Flowable<ListenableFuture<List<T>>>压平为Flowable<T>?

如何异步地将Flowable<ListenableFuture<List<T>>>压平为Flowable<T>?
EN

Stack Overflow用户
提问于 2016-10-25 14:24:29
回答 1查看 1.6K关注 0票数 1

我使用的是RxJava 2.0.0-RC4。

在我的应用程序中,我收到了一个Flowable<ListenableFuture<List<Integer>>。(ListenableFuture来自番石榴。)

我怎样才能把这个可流动的流压成一个Flowable<Integer>

我试过这个:

代码语言:javascript
复制
Flowable<ListenableFuture<List<Integer>>> futures = ...
Flowable<Integer> ints = futures.flatMap(future -> Flowable.fromIterable(future.get()));

但这涉及到阻塞调用(Future.get())。理想情况下,我希望以一种完全非阻塞的方式创建可流动的,这样它就会在基础期货完成后立即发出项(我不需要在结果的可流动中保留元素的顺序)。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-10-25 15:23:15

你可以试试这样的方法:

代码语言:javascript
复制
@Test
public void name() throws Exception {
    ListenableFuture<List<Integer>> listListenableFuture = Futures.immediateFuture(Arrays.asList(1, 2, 3));

    Flowable<List<Integer>> futures = Flowable.fromCallable(() -> listListenableFuture.get());

    Flowable<Integer> integerFlowable = futures
            .flatMap(integers -> Flowable.fromIterable(integers));

    TestSubscriber<Integer> test = integerFlowable.test();

    test.assertValues(1, 2, 3);
}

如果引入订阅If / observeOn,则测试将失败。我将通过挂在生命周期事件中重新包装ListenableFuture。

包装看起来如下所示:

代码语言:javascript
复制
    @Test
public void name() throws Exception {
    ListenableFuture<List<Integer>> listListenableFuture = Futures.immediateFuture(Arrays.asList(1, 2, 3));

    Flowable<List<Integer>> listFlowable = Flowable.create(e -> {
        if (e.isCancelled()) {
            return;
        }

        try {
            FutureCallback<List<Integer>> futureCallback = new FutureCallback<List<Integer>>() {
                @Override
                public void onSuccess(List<Integer> result) {
                    if (!e.isCancelled()) {
                        printCurrentThread("inOnSuccess");

                        e.onNext(result);
                        e.onComplete();
                    }
                }

                @Override
                public void onFailure(Throwable t) {
                    if (!e.isCancelled()) {
                        e.onError(t);
                    }
                }
            };

            Futures.addCallback(listListenableFuture, futureCallback);

            e.setDisposable(Disposables.fromFuture(listListenableFuture, true));

        } catch (Exception ex) {
            e.onError(ex);
        }

    }, BackpressureStrategy.BUFFER);

    Flowable<Integer> integerFlowable = listFlowable
            .subscribeOn(Schedulers.computation())
            .observeOn(Schedulers.newThread())
            .doOnNext(integers -> printCurrentThread("afterObserveOn"))
            .flatMap(Flowable::fromIterable);

    TestSubscriber<Integer> test = integerFlowable.test();

    Thread.sleep(200);

    test.assertResult(1, 2, 3);
}

private void printCurrentThread(String additional) {
    System.out.println(additional + "_" + Thread.currentThread());
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/40242599

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档