首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >流处理线程池耗尽

流处理线程池耗尽
EN

Stack Overflow用户
提问于 2014-12-21 04:58:42
回答 3查看 979关注 0票数 3

我艰难地发现,JVM只使用一个线程池来并行处理流。我们在一个大的流上有一个I/O阻塞的函数,这会导致与不相关的并行流一起使用的不相关的快速函数的活跃性问题。

流上没有允许使用备用线程池的方法。

有没有一种简单的方法来避免这个问题,也许可以以某种方式指定使用哪个线程池?

EN

回答 3

Stack Overflow用户

发布于 2015-05-03 02:22:25

我编写了一个名为StreamEx的小型库,它可以向自定义FJP提交任务。这样你就可以写

代码语言:javascript
复制
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
int[] primes = IntStreamEx.range(1, 1_000_000)
    .parallel(forkJoinPool)
    .filter(PrimesPrint::isPrime).toArray();

它只是简单地记住你的池,并启动其中的终端操作,加入结果。只是前面提到的解决方案的语法糖。

票数 2
EN

Stack Overflow用户

发布于 2016-02-08 22:35:12

您可以将阻塞操作包装在ForkJoinPool.ManagedBlocker中,如下所示:

代码语言:javascript
复制
static <T> Supplier<T> blocking(Supplier<T> supplier) {
    return new Supplier<T>() {
        volatile T result;

        @Override
        public T get() {
            try {
                ForkJoinPool.managedBlock(new ManagedBlocker() {
                    @Override
                    public boolean block() {
                        result = supplier.get();
                        return true;
                    }

                    @Override
                    public boolean isReleasable() {
                        return result != null;
                    }
                });
            }
            catch (InterruptedException e) {
                throw new RuntimeException(e);
            }

            return result;
        }
    };
}

然后,例如,像这样使用它:

代码语言:javascript
复制
Stream.generate(blocking(() -> ...))
      .parallel()
      ...
      .collect(...);

更多信息可以在这篇博客文章中找到:http://zeroturnaround.com/rebellabs/java-parallel-streams-are-bad-for-your-health/

jOOλ通过org.jooq.lambda.Blocking为所有Java8 FunctionalInterface类型提供了包装器,就像上面一样,所以您可以编写:

代码语言:javascript
复制
Stream.generate(Blocking.supplier(() -> ...))
      .parallel()
      ...
      .collect(...);

或者,例如,当过滤器阻塞时:

代码语言:javascript
复制
Stream....
      .parallel()
      .filter(Blocking.predicate(t -> blockingTest(t)))
      .collect(...);

(免责声明,我为jOOλ背后的公司工作)。

票数 2
EN

Stack Overflow用户

发布于 2014-12-21 07:53:51

这可能类似于Custom thread pool in Java 8 parallel stream

这个问题在this blog中有进一步的讨论。

代码语言:javascript
复制
ForkJoinPool forkJoinPool = new ForkJoinPool(2);
forkJoinPool.submit(() ->
    //parallel task here, for example
    range(1, 1_000_000).parallel().filter(PrimesPrint::isPrime).collect(toList())
).get();
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/27584271

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档