首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用BlockingQueue的

使用BlockingQueue的
EN

Stack Overflow用户
提问于 2013-12-08 05:20:05
回答 2查看 420关注 0票数 0

我正在尝试在我的代码中实现队列的使用。重点是我想让它打印出文件中的单词总数,这意味着我需要它在完成后将所有结果加在一起。

目前,我的程序所做的是,我有一个阅读器,它遍历文件,并返回一个包含文件名和其中字数的字符串。然后,我使用main方法为args数组中给定的每个参数运行for循环。每次我们浏览一个新文档来检查有多少个单词时,我们都会让它成为一个新的主题。

代码语言:javascript
复制
public static void main(final String[] args) {
    Thread t = null;
    if (args.length >= 1) {
        String destinationFileName = args[(args.length-1)];
            for (int l = 0; l < (args.length); l++) {
                final int q = l;
                final Thread y = t;
                Runnable r = new Runnable() {
                    public void run() {
                        String res = readTextFile(args[q]);
                        System.out.println(res);
                    }
                };
                t = new Thread(r);
                t.start();
            }
    } else {
        System.err.println("Not enough input files");
    }
}

那么,我如何创建一个队列,让它们以某种方式相互等待,这样它就不会犯在完全相同的时间添加结果的错误?

EN

回答 2

Stack Overflow用户

发布于 2013-12-08 06:09:45

阻塞队列在这里似乎是不必要的。只需让每个线程将其结果添加到线程安全列表中,该列表可以像这样构造:

代码语言:javascript
复制
final List<String> results = 
        Collections.synchronizedList(new ArrayList<String>());

接下来,您希望等到所有线程都完成后再聚合结果。您可以通过在每个线程上调用join来完成此操作。将每个线程添加到名为threads的列表中,然后在启动所有线程后,调用以下代码:

代码语言:javascript
复制
for(Thread t : threads) {
    t.join();
}

这段代码将有效地等待每个线程完成后再继续执行。

票数 0
EN

Stack Overflow用户

发布于 2013-12-08 06:14:37

这是一个非常常见的示例,当需要多个线程来处理IO操作时,如从磁盘读取文件,我猜这是为了指导目的,对于现实生活中的示例,请考虑查看map reduce框架,如Hadoop

了解如何使用hadoop here完成类似的任务。

但是,下面是一个伪示例:

代码语言:javascript
复制
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class ConsumerProducer implements Runnable {
    private final BlockingQueue<String> map;
    private final BlockingQueue<Map<String, Integer>> reduce;

    ConsumerProducer(BlockingQueue<String> map,
            BlockingQueue<Map<String, Integer>> reduce) {
        this.map = map;
        this.reduce = reduce;
    }

    public void run() {
        try {
            while (true) {
                Map<String, Integer> wordToOccurrences = this.consume(map
                        .take());
                this.produce(wordToOccurrences);
            }
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    private void produce(Map<String, Integer> wordToOccurrences)
            throws InterruptedException {
        reduce.put(wordToOccurrences);
    }

    public Map<String, Integer> consume(String fileName) {
        // read the file and return 'word' -> number of occurrences
        return new HashMap<String, Integer>();
    }
}

class Setup {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> map = new LinkedBlockingQueue<String>();
        BlockingQueue<Map<String, Integer>> reduce = new LinkedBlockingQueue<Map<String, Integer>>();

        for (String fileName : args) {
            map.put(fileName);
            // assuming every thread process single file, for other options see
            // http://docs.oracle.com/javase/tutorial/essential/concurrency/pools.html
            ConsumerProducer c = new ConsumerProducer(map, reduce);
            new Thread(c).start();
        }

        for (int i = 0; i < args.length; i++) {
            Map<String, Integer> wordToOccurrences = reduce.take();
            // start consuming results
        }

        // print merged map of total results
    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/20446788

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档