首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >从List<CompletableFuture>到CompletableFuture<List>的转换

从List<CompletableFuture>到CompletableFuture<List>的转换
EN

Stack Overflow用户
提问于 2015-05-04 08:08:11
回答 9查看 46.8K关注 0票数 89

我正在尝试将List<CompletableFuture<X>>转换为CompletableFuture<List<T>>。这是非常有用的,因为您有许多异步任务,并且您需要获得所有这些任务的结果。

如果其中任何一个失败了,那么最终的未来就会失败。我就是这样实施的:

代码语言:javascript
复制
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
    if(com.isEmpty()){
        throw new IllegalArgumentException();
    }
    Stream<? extends CompletableFuture<T>> stream = com.stream();
    CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
    return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
        x.add(y);
        return x;
    },exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
        ls1.addAll(ls2);
        return ls1;
    },exec));
}

要运行它:

代码语言:javascript
复制
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
    try {
        Thread.sleep((long) (Math.random() * 10));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);

如果他们中的任何一个失败了,那就失败了。即使有一百万种期货,它也能给出预期的产出。我的问题是:如果有5000多个期货,如果其中任何一个失败,我就会得到一个StackOverflowError

线程中的异常“池-1-线程-2611”java.lang.StackOverflowError在java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193) at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)

我做错什么了?

注:当任何未来失败时,上述返回的未来都会失败。所接受的答案也应包括这一点。

EN

回答 9

Stack Overflow用户

回答已采纳

发布于 2015-05-04 09:26:28

使用CompletableFuture.allOf(...)

代码语言:javascript
复制
static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
    return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
            .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
            );
}

关于你的执行情况的几点评论:

您对.thenComposeAsync.thenApplyAsync.thenCombineAsync的使用很可能没有达到预期的效果。这些...Async方法在一个单独的线程中运行提供给它们的函数。因此,在您的示例中,您将导致将新项添加到列表中,以便在提供的执行器中运行。没有必要将轻量级的操作填充到缓存的线程执行器中。没有充分的理由,不要使用thenXXXXAsync方法。

此外,不应使用reduce累积到可变容器中。即使当流是顺序的时候,它可能正确地工作,但是如果流是并行的,它就会失败。若要执行可变的缩减,请改用.collect

如果希望在第一次失败后立即完成整个计算,请在sequence方法中执行以下操作:

代码语言:javascript
复制
CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
        .thenApply(v -> com.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList())
        );

com.forEach(f -> f.whenComplete((t, ex) -> {
    if (ex != null) {
        result.completeExceptionally(ex);
    }
}));

return result;

此外,如果希望在第一次失败时取消其余操作,请在exec.shutdownNow();后面添加result.completeExceptionally(ex);。当然,这假设exec只存在于这一次计算中。如果没有,您将不得不循环并分别取消每个剩余的Future

票数 104
EN

Stack Overflow用户

发布于 2016-05-28 08:47:38

您可以获得Spotify的CompletableFutures库并使用allAsList方法。我认为这是受番石榴的Futures.allAsList方法的启发。

代码语言:javascript
复制
public static <T> CompletableFuture<List<T>> allAsList(
    List<? extends CompletionStage<? extends T>> stages) {

如果您不想使用库,下面是一个简单的实现:

代码语言:javascript
复制
public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
    return CompletableFuture.allOf(
        futures.toArray(new CompletableFuture[futures.size()])
    ).thenApply(ignored ->
        futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
    );
}
票数 12
EN

Stack Overflow用户

发布于 2015-05-04 11:08:42

作为米莎指出,您过度使用了…Async操作。此外,您还构建了一个复杂的操作链,用于建模不反映程序逻辑的依赖关系:

  • 创建一个作业x,它取决于列表中的第一个和第二个作业。
  • 创建一个作业x+1,它依赖于作业x和列表中的第三个作业。
  • 创建一个作业x+2,它依赖于作业x+1和列表中的第四个作业。
  • 创建一个作业x+5000,它依赖于作业x+4999和列表中的最后一个作业。

然后,取消(显式或由于异常)这个递归组合的作业可能会递归地执行,并且可能在StackOverflowError中失败。那是依赖于实现的。

作为米莎已经展示了,有一种方法,allOf,它允许您建模您的原始意图,定义一个取决于您列表中所有作业的作业。

然而,值得注意的是,即使这样也没有必要。因为您使用的是无界线程池执行器,所以您可以简单地将收集结果的异步任务发布到列表中,这样就完成了。无论如何,等待完成意味着询问每个工作的结果。

代码语言:javascript
复制
ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
  .mapToObj(x -> CompletableFuture.supplyAsync(() -> {
    LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10)));
    return x;
}, executorService)).collect(Collectors.toList());
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
    () -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
    executorService);

当线程数量有限且作业可能产生额外的异步作业时,使用合成依赖操作的方法很重要,以避免等待作业从必须先完成的作业中窃取线程,但这里也是如此。

在这种特殊情况下,一个任务简单地迭代大量的先决条件任务,并在必要时等待,可能比建模大量依赖项和让每个作业通知依赖作业的完成更有效。

票数 11
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/30025428

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档