我是一个新的并行流,并试图制作一个示例程序,将计算值* 100(1至100),并将其存储在地图。在执行代码时,每次迭代都会得到不同的计数。我可能在某个地方错了,所以请指导我,任何人都知道正确的方法。
码
import java.util.*;
import java.lang.*;
import java.io.*;
import java.util.stream.Collectors;
public class Main{
static int l = 0;
public static void main (String[] args) throws java.lang.Exception {
letsGoParallel();
}
public static int makeSomeMagic(int data) {
l++;
return data * 100;
}
public static void letsGoParallel() {
List<Integer> dataList = new ArrayList<>();
for(int i = 1; i <= 100 ; i++) {
dataList.add(i);
}
Map<Integer, Integer> resultMap = new HashMap<>();
dataList.parallelStream().map(f -> {
Integer xx = 0;
{
xx = makeSomeMagic(f);
}
resultMap.put(f, xx);
return 0;
}).collect(Collectors.toList());
System.out.println("Input Size: " + dataList.size());
System.out.println("Size: " + resultMap.size());
System.out.println("Function Called: " + l);
}
}最后输出
输入大小: 100 尺寸: 100 函数调用: 98
每次运行输出不同。我想在我自己的应用程序中使用并行流,但是由于这种混乱/问题,我不能。在我的应用程序中,我有100到200个唯一的数字,需要对其执行一些相同的操作。简而言之,有处理某事的功能。
发布于 2018-10-16 06:10:06
您对HashMap和l变量的访问都是而不是线程安全的,这就是为什么每次运行时输出是不同的。
正确的方法是将Stream元素收集到Map中。
Map<Integer, Integer> resultMap =
dataList.parallelStream()
.collect(Collectors.toMap (Function.identity (), Main::makeSomeMagic));编辑:l变量仍然以而不是线程安全的方式更新,所以如果变量的最终值对您很重要,那么您必须添加自己的线程安全性。
发布于 2018-10-16 06:16:46
通过在resultMap中放置一些值,您将使用一个副作用
dataList.parallelStream().map(f -> {
Integer xx = 0;
{
xx = makeSomeMagic(f);
}
resultMap.put(f, xx);
return 0;
})API接口指出:
无状态操作,如filter和map,在处理新元素时不保留以前看到的元素的状态--每个元素可以独立于其他元素上的操作进行处理。
正在进行使用
如果流操作的行为参数是有状态的,则流管道结果可能是不确定的或不正确的。有状态lambda (或其他实现适当功能接口的对象)的结果取决于在流管道执行过程中可能发生变化的任何状态。
下面是一个类似于您的例子:
..。如果映射操作是并行执行的,则由于线程调度差异,同一输入的结果可能因运行而异,而对于无状态lambda表达式,结果总是相同的。
这解释了你的观察:每次运行输出都不同。
正确的方法是@Eran的所示
发布于 2018-10-16 06:28:40
希望它能正常工作。通过编写Synchronied函数makeSomeMagic和使用线程安全数据结构ConcurrentHashMap和编写简单语句
dataList.parallelStream().forEach(f -> resultMap.put(f, makeSomeMagic(f)));完整的代码在这里:
import java.util.*;
import java.lang.*;
import java.io.*;
import java.util.stream.Collectors;
public class Main{
static int l = 0;
public static void main (String[] args) throws java.lang.Exception {
letsGoParallel();
}
public synchronized static int makeSomeMagic( int data) { // make it synchonized
l++;
return data * 100;
}
public static void letsGoParallel() {
List<Integer> dataList = new ArrayList<>();
for(int i = 1; i <= 100 ; i++) {
dataList.add(i);
}
Map<Integer, Integer> resultMap = new ConcurrentHashMap<>();// use ConcurrentHashMap
dataList.parallelStream().forEach(f -> resultMap.put(f, makeSomeMagic(f)));
System.out.println("Input Size: " + dataList.size());
System.out.println("Size: " + resultMap.size());
System.out.println("Function Called: " + l);
}
}https://stackoverflow.com/questions/52828813
复制相似问题