我艰难地发现,JVM只使用一个线程池来并行处理流。我们在一个大的流上有一个I/O阻塞的函数,这会导致与不相关的并行流一起使用的不相关的快速函数的活跃性问题。
流上没有允许使用备用线程池的方法。
有没有一种简单的方法来避免这个问题,也许可以以某种方式指定使用哪个线程池?
发布于 2015-05-03 02:22:25
我编写了一个名为StreamEx的小型库,它可以向自定义FJP提交任务。这样你就可以写
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
int[] primes = IntStreamEx.range(1, 1_000_000)
.parallel(forkJoinPool)
.filter(PrimesPrint::isPrime).toArray();它只是简单地记住你的池,并启动其中的终端操作,加入结果。只是前面提到的解决方案的语法糖。
发布于 2016-02-08 22:35:12
您可以将阻塞操作包装在ForkJoinPool.ManagedBlocker中,如下所示:
static <T> Supplier<T> blocking(Supplier<T> supplier) {
return new Supplier<T>() {
volatile T result;
@Override
public T get() {
try {
ForkJoinPool.managedBlock(new ManagedBlocker() {
@Override
public boolean block() {
result = supplier.get();
return true;
}
@Override
public boolean isReleasable() {
return result != null;
}
});
}
catch (InterruptedException e) {
throw new RuntimeException(e);
}
return result;
}
};
}然后,例如,像这样使用它:
Stream.generate(blocking(() -> ...))
.parallel()
...
.collect(...);更多信息可以在这篇博客文章中找到:http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health/
jOOλ通过org.jooq.lambda.Blocking为所有Java8 FunctionalInterface类型提供了包装器,就像上面一样,所以您可以编写:
Stream.generate(Blocking.supplier(() -> ...))
.parallel()
...
.collect(...);或者,例如,当过滤器阻塞时:
Stream....
.parallel()
.filter(Blocking.predicate(t -> blockingTest(t)))
.collect(...);(免责声明,我为jOOλ背后的公司工作)。
发布于 2014-12-21 07:53:51
这可能类似于Custom thread pool in Java 8 parallel stream
这个问题在this blog中有进一步的讨论。
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
//parallel task here, for example
range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();https://stackoverflow.com/questions/27584271
复制相似问题