如何使用5个CompletableFutures异步执行20个可运行任务(或1个任务20次)?
这就是我得到的:
Runnable task = () -> {
long startTime = System.currentTimeMillis();
Random random = new Random();
while (System.currentTimeMillis() - startTime < 3000) {
DoubleStream.generate(() -> random.nextDouble())
.limit(random.nextInt(100))
.map(n -> Math.cos(n))
.sum();
}
System.out.println("Done");
};
for (int i = 0; i < 4; i++) {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future3 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future4 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future5 = CompletableFuture.runAsync(task);
future1.get();
future2.get();
future3.get();
future4.get();
future5.get();
}如果我执行这段代码,我可以看到它只异步运行了3次future.get():3次,然后是在1For()迭代期间剩下的2次
因此,我想尽可能异步地完成所有20个任务
发布于 2018-04-15 16:13:08
可以使用allOf将多个任务作为一个任务同时运行。首先,我创建了5个任务的组合(与您的问题相同),但随后我添加了10个任务(并且只运行了两次),并获得了一半的执行时间。
for (int i = 0; i < 2; i++) {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(task);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(task);
// and so on until ten
CompletableFuture<Void> future10 = CompletableFuture.runAsync(task);
CompletableFuture<Void> combined = CompletableFuture.allOf(future1, future2, future3, future4, future5, future6, future7, future8, future9, future10);
combined.get();
}发布于 2018-04-16 21:12:14
CompletableFuture的缺省执行器是ForkJoinPool的公共池,它的缺省目标并行度与CPU核心数减1相匹配。因此,如果您有四个核心,最多有三个作业将被异步执行。由于您强制每5个作业等待完成一次,因此您将获得三个并行执行,然后在每个循环迭代中执行两个并行执行。
如果您想要获得特定的执行策略,比如您选择的并行性,最好的方法是指定一个正确配置的执行器。然后,您应该让executor管理并行性,而不是在循环中等待。
ExecutorService pool = Executors.newFixedThreadPool(5);
for (int i = 0; i < 20; i++) {
CompletableFuture.runAsync(task, pool);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS); // wait for the completion of all tasks这允许五个并行作业,但将让五个线程中的每个线程在一个作业完成后立即获得一个新作业,而不是等待下一个循环迭代。
但是当你说
所以,我想尽可能异步地完成所有20个任务
不清楚为什么要在调度五个作业之后强制等待。最大并行度可以通过
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 20; i++) {
CompletableFuture.runAsync(task, pool);
}
pool.shutdown();
pool.awaitTermination(1, TimeUnit.DAYS); // wait for the completion of all tasks这可能会产生与作业一样多的线程,除非一个作业在所有作业都被调度之前完成,就像在这种情况下,工作线程可能会选择一个新作业。
但是这个逻辑根本不需要CompletableFuture。您还可以使用:
ExecutorService pool = Executors.newCachedThreadPool();
// schedule 20 jobs and return when all completed
pool.invokeAll(Collections.nCopies(20, Executors.callable(task)));
pool.shutdown();但是当您的工作不涉及I/O或任何其他类型的等待响应时。释放CPU,创建比CPU核心更多的线程是没有意义的。配置为处理器数量的池更可取。
ExecutorService pool = Executors.newWorkStealingPool(
Runtime.getRuntime().availableProcessors());
// schedule 20 jobs at return when all completed
pool.invokeAll(Collections.nCopies(20, Executors.callable(task)));
pool.shutdown();在你的特殊情况下,这可能会运行较慢,因为当你的作业使用系统时间时,当有更多的线程比核心时,看起来运行得更快,但实际上做的工作更少。但对于普通的计算任务,这将提高性能。
发布于 2018-04-15 16:13:55
将以下系统属性设置为您希望公共派生联接池使用的线程数:
java.util.concurrent.ForkJoinPool.common.parallelism 请参阅ForkJoinPool
原因是在构建可完成的未来时,您没有指定自己的fork join池,所以它隐式地使用
ForkJoinPool.commonPool()https://stackoverflow.com/questions/49839557
复制相似问题