首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用生产者/消费者模型处理文件

使用生产者/消费者模型处理文件
EN

Stack Overflow用户
提问于 2018-06-07 15:40:50
回答 1查看 1.8K关注 0票数 3

在最近删除的一篇文章中,我提出了以下问题:

我正在编写一个多线程程序,它实现了生产者/消费者模型。通常,我希望使用一个生产者,它从文件中读取行并将它们放入BlockingQueue中,并让多个使用者在从BlockingQueue检索行后进行一些处理,并将结果存储在一个新文件中。

我希望,如果你能给我一些反馈意见,我应该考虑什么,以实现高性能。我花了几周时间阅读关于并发和同步的文章,因为我不想错过任何东西,但是我正在寻找一些外部反馈。请找到以下要点,我需要有关的信息。

  • 为了更好的性能,我应该使用什么类型的BlockingQueue实现?我不能使用固定大小的BlockingQueue,因为我们不知道文件上有多少行。或者我应该使用它,即使制片人会被锁定?(如果队列已满)
  • If 'f()‘是生产者用来处理文件行的方法。知道我使用的是BlockingQueue,应该同步f()吗?如果是的话,这不会影响我的申请吗?因为其他使用者会等待锁的释放。

我希望我没说错什么。

你建议在问问题之前实现一些东西,所以我删除了这篇文章,并试图实现这个模型。这是我的密码。

我让一个线程从一个文件中读取并放入一个BlockingQueue中的生成器。

代码语言:javascript
复制
class Producer implements Runnable {
    private String location;
    private BlockingQueue<String> blockingQueue;

    private float numline=0;


    protected transient BufferedReader bufferedReader;
    protected transient BufferedWriter bufferedWriter;


    public Producer (String location, BlockingQueue<String> blockingQueue) {
        this.location=location;
        this.blockingQueue=blockingQueue;

        try {
            bufferedReader = new BufferedReader(new FileReader(location));

            // Create the file where the processed lines will be stored
            createCluster();

        } catch (FileNotFoundException e1) {
            e1.printStackTrace();
        }
    }

    @Override
    public void run() {
        String line=null;
        try {
            while ((line = bufferedReader.readLine()) != null) {
                // Count the read lines
                numline++;
                blockingQueue.put(line);
            }
        } catch (IOException e) {
            System.out.println("Problem reading the log file!");
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    public void createCluster () {
        try {
            String clusterName=location+".csv";
            bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
            bufferedWriter.write("\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

}

使用者多个线程将从BlockingQueue进行一些处理'f()‘,并将结果存储在一个新文件中。

代码语言:javascript
复制
class Consumer implements Runnable {
    private String location;
    private BlockingQueue<String> blockingQueue;

    protected transient BufferedWriter bufferedWriter;

    private String clusterName;

    public Consumer (String location, BlockingQueue<String> blockingQueue) {
        this.blockingQueue=blockingQueue;
        this.location=location;

        clusterName=location+".csv";
    }

    @Override
    public void run() {
        while (true) {
            try {
                //Retrieve the lines
                String line = blockingQueue.take();
                String result = doNormalize (line);
                // TO DO
                //
                //bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
                //BufferedWriter.write(result+ "\n");

            } catch (InterruptedException e) {
                e.printStackTrace();
            } 
        }
    }

//Pattern pattern, Matcher matcher
    private String doNormalize(String line){
        String rules [] = getRules(); // return an array of Regex
        String tmp="";

        for (String rule : rules) {
            Pattern pattern = Pattern.compile(rule);
            Matcher matcher = pattern.matcher(line);

            if (matcher.find()){
                Set<String> namedGroups = getNamedGroupCandidates(rule);
                Iterator<String> itr = namedGroups.iterator();
                while(itr.hasNext()){
                    String value=itr.next();
                    tmp=tmp+matcher.group(value)+", ";
                }


        tmp = tmp + "\t";
                    break;
                }
            }
            return tmp;

        }
private Set<String> getNamedGroupCandidates(String regex) {
            Set<String> namedGroups = new TreeSet<String>();
            Matcher m = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>").matcher(regex);
            while (m.find()) {
                namedGroups.add(m.group(1));
            }
            return namedGroups;
        }
}

以及我主课上的代码。它使用1个生产者和3个消费者

代码语言:javascript
复制
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

            Producer readingThread = new Producer(location, queue);
            new Thread(readingThread).start();

            Consumer normalizers = new Consumer(location,queue);
            ExecutorService executor = Executors.newFixedThreadPool(3);
            for (int i = 1; i <= 3; i++) {
                executor.submit(normalizers);
            }
            System.out.println("Stopped");
            executor.shutdown();

我知道我的代码是不完整的,因为我需要关闭和刷新读取器并编写等等。但是,你能告诉我到目前为止在实现生产者/消费者模型时所犯的错误吗?在方法f()上,它也是一个处理一行并产生结果的方法,我不认为我应该同步它,因为我希望所有的使用者同时使用。

编辑

最后,这个post真的把我搞糊涂了,它建议如果消费者存储on的结果,它会减慢这个过程。这可能是个问题,因为我想要性能和速度。

击败,

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2018-06-17 12:38:39

对于我的第二个问题:“SingleConsumer要”知道“多个消费者已经消耗/处理了所有行”。我的灵感来自于这个post结合了这样的评论:每个消费者都应该向队列2发送一条“我终止”消息,如果单个输出消费者收到了所有这些,它也可以终止。

因此,对于消费者,下面是我在run()方法中所写的内容:

代码语言:javascript
复制
@Override
public void run() {
// A Consumer keeps taking elements from the queue 1, as long as the Producer is
// producing and as long as queue 1 is not empty.
    while (true) {
        try {

            //Retrieve the lines
            String line = firstBlockingQueue.take(); 
If a special terminating value is found.
            if (line==POISON_PILL) {
// The consumer notifies other consumers and the SignleConsumer that operates on queue 2
// and then terminates.
                firstBlockingQueue.put(POISON_PILL);
                secondBlockingQueue.put(SINGLE_POISIN_PILL);
                return;
            }
            // Put the normalized events on the new Queue
            String result = doNormalize (line);
            if (result!=null) {
                secondBlockingQueue.put(result);
            }

        } catch (InterruptedException e) {
            e.printStackTrace();
        } 
    }
} 

至于SinglerConsumer,它应该计算消费者发送的“我完成了处理”消息,或者我正在使用它作为SINGLE_POISON_PILL。并在该计数器到达队列1中的消费者数量时终止。

代码语言:javascript
复制
while (true) {
    try {
        //Retrieve the lines
        String line = secondBlockingQueue.take();
        if (line==SINGLE_POISIN_PILL) {

            setCounter(getCounter()+1);
            if (getCounter()== threadNumber) {
                System.out.println("All "+getCounter()+" threads have finished.  \n Stopping..");
                return;
            }
        }

        try {
            if (line != SINGLE_POISIN_PILL) {
                System.out.println(line);
                bufferedWriter.write(line+"\n");
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    } catch (InterruptedException e) {
        e.printStackTrace();
    } 
}

对于我的第二个问题,显然我所要做的就是补充:

代码语言:javascript
复制
        if (line==SINGLE_POISIN_PILL) {
            setCounter(getCounter()+1);
            if (getCounter()== threadNumber) {
                System.out.println("All "+getCounter()+" threads have finished.  \n Stopping..");
                try {
         if (bufferedWriter != null) 
         {
             bufferedWriter.flush();
             bufferedWriter.close();
         }
     } catch (IOException e) {
         e.printStackTrace();
     }
                return;
            }
        }

一旦我冲过并关闭了缓冲区,缓冲区就开始写入。

希望得到你的反馈。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50745284

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档