我正在尝试将List<CompletableFuture<X>>转换为CompletableFuture<List<T>>。这是非常有用的,因为您有许多异步任务,并且您需要获得所有这些任务的结果。
如果其中任何一个失败了,那么最终的未来就会失败。我就是这样实施的:
public static <T> CompletableFuture<List<T>> sequence2(List<CompletableFuture<T>> com, ExecutorService exec) {
if(com.isEmpty()){
throw new IllegalArgumentException();
}
Stream<? extends CompletableFuture<T>> stream = com.stream();
CompletableFuture<List<T>> init = CompletableFuture.completedFuture(new ArrayList<T>());
return stream.reduce(init, (ls, fut) -> ls.thenComposeAsync(x -> fut.thenApplyAsync(y -> {
x.add(y);
return x;
},exec),exec), (a, b) -> a.thenCombineAsync(b,(ls1,ls2)-> {
ls1.addAll(ls2);
return ls1;
},exec));
}要运行它:
ExecutorService executorService = Executors.newCachedThreadPool();
Stream<CompletableFuture<Integer>> que = IntStream.range(0,100000).boxed().map(x -> CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep((long) (Math.random() * 10));
} catch (InterruptedException e) {
e.printStackTrace();
}
return x;
}, executorService));
CompletableFuture<List<Integer>> sequence = sequence2(que.collect(Collectors.toList()), executorService);如果他们中的任何一个失败了,那就失败了。即使有一百万种期货,它也能给出预期的产出。我的问题是:如果有5000多个期货,如果其中任何一个失败,我就会得到一个StackOverflowError。
线程中的异常“池-1-线程-2611”java.lang.StackOverflowError在java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:193) at java.util.concurrent.CompletableFuture.internalComplete(CompletableFuture.java:210) at java.util.concurrent.CompletableFuture$ThenCompose.run(CompletableFuture.java:1487)
我做错什么了?
注:当任何未来失败时,上述返回的未来都会失败。所接受的答案也应包括这一点。
发布于 2015-05-04 09:26:28
使用CompletableFuture.allOf(...)
static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}关于你的执行情况的几点评论:
您对.thenComposeAsync、.thenApplyAsync和.thenCombineAsync的使用很可能没有达到预期的效果。这些...Async方法在一个单独的线程中运行提供给它们的函数。因此,在您的示例中,您将导致将新项添加到列表中,以便在提供的执行器中运行。没有必要将轻量级的操作填充到缓存的线程执行器中。没有充分的理由,不要使用thenXXXXAsync方法。
此外,不应使用reduce累积到可变容器中。即使当流是顺序的时候,它可能正确地工作,但是如果流是并行的,它就会失败。若要执行可变的缩减,请改用.collect。
如果希望在第一次失败后立即完成整个计算,请在sequence方法中执行以下操作:
CompletableFuture<List<T>> result = CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
com.forEach(f -> f.whenComplete((t, ex) -> {
if (ex != null) {
result.completeExceptionally(ex);
}
}));
return result;此外,如果希望在第一次失败时取消其余操作,请在exec.shutdownNow();后面添加result.completeExceptionally(ex);。当然,这假设exec只存在于这一次计算中。如果没有,您将不得不循环并分别取消每个剩余的Future。
发布于 2016-05-28 08:47:38
您可以获得Spotify的CompletableFutures库并使用allAsList方法。我认为这是受番石榴的Futures.allAsList方法的启发。
public static <T> CompletableFuture<List<T>> allAsList(
List<? extends CompletionStage<? extends T>> stages) {如果您不想使用库,下面是一个简单的实现:
public <T> CompletableFuture<List<T>> allAsList(final List<CompletableFuture<T>> futures) {
return CompletableFuture.allOf(
futures.toArray(new CompletableFuture[futures.size()])
).thenApply(ignored ->
futures.stream().map(CompletableFuture::join).collect(Collectors.toList())
);
}发布于 2015-05-04 11:08:42
作为米莎指出,您过度使用了…Async操作。此外,您还构建了一个复杂的操作链,用于建模不反映程序逻辑的依赖关系:
然后,取消(显式或由于异常)这个递归组合的作业可能会递归地执行,并且可能在StackOverflowError中失败。那是依赖于实现的。
作为米莎已经展示了,有一种方法,allOf,它允许您建模您的原始意图,定义一个取决于您列表中所有作业的作业。
然而,值得注意的是,即使这样也没有必要。因为您使用的是无界线程池执行器,所以您可以简单地将收集结果的异步任务发布到列表中,这样就完成了。无论如何,等待完成意味着询问每个工作的结果。
ExecutorService executorService = Executors.newCachedThreadPool();
List<CompletableFuture<Integer>> que = IntStream.range(0, 100000)
.mapToObj(x -> CompletableFuture.supplyAsync(() -> {
LockSupport.parkNanos(TimeUnit.MILLISECONDS.toNanos((long)(Math.random()*10)));
return x;
}, executorService)).collect(Collectors.toList());
CompletableFuture<List<Integer>> sequence = CompletableFuture.supplyAsync(
() -> que.stream().map(CompletableFuture::join).collect(Collectors.toList()),
executorService);当线程数量有限且作业可能产生额外的异步作业时,使用合成依赖操作的方法很重要,以避免等待作业从必须先完成的作业中窃取线程,但这里也是如此。
在这种特殊情况下,一个任务简单地迭代大量的先决条件任务,并在必要时等待,可能比建模大量依赖项和让每个作业通知依赖作业的完成更有效。
https://stackoverflow.com/questions/30025428
复制相似问题