I'm practicing a bit of concurrency.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
// Pair is a mutable key/value class; its import is not shown in the question.

public class WordOccurrencesBigFile {
    private String words;
    private ConcurrentHashMap<String, Pair<String, Integer>> wordOccurrencesMap = new ConcurrentHashMap<>();

    public WordOccurrencesBigFile(String wordsLine) {
        this.words = wordsLine;
    }

    public void processWords() {
        parseWordsLines();
        // The three print methods are not shown in the question.
        printOrderAlphabetically();
        printOrderByCount();
        printByInsertionOrder();
    }

    private void parseWordsLines() {
        String[] wordsLinesArray = words.split("\n");
        // One task per line, executed by a pool of 5 threads.
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (String wordsLine : wordsLinesArray) {
            executor.execute(() -> parseWords(wordsLine));
        }
        executor.shutdown();
        try {
            // Block until every task has finished instead of busy-spinning on isTerminated().
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        System.out.println("Finished all threads");
    }

    private void parseWords(String wordsLine) {
        System.out.println(Thread.currentThread().getName() + " Start.");
        System.out.println(Thread.currentThread().getName() + " Processing line: '" + wordsLine + "'");
        String[] wordsArray = wordsLine.split(" ");
        // Without this block, the check-then-act sequence below races between threads.
        synchronized (this) {
            for (String word : wordsArray) {
                Pair<String, Integer> pair = null;
                if (!wordOccurrencesMap.containsKey(word)) {
                    pair = new Pair<>(word, 1);
                    //System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
                } else {
                    pair = wordOccurrencesMap.get(word);
                    pair.setValue(pair.getValue() + 1);
                    //System.out.println(Thread.currentThread().getName() + " Updating Pair: " + pair);
                }
                wordOccurrencesMap.put(word, pair);
            }
        }
        System.out.println(Thread.currentThread().getName() + " End.");
    }

    public static void main(String[] args) {
        String wordsLines = "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa\n" +
                "bb cc aa ccc bb cc cc aa";
        WordOccurrencesBigFile wordOccurrences = new WordOccurrencesBigFile(wordsLines);
        wordOccurrences.processWords();
    }
}
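In parseWordsLines I create an ExecutorService backed by a pool of 5 threads, and I instantiate the WordOccurrencesBigFile class with a string that is split into multiple lines by "\n". The goal is for each line to be processed by a different thread, and for the count of each unique word to be stored in the Map.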
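I thought that using a ConcurrentHashMap would be enough to handle the fact that multiple threads read from and write to the map. However, most of the times I run the class, I get different counts. (Oddly, mostly for the word "bb".)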
But after adding synchronized(this), the problem is fixed.
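Can someone explain why this happens, and whether this is the best way to fix it? Should I pass "this" to the synchronized block, or the object that the threads access?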
Thanks a lot.
Posted on 2018-08-22 11:09:37
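ConcurrentHashMap is thread-safe: it guarantees that each individual operation, such as containsKey, get or put, is thread-safe.
But the following sequence of operations is not atomic as a whole: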
    if (!wordOccurrencesMap.containsKey(word)) {
        pair = new Pair<>(word, 1);
        //System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
    } else {
        pair = wordOccurrencesMap.get(word);
        pair.setValue(pair.getValue() + 1);
        //System.out.println(Thread.currentThread().getName() + " Updating Pair: " + pair);
    }
    wordOccurrencesMap.put(word, pair);
You can replace it with a single atomic operation:
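    wordOccurrencesMap.compute(word, (w, pair) -> {
        if (pair == null) {
            return new Pair<>(w, 1);
        }
        // Return the updated Pair explicitly rather than relying on setValue's return value.
        pair.setValue(pair.getValue() + 1);
        return pair;
    });
Posted on 2018-08-22 11:19:27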
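Adding synchronized(this) does fix the problem, but because it serializes the whole loop, you lose all the benefits of multithreading and parallelization.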
What you need is the computeIfAbsent method of ConcurrentMap. The body of the for loop then becomes:
    Pair<String, Integer> pair = wordOccurrencesMap.computeIfAbsent(word, w -> new Pair<>(w, 0));
    synchronized (pair) {
        pair.setValue(pair.getValue() + 1);
    }
You can now drop the synchronized(this) block.
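Edit: as pointed out in the comments, you still have to make sure that no other thread can call pair.setValue() while one thread is calling it; that is why the increment is wrapped in synchronized(pair).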
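A minimal runnable sketch of the computeIfAbsent plus synchronized(pair) approach described in this answer. It assumes a hypothetical mutable Pair class (the question does not show which Pair it uses) and hypothetical names WordCountSketch and count; it keeps the question's pool of 5 threads, waits with awaitTermination instead of a busy loop, and reads the final counts under each pair's lock.

```java
import java.util.Collections;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class WordCountSketch {

    // Hypothetical minimal mutable pair; the question's Pair class is not shown.
    static final class Pair {
        private final String key;
        private int value;

        Pair(String key, int value) {
            this.key = key;
            this.value = value;
        }

        String getKey() { return key; }
        int getValue() { return value; }
        void setValue(int value) { this.value = value; }
    }

    private final ConcurrentHashMap<String, Pair> wordOccurrencesMap = new ConcurrentHashMap<>();

    private void parseWords(String wordsLine) {
        for (String word : wordsLine.split(" ")) {
            // computeIfAbsent atomically installs at most one Pair per word.
            Pair pair = wordOccurrencesMap.computeIfAbsent(word, w -> new Pair(w, 0));
            // Lock only this word's Pair; threads counting other words are not blocked.
            synchronized (pair) {
                pair.setValue(pair.getValue() + 1);
            }
        }
    }

    // Counts words line by line on a pool of 5 threads and returns a sorted snapshot.
    static Map<String, Integer> count(String text) {
        WordCountSketch sketch = new WordCountSketch();
        ExecutorService executor = Executors.newFixedThreadPool(5);
        for (String line : text.split("\n")) {
            executor.execute(() -> sketch.parseWords(line));
        }
        executor.shutdown();
        try {
            // Block until all tasks finish instead of busy-spinning on isTerminated().
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("word counting timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        Map<String, Integer> snapshot = new TreeMap<>();
        for (Pair pair : sketch.wordOccurrencesMap.values()) {
            synchronized (pair) {
                snapshot.put(pair.getKey(), pair.getValue());
            }
        }
        return snapshot;
    }

    public static void main(String[] args) {
        String text = String.join("\n", Collections.nCopies(22, "bb cc aa ccc bb cc cc aa"));
        System.out.println(count(text)); // {aa=44, bb=44, cc=66, ccc=22}
    }
}
```

With the question's 22 identical lines this should print {aa=44, bb=44, cc=66, ccc=22} on every run, because each word's Pair is created exactly once and every increment happens under that Pair's monitor. The design choice is lock granularity: threads updating different words never wait for each other, unlike with synchronized(this).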
Posted on 2018-08-22 11:11:28
As @Thomas mentioned in the comments, the counter increment is not atomic, which means:
    if (!wordOccurrencesMap.containsKey(word)) {
        // Two threads can enter this block at the same time, so the results will differ.
        pair = new Pair<>(word, 1);
        //System.out.println(Thread.currentThread().getName() + " Creating Pair: " + pair);
    }
Please see this post for more information, as it explains the same problem in detail.
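The lost update described in this answer can be reproduced deterministically by replaying the two threads' steps in order on a single thread. The sketch below does that, and then, as a swapped-in alternative not taken from any of the answers, uses ConcurrentHashMap.merge with plain Integer counts (instead of the question's Pair) to show a per-key atomic increment that holds up under real concurrency. The class and method names are hypothetical.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CheckThenActDemo {

    // Replays on one thread the interleaving described above: both "threads" run
    // containsKey before either runs put, so both insert 1 and one occurrence is lost.
    static int replayRacyInterleaving(String word) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        boolean aSeesMissing = !map.containsKey(word); // thread A checks
        boolean bSeesMissing = !map.containsKey(word); // thread B checks before A writes
        if (aSeesMissing) {
            map.put(word, 1); // thread A inserts 1
        }
        if (bSeesMissing) {
            map.put(word, 1); // thread B also inserts 1, overwriting A's entry
        }
        return map.get(word); // 1, although the word was seen twice
    }

    // Real threads incrementing the same key; merge performs the whole
    // read-modify-write for one key atomically, so no increment is lost.
    static int concurrentMerge(int threads, int incrementsPerThread) {
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            executor.execute(() -> {
                for (int i = 0; i < incrementsPerThread; i++) {
                    map.merge("bb", 1, Integer::sum);
                }
            });
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return map.get("bb");
    }

    public static void main(String[] args) {
        System.out.println(replayRacyInterleaving("bb")); // 1
        System.out.println(concurrentMerge(5, 10_000));   // 50000
    }
}
```

merge follows the same idea as the compute call in the first answer, specialized to a plain counter; if the Pair wrapper is not actually needed, an Integer-valued ConcurrentHashMap may be enough.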
https://stackoverflow.com/questions/51965383