我在一个运行在Java 16上的Jetty应用程序上工作,我试图将它升级到Java 17,但是对parallelStream()的调用完全造成了严重的性能问题。
唯一的更改是Java版本从16到17、--add-opens java.base/java.lang=ALL-UNNAMED --add-opens java.base/java.util=ALL-UNNAMED和运行时从openjdk:16.0.1-jdk-oraclelinux8到openjdk:17.0.1-jdk-oraclelinux8。
我们设法获得了一个线程转储,它包含了许多这样的内容:
"qtp1368594774-200" #200 prio=5 os_prio=0 cpu=475.94ms elapsed=7189.65s tid=0x00007fd49c50cc10 nid=0xd1 waiting on condition [0x00007fd48fef7000]
java.lang.Thread.State: WAITING (parking)
at jdk.internal.misc.Unsafe.park(java.base@17.0.1/Native Method)
- parking to wait for <0x00000007b73439a8> (a java.util.stream.ReduceOps$ReduceTask)
at java.util.concurrent.locks.LockSupport.park(java.base@17.0.1/LockSupport.java:341)
at java.util.concurrent.ForkJoinTask.awaitDone(java.base@17.0.1/ForkJoinTask.java:468)
at java.util.concurrent.ForkJoinTask.invoke(java.base@17.0.1/ForkJoinTask.java:687)
at java.util.stream.ReduceOps$ReduceOp.evaluateParallel(java.base@17.0.1/ReduceOps.java:927)
at java.util.stream.AbstractPipeline.evaluate(java.base@17.0.1/AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.collect(java.base@17.0.1/ReferencePipeline.java:682)
at com.stackoverflowexample.aMethodThatDoesBlockingIOUsingParallelStream()引起此问题的代码如下所示:
list.parallelStream()
.map(this::callRestServiceToGetSomeData)
.collect(Collectors.toUnmodifiableList());此图显示了从jdk16升级到jdk17 (中间的巨大峰值)之前的线程使用情况,然后删除对jdk17上的parallelStream()的调用(RHS):

Java 17 (OpenJDK-17.0.1_Linuxx64_bin.tar.gz)中的哪些更改导致了这种情况?
发布于 2022-11-11 23:05:18
我们都知道或被告知,创建一个新线程是一项繁重的工作。但在我看来做了几次测试还行。例如:下面是使用10_000线程在简单测试下面运行的内存使用情况。在我的笔记本电脑上大约花了2到3秒,jvm的使用率约为1.5G。
final int threadNum = 10_000;
final Callable<String> task = () -> {
String bigString = UUID.randomUUID().toString().repeat(1000);
assertTrue(bigString.chars().sum() > 0);
Thread.currentThread().sleep(1000);
return bigString;
};
final ExecutorService executorService = Executors.newFixedThreadPool(threadNum);
final List<Future<String>> futures = new ArrayList<>(threadNum);
for (int i = 0; i < threadNum; i++) {
futures.add(executorService.submit(task));
}
long ret = futures.stream().map(Fn.futureGet()).mapToInt(String::length).sum();
System.out.println(ret);
assertEquals(UUID.randomUUID().toString().length() * threadNum * 1000, ret);

我认为在大多数应用程序中创建/使用10_000是一个难得的机会。如果我将线程号更改为1000。同样,它花费了2到3秒,内存使用量大约为: 300 MB。

使用流api并行运行阻塞I/O调用是可能的还是一个好主意?我也这么想。下面是我的工具示例:算盘-普通
// Run above task by Stream.
ret = IntStreamEx.range(0, threadNum)
.parallel(threadNum)
.mapToObj(it -> Try.call(task))
.sequential()
.mapToInt(String::length)
.sum();
// Or other task
StreamEx.of(list)
.parallel(64) // Specify the concurrent thread number. It could be from 1 up to thousands.
.map(this::callRestServiceToGetSomeData)
.collect(Collectors.toUnmodifiableList());或者使用Java 19+中引入的19+
try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
StreamEx.of(list)
.parallel(executor)
.map(this::callRestServiceToGetSomeData)
.collect(Collectors.toUnmodifiableList());
}我知道这不是对问题的直接回答。但它可以解决原来提出这个问题的问题。
https://stackoverflow.com/questions/70384713
复制相似问题