StreamEx.parallel().forEach() does not run in parallel after .map()

Stack Overflow user
Asked 2016-10-21 13:01:45
Answers: 2, Views: 791, Followers: 0, Votes: 8

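I noticed that if I use the StreamEx library to parallelize a stream with a custom ForkJoinPool, as shown below, the subsequent operations do run in parallel threads from that pool. However, if I add a map() operation and then parallelize the resulting stream, only one thread from the pool is used.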

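Below is the complete code of a minimal working example that demonstrates the problem. The only difference between the executeAsParallelFromList() and executeAsParallelAfterMap() methods is the added .map(...) call before .parallel().

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ForkJoinPool;

import one.util.streamex.StreamEx;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;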

public class ParallelExample {

    private static final Logger logger = LoggerFactory.getLogger(ParallelExample.class);
    private static ForkJoinPool s3ThreadPool = new ForkJoinPool(3);

    public static List<String> getTestList() {
        int listSize = 10;
        List<String> testList = new ArrayList<>();
        for (int i = 0; i < listSize; i++) {
            testList.add("item_" + i);
        }
        return testList;
    }

    public static void executeAsParallelFromList() {
        logger.info("executeAsParallelFromList():");
        List<String> testList = getTestList();
        StreamEx<String> streamOfItems = StreamEx
                .of(testList)
                .parallel(s3ThreadPool);
        logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
        streamOfItems.forEach(item -> handleItem(item));
    }

    public static void executeAsParallelAfterMap() {
        logger.info("executeAsParallelAfterMap():");
        List<String> testList = getTestList();
        StreamEx<String> streamOfItems = StreamEx
                .of(testList)
                .map(item -> item + "_mapped")
                .parallel(s3ThreadPool);
        logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
        streamOfItems.forEach(item -> handleItem(item));
    }

    private static void handleItem(String item) {
        // do something with the item - just log it for now
        logger.info("I'm handling item: {}", item);
    }

}

Unit tests that run both methods:

import org.junit.Test;

public class ParallelExampleTest {

    @Test
    public void testExecuteAsParallelFromList() {
        ParallelExample.executeAsParallelFromList();
    }

    @Test
    public void testExecuteAsParallelFromStreamEx() {
        ParallelExample.executeAsParallelAfterMap();
    }

}

Output:

08:49:12.992 [main] INFO  marina.streams.ParallelExample - executeAsParallelFromList():
08:49:13.002 [main] INFO  marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.040 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_6
08:49:13.040 [ForkJoinPool-1-worker-2] INFO  marina.streams.ParallelExample - I'm handling item: item_2
08:49:13.040 [ForkJoinPool-1-worker-3] INFO  marina.streams.ParallelExample - I'm handling item: item_1
08:49:13.041 [ForkJoinPool-1-worker-2] INFO  marina.streams.ParallelExample - I'm handling item: item_4
08:49:13.041 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_8
08:49:13.041 [ForkJoinPool-1-worker-3] INFO  marina.streams.ParallelExample - I'm handling item: item_0
08:49:13.041 [ForkJoinPool-1-worker-2] INFO  marina.streams.ParallelExample - I'm handling item: item_3
08:49:13.041 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_9
08:49:13.041 [ForkJoinPool-1-worker-3] INFO  marina.streams.ParallelExample - I'm handling item: item_5
08:49:13.041 [ForkJoinPool-1-worker-2] INFO  marina.streams.ParallelExample - I'm handling item: item_7

08:49:13.043 [main] INFO  marina.streams.ParallelExample - executeAsParallelAfterMap():
08:49:13.043 [main] INFO  marina.streams.ParallelExample - streamOfItems.isParallel(): true
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_0_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_1_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_2_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_3_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_4_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_5_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_6_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_7_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_8_mapped
08:49:13.044 [ForkJoinPool-1-worker-1] INFO  marina.streams.ParallelExample - I'm handling item: item_9_mapped

As you can see, all three threads are used when executeAsParallelFromList() runs, but only one thread is used when executeAsParallelAfterMap() runs.

Why?

Thanks!

Marina

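Note: this example is deliberately simplified; I tried to make it as minimal as possible while still demonstrating the problem. Obviously, in real life map(), handleItem() and so on do much more, and the input data is more interesting (I am trying to process AWS S3 buckets/prefixes in parallel).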


2 answers

Stack Overflow user

Accepted answer

Posted 2016-10-21 19:45:14

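The problem is that as soon as you call map(...), StreamEx creates the underlying Java 8 stream with the sequential/parallel configuration in effect at that point (i.e. sequential), and calling parallel(...) afterwards apparently does not update that underlying Java 8 stream.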

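The fix depends on what you are trying to achieve. If you are happy for your map(...) operation to run in parallel as well, just move the parallel(...) call up so that it is the first thing after of(...).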
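For comparison, here is a plain-JDK sketch of the "parallel from the start" option, where map(...) runs on the custom pool too. It does not use StreamEx. It assumes the widely relied-on but unspecified JDK behaviour that a parallel stream whose terminal operation is started from inside a ForkJoinPool task forks its subtasks into that same pool. The class and method names (ParallelFirstSketch, mapRunsInPool, isWorkerOf) are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical sketch: plain java.util.stream, not StreamEx.
public class ParallelFirstSketch {

    // True if the calling thread is a worker of the given pool.
    static boolean isWorkerOf(ForkJoinPool pool) {
        Thread t = Thread.currentThread();
        return t instanceof ForkJoinWorkerThread
                && ((ForkJoinWorkerThread) t).getPool() == pool;
    }

    // Runs a stream that is parallel from the start inside 'pool' and reports
    // whether every map() call executed on one of that pool's workers.
    static boolean mapRunsInPool(ForkJoinPool pool, List<String> items) {
        Set<Boolean> mapInPool = ConcurrentHashMap.newKeySet();
        pool.submit(() -> items.parallelStream()          // parallel before map()
                .map(item -> {
                    mapInPool.add(isWorkerOf(pool));     // record where map() ran
                    return item + "_mapped";
                })
                .forEach(item -> { /* handle the item here */ }))
            .join();
        return !mapInPool.isEmpty() && !mapInPool.contains(false);
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(3);
        List<String> items = IntStream.range(0, 10)
                .mapToObj(i -> "item_" + i)
                .collect(Collectors.toList());
        // prints true when every map() call ran on a worker of 'pool'
        System.out.println(mapRunsInPool(pool, items));
        pool.shutdown();
    }
}
```

This only fits when map(...) may run in parallel; if it must stay sequential, the two-stage approach is the alternative.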

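However, if you want some operations to run sequentially before the parallel ones, you are better off using two streams. For example, following the style of your sample code: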

public static void executeAsParallelAfterMapV2() {
    logger.info("executeAsParallelAfterMapV2():");
    List<String> testList = getTestList();
    StreamEx<String> sequentialStream = StreamEx
            .of(testList)
            .map(item -> {
                logger.info("Mapping {}", item);
                return item + "_mapped";
            });
    logger.info("sequentialStream.isParallel(): {}", sequentialStream.isParallel());

    List<String> afterSequentialProcessing = sequentialStream.toList();
    StreamEx<String> streamOfItems = StreamEx.of(afterSequentialProcessing)
            .parallel(s3ThreadPool);
    logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
    streamOfItems.forEach(item -> handleItem(item));
}

This produces output like the following:

20:43:36.835 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapV2():
20:43:36.883 [main] INFO scott.streams.ParallelExample - sequentialStream.isParallel(): false
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_0
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_1
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_2
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_3
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_4
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_5
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_6
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_7
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_8
20:43:36.886 [main] INFO scott.streams.ParallelExample - Mapping item_9
20:43:36.886 [main] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:43:36.889 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:43:36.889 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:43:36.890 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:43:36.890 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:43:36.890 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped


Out of interest, if you create a Java 8 stream directly (without StreamEx) and put the parallel() call after map(...), it does switch the whole stream to parallel:

public static void executeAsParallelAfterMapJava8Stream() throws InterruptedException {
    logger.info("executeAsParallelAfterMapJava8Stream():");
    List<String> testList = getTestList();

    s3ThreadPool.submit(() -> {
        Stream<String> streamOfItems = testList.stream()
                .map(item -> {
                    logger.info("Mapping {}", item);
                    return item + "_mapped";
                })
                .parallel();
        logger.info("streamOfItems.isParallel(): {}", streamOfItems.isParallel());
        streamOfItems.forEach(item -> handleItem(item));
    }).join();
}

If you write a similar unit test, you get output along these lines:

20:36:23.469 [main] INFO scott.streams.ParallelExample - executeAsParallelAfterMapJava8Stream():
20:36:23.517 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - streamOfItems.isParallel(): true
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_6
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_2
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_8
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_6_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_2_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_8_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_5
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_4
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_9
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_5_mapped
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_4_mapped
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_9_mapped
20:36:23.520 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_1
20:36:23.520 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - Mapping item_3
20:36:23.520 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - Mapping item_7
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_1_mapped
20:36:23.521 [ForkJoinPool-1-worker-2] INFO scott.streams.ParallelExample - I'm handling item: item_3_mapped
20:36:23.521 [ForkJoinPool-1-worker-3] INFO scott.streams.ParallelExample - I'm handling item: item_7_mapped
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - Mapping item_0
20:36:23.521 [ForkJoinPool-1-worker-1] INFO scott.streams.ParallelExample - I'm handling item: item_0_mapped
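The JDK behaviour in this last example follows from the java.util.stream contract: the sequential/parallel mode applies to the entire pipeline, and the most recent parallel() or sequential() call before the terminal operation wins. A minimal sketch with plain JDK streams (no StreamEx; the class name StreamModeSketch is hypothetical):

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

// Hypothetical sketch: the parallel flag belongs to the whole pipeline.
public class StreamModeSketch {

    // parallel() called after map(): the whole pipeline becomes parallel.
    static boolean parallelAfterMap(List<String> items) {
        Stream<String> s = items.stream()
                .map(item -> item + "_mapped")
                .parallel();
        return s.isParallel();
    }

    // The last mode call wins, even though parallel() came earlier.
    static boolean parallelThenSequential(List<String> items) {
        Stream<String> s = items.stream()
                .parallel()
                .map(item -> item + "_mapped")
                .sequential();
        return s.isParallel();
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("item_0", "item_1", "item_2");
        System.out.println(parallelAfterMap(items));       // true
        System.out.println(parallelThenSequential(items)); // false
    }
}
```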
Votes: 3

Stack Overflow user

Posted 2016-12-04 05:42:22

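Short answer: this is a bug. I filed it and fixed it. The tests missed it because they only check that all operations are executed in the specified pool, not that different pool threads are used (sometimes parallelization legitimately does not happen, for example with a single-element stream).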
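A test that catches this kind of bug could record which threads actually process the elements, in addition to checking that they belong to the intended pool. The sketch below is a hypothetical illustration using a plain JDK parallel stream submitted to a ForkJoinPool, not StreamEx's actual test suite; PoolUsageCheck, Usage, run and sleepQuietly are made-up names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;

// Hypothetical sketch of a pool-usage check (not StreamEx code).
public class PoolUsageCheck {

    // Whether every element ran in the pool, and on how many distinct threads.
    static final class Usage {
        final boolean allInPool;
        final int distinctThreads;

        Usage(boolean allInPool, int distinctThreads) {
            this.allInPool = allInPool;
            this.distinctThreads = distinctThreads;
        }
    }

    static Usage run(ForkJoinPool pool, int elements) {
        Set<Thread> threads = ConcurrentHashMap.newKeySet();
        Set<Boolean> inPool = ConcurrentHashMap.newKeySet();
        pool.submit(() -> IntStream.range(0, elements).parallel().forEach(i -> {
            Thread t = Thread.currentThread();
            threads.add(t);
            inPool.add(t instanceof ForkJoinWorkerThread
                    && ((ForkJoinWorkerThread) t).getPool() == pool);
            sleepQuietly(10); // simulated work so other workers can steal tasks
        })).join();
        return new Usage(!inPool.contains(false), threads.size());
    }

    static void sleepQuietly(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ForkJoinPool pool = new ForkJoinPool(3);
        Usage u = run(pool, 10);
        System.out.println("all in pool: " + u.allInPool);
        System.out.println("distinct threads: " + u.distinctThreads);
        pool.shutdown();
    }
}
```

allInPool can be asserted for any input. distinctThreads is always 1 for a single-element stream; for larger inputs it is usually greater than 1, but scheduling is not guaranteed, so an assertion such as distinctThreads > 1 is a heuristic that may occasionally be flaky.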

A fix is available in version 0.6.4. In earlier versions you can work around the problem with .parallel().parallel(fjp): it should parallelize correctly.

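Please consider reporting StreamEx problems to the official StreamEx issue tracker. These days I only visit StackOverflow occasionally, so issues reported here may go unnoticed.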

Votes: 6
Original page content provided by Stack Overflow. Translation support provided by the Tencent Cloud Xiaowei IT-domain engine.
Original link:

https://stackoverflow.com/questions/40177377
