我有一份任务清单。每个任务都是相互独立的(它们不使用彼此的结果)。
当有1000个任务并使用顺序流处理这些任务时。
tasks.forEach(task->{
// long running task
task.run();
System.out.println("Thread: " + Thread.currentThread().getName());
});..then,第二个任务在第一个任务之后运行,依此类推。循环以阻塞和顺序模式运行(第二个任务只在第一个任务完成后完成)。
:并行处理每个任务的最佳方法是什么?
这是最好的方法吗?
tasks.parallelStream().forEach(task->{
// long running task
task.run();
System.out.println("Thread: " + Thread.currentThread().getName());
});根据如果可能的话,我应该总是使用并行流吗?,应该避免使用并行流。在我的例子中,这些任务是相互独立的,我不需要使用parallelStream()提供的同步开销。但是,在使用parallelStream()时没有禁用同步开销的选项。或?
有比parallelStream()更好的用例方法吗?
发布于 2019-11-21 22:27:31
对您来说,一个很好的解决方案是使用CompletableFuture.allOf。像这样使用它:
ExecutorService ex = //Whatever executor you want;
CompletableFuture.allOf((CompletableFuture<Void>[]) tasks.stream()
.map(task -> CompletableFuture.runAsync((() -> /* Do task */), ex))
.toArray());这样做,您可以执行异步的,非阻塞的.此外,您将得到一个关于类型转换的编译器警告,但我认为在您的情况下,忽略它可能是安全的。
ExecutorService.submit将触发任务,但是当您使用get获取任何结果时,它将阻塞并检索。CompletableFuture在获取数据时不会阻塞。当您希望在所有并行任务完成后看到某种结果返回时,就会出现这种情况。
更多的解释可以找到这里。
另外,在您最初的问题中,您问使用parallelStream是否是一个好主意,我的回答是,这不是一个好主意,因为如果有一个任务阻塞线程,那么您将遇到问题(假设您在代码中的所有位置都使用了parallelStream )。
此外,CompletableFuture可以接受它自己的线程池(您可以自定义线程池)并在那里运行。注意上面代码中runAsync的第二个参数。
如果您只是想有一个火灾和忘记机制,而不关心结果,那么使用ExecutorService.invokeAll是一个很好的方式来做事情。你可以这样使用它:
executorService.invokeAll(tasks.stream().map(task -> new Callable<Void>() {
@Override
public Void call() throws Exception {
// run task;
return null;
}
})
.collect(Collectors.toList())); 但是,在这种情况下,为什么要使用带有自己的CompletableFuture的ExecutorService呢?
发布于 2019-11-21 21:50:10
在Java8中,parallelStream()使用在JVM启动时初始化的ForkJoinCommonPool,它包含一个固定数量的线程,这些线程更适合按照“分而治之”的模式工作。在您的例子中,由于它们都是孤立的,所以使用ExecutorService可能更合适。
https://stackoverflow.com/questions/58984189
复制相似问题