我正在尝试在我的代码中实现队列的使用。重点是我想让它打印出文件中的单词总数,这意味着我需要它在完成后将所有结果加在一起。
目前,我的程序所做的是,我有一个阅读器,它遍历文件,并返回一个包含文件名和其中字数的字符串。然后,我使用main方法为args数组中给定的每个参数运行for循环。每次我们浏览一个新文档来检查有多少个单词时,我们都会让它成为一个新的主题。
public static void main(final String[] args) {
Thread t = null;
if (args.length >= 1) {
String destinationFileName = args[(args.length-1)];
for (int l = 0; l < (args.length); l++) {
final int q = l;
final Thread y = t;
Runnable r = new Runnable() {
public void run() {
String res = readTextFile(args[q]);
System.out.println(res);
}
};
t = new Thread(r);
t.start();
}
} else {
System.err.println("Not enough input files");
}
}那么,我如何创建一个队列,让它们以某种方式相互等待,这样它就不会犯在完全相同的时间添加结果的错误?
发布于 2013-12-08 06:09:45
阻塞队列在这里似乎是不必要的。只需让每个线程将其结果添加到线程安全列表中,该列表可以像这样构造:
final List<String> results =
Collections.synchronizedList(new ArrayList<String>());接下来,您希望等到所有线程都完成后再聚合结果。您可以通过在每个线程上调用join来完成此操作。将每个线程添加到名为threads的列表中,然后在启动所有线程后,调用以下代码:
for(Thread t : threads) {
t.join();
}这段代码将有效地等待每个线程完成后再继续执行。
发布于 2013-12-08 06:14:37
这是一个非常常见的示例,当需要多个线程来处理IO操作时,如从磁盘读取文件,我猜这是为了指导目的,对于现实生活中的示例,请考虑查看map reduce框架,如Hadoop
了解如何使用hadoop here完成类似的任务。
但是,下面是一个伪示例:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
class ConsumerProducer implements Runnable {
private final BlockingQueue<String> map;
private final BlockingQueue<Map<String, Integer>> reduce;
ConsumerProducer(BlockingQueue<String> map,
BlockingQueue<Map<String, Integer>> reduce) {
this.map = map;
this.reduce = reduce;
}
public void run() {
try {
while (true) {
Map<String, Integer> wordToOccurrences = this.consume(map
.take());
this.produce(wordToOccurrences);
}
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
private void produce(Map<String, Integer> wordToOccurrences)
throws InterruptedException {
reduce.put(wordToOccurrences);
}
public Map<String, Integer> consume(String fileName) {
// read the file and return 'word' -> number of occurrences
return new HashMap<String, Integer>();
}
}
class Setup {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<String> map = new LinkedBlockingQueue<String>();
BlockingQueue<Map<String, Integer>> reduce = new LinkedBlockingQueue<Map<String, Integer>>();
for (String fileName : args) {
map.put(fileName);
// assuming every thread process single file, for other options see
// http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
ConsumerProducer c = new ConsumerProducer(map, reduce);
new Thread(c).start();
}
for (int i = 0; i < args.length; i++) {
Map<String, Integer> wordToOccurrences = reduce.take();
// start consuming results
}
// print merged map of total results
}
}https://stackoverflow.com/questions/20446788
复制相似问题