首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用生产者/消费者模型处理文件

使用生产者/消费者模型处理文件
EN

Code Review用户
提问于 2018-06-10 14:33:22
回答 1查看 1.9K关注 0票数 4

我正在编写一个多线程程序,它实现了生产者/消费者模型。通常,我希望使用一个Producer,它从文件中读取行并将它们放入BlockingQueue中,并让多个Consumers在从BlockingQueue检索行后进行一些处理,并将结果存储在一个新文件中。

请给我一些反馈意见,我应该考虑什么来实现高性能。我花了几周时间阅读关于并发和同步的文章,因为我不想错过任何东西,但是我正在寻找一些外部反馈,特别是:

  • 为了更好的性能,我应该使用什么类型的BlockingQueue实现?我不能使用固定大小的BlockingQueue,因为我们不知道文件有多少行。或者我应该使用它,即使Producer将被锁定?(如果队列已满)
  • 如果f()是生产者用来处理文件行的方法;知道我使用的是BlockingQueue,那么应该同步f()吗?如果是的话,这不会影响我的申请吗?因为其他Consumers将不得不等待锁的释放。

这是我的代码:

代码语言:javascript
复制
class Producer implements Runnable {
    private String location;
    private BlockingQueue<String> blockingQueue;

    private float numline=0;


    protected transient BufferedReader bufferedReader;
    protected transient BufferedWriter bufferedWriter;


    public Producer (String location, BlockingQueue<String> blockingQueue) {
        this.location=location;
        this.blockingQueue=blockingQueue;

        try {
            bufferedReader = new BufferedReader(new FileReader(location));

            // Create the file where the processed lines will be stored
            createCluster();

        } catch (FileNotFoundException e1) {
            e1.printStackTrace();
        }
    }

    @Override
    public void run() {
        String line=null;
        try {
            while ((line = bufferedReader.readLine()) != null) {
                // Count the read lines
                numline++;
                blockingQueue.put(line);
            }
        } catch (IOException e) {
            System.out.println("Problem reading the log file!");
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    public void createCluster () {
        try {
            String clusterName=location+".csv";
            bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
            bufferedWriter.write("\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

这就是使用者,多个线程将从BlockingQueue获取结果并执行一些处理(f()),然后将结果存储在一个新文件中:

代码语言:javascript
复制
class Consumer implements Runnable {
    private String location;
    private BlockingQueue<String> blockingQueue;

    protected transient BufferedWriter bufferedWriter;

    private String clusterName;

    public Consumer (String location, BlockingQueue<String> blockingQueue) {
        this.blockingQueue=blockingQueue;
        this.location=location;

        clusterName=location+".csv";
    }

    @Override
    public void run() {
        while (true) {
            try {
                //Retrieve the lines
                String line = blockingQueue.take();
                // Call result=f(line)
                // TO DO
                //
                //bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
                //BufferedWriter.write(result+ "\n");

            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }
}

以及我的主要类中的代码,它使用了1个生产者和3个消费者:

代码语言:javascript
复制
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

            Producer readingThread = new Producer(location, queue);
            new Thread(readingThread).start();

            Consumer normalizers = new Consumer(location,queue);
            ExecutorService executor = Executors.newFixedThreadPool(3);
            for (int i = 1; i <= 3; i++) {
                executor.submit(normalizers);
            }
            System.out.println("Stopped");
            executor.shutdown();

最后,这个帖子真的把我弄糊涂了。它建议,如果消费者将结果存储在一个文件中,就会减慢这个过程。这可能是个问题,因为我想要性能和速度。

EN

回答 1

Code Review用户

回答已采纳

发布于 2018-06-11 11:42:36

首先,使用固定大小的阻塞队列是完全可以的。是的,最终您的生产者可能会等待队列中可用的空间,但这正是这样的情况:如果消费者“足够快”来保持队列基本为空,那么您的生产者就可以全速运行。如果使用者以最大容量运行,但仍不能保持队列,则等待他们,而不是冒内存溢出的风险。

关于速度:这不重要。只要涉及I/O,从CPU的角度来看,这个过程是如此缓慢,因此您将永远不会度量队列实现之间的显著差异。

关于整个过程:正如您所提到的帖子的答复中所提到的,使用多个线程来编写单个文件是个坏主意,而且纯粹是为了找麻烦。因此,如果您的f()函数是CPU密集型的,并且受益于多个处理器,那么应该使用第二个队列将输出发送给单个使用者。

代码语言:javascript
复制
           Producer (reads file)
                     |
                  [Queue 1]
                     |
                     V
   multiple consumers (in memory processing)
                     |
                  [Queue 2]
                     |
                     V
     Single Consumer (write output file)

该单一使用者应保持文件打开,直到进程完成,以最大限度地受益于I/O缓冲。但是,请注意,在这样的设置中,发出进程结束的信号实际上可能是一个挑战。当输入文件已被完全读取,队列为空并且所有处理单元都处于空闲状态时,可能很难“知道”。

请注意:正如您多次提到性能(这是初学者常见的错误),请搜索“过早优化是万恶之源”(古鲁·库思关于这个问题的话和许多讨论网站)。基本上:让它首先工作,测量,检查你的算法,只是作为最后的手段,引入复杂的东西,如多处理。你真的有表演问题吗?它是否与函数f()中的次优算法有关?(Codereview在这里可能会有所帮助;-)如果没有,请坚持使用单个线程,或者使用并行流处理流。如果没有测量来支持你的观点,就不要为某些想象中的问题做这件事。

票数 2
EN
页面原文内容由Code Review提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://codereview.stackexchange.com/questions/196233

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档