在最近删除的一篇文章中,我提出了以下问题:
我正在编写一个多线程程序,它实现了生产者/消费者模型。通常,我希望使用一个生产者,它从文件中读取行并将它们放入BlockingQueue中,并让多个使用者在从BlockingQueue检索行后进行一些处理,并将结果存储在一个新文件中。
我希望,如果你能给我一些反馈意见,我应该考虑什么,以实现高性能。我花了几周时间阅读关于并发和同步的文章,因为我不想错过任何东西,但是我正在寻找一些外部反馈。请找到以下要点,我需要有关的信息。
我希望我没说错什么。
你建议在问问题之前实现一些东西,所以我删除了这篇文章,并试图实现这个模型。这是我的密码。
我让一个线程从一个文件中读取并放入一个BlockingQueue中的生成器。
class Producer implements Runnable {
private String location;
private BlockingQueue<String> blockingQueue;
private float numline=0;
protected transient BufferedReader bufferedReader;
protected transient BufferedWriter bufferedWriter;
public Producer (String location, BlockingQueue<String> blockingQueue) {
this.location=location;
this.blockingQueue=blockingQueue;
try {
bufferedReader = new BufferedReader(new FileReader(location));
// Create the file where the processed lines will be stored
createCluster();
} catch (FileNotFoundException e1) {
e1.printStackTrace();
}
}
@Override
public void run() {
String line=null;
try {
while ((line = bufferedReader.readLine()) != null) {
// Count the read lines
numline++;
blockingQueue.put(line);
}
} catch (IOException e) {
System.out.println("Problem reading the log file!");
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void createCluster () {
try {
String clusterName=location+".csv";
bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
bufferedWriter.write("\n");
} catch (IOException e) {
e.printStackTrace();
}
}
}使用者多个线程将从BlockingQueue进行一些处理'f()‘,并将结果存储在一个新文件中。
class Consumer implements Runnable {
private String location;
private BlockingQueue<String> blockingQueue;
protected transient BufferedWriter bufferedWriter;
private String clusterName;
public Consumer (String location, BlockingQueue<String> blockingQueue) {
this.blockingQueue=blockingQueue;
this.location=location;
clusterName=location+".csv";
}
@Override
public void run() {
while (true) {
try {
//Retrieve the lines
String line = blockingQueue.take();
String result = doNormalize (line);
// TO DO
//
//bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
//BufferedWriter.write(result+ "\n");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//Pattern pattern, Matcher matcher
private String doNormalize(String line){
String rules [] = getRules(); // return an array of Regex
String tmp="";
for (String rule : rules) {
Pattern pattern = Pattern.compile(rule);
Matcher matcher = pattern.matcher(line);
if (matcher.find()){
Set<String> namedGroups = getNamedGroupCandidates(rule);
Iterator<String> itr = namedGroups.iterator();
while(itr.hasNext()){
String value=itr.next();
tmp=tmp+matcher.group(value)+", ";
}
tmp = tmp + "\t";
break;
}
}
return tmp;
}
private Set<String> getNamedGroupCandidates(String regex) {
Set<String> namedGroups = new TreeSet<String>();
Matcher m = Pattern.compile("\\(\\?<([a-zA-Z][a-zA-Z0-9]*)>").matcher(regex);
while (m.find()) {
namedGroups.add(m.group(1));
}
return namedGroups;
}
}以及我主课上的代码。它使用1个生产者和3个消费者
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
Producer readingThread = new Producer(location, queue);
new Thread(readingThread).start();
Consumer normalizers = new Consumer(location,queue);
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 3; i++) {
executor.submit(normalizers);
}
System.out.println("Stopped");
executor.shutdown();我知道我的代码是不完整的,因为我需要关闭和刷新读取器并编写等等。但是,你能告诉我到目前为止在实现生产者/消费者模型时所犯的错误吗?在方法f()上,它也是一个处理一行并产生结果的方法,我不认为我应该同步它,因为我希望所有的使用者同时使用。
编辑
最后,这个post真的把我搞糊涂了,它建议如果消费者存储on的结果,它会减慢这个过程。这可能是个问题,因为我想要性能和速度。
击败,
发布于 2018-06-17 12:38:39
对于我的第二个问题:“SingleConsumer要”知道“多个消费者已经消耗/处理了所有行”。我的灵感来自于这个post结合了这样的评论:每个消费者都应该向队列2发送一条“我终止”消息,如果单个输出消费者收到了所有这些,它也可以终止。
因此,对于消费者,下面是我在run()方法中所写的内容:
@Override
public void run() {
// A Consumer keeps taking elements from the queue 1, as long as the Producer is
// producing and as long as queue 1 is not empty.
while (true) {
try {
//Retrieve the lines
String line = firstBlockingQueue.take();
If a special terminating value is found.
if (line==POISON_PILL) {
// The consumer notifies other consumers and the SignleConsumer that operates on queue 2
// and then terminates.
firstBlockingQueue.put(POISON_PILL);
secondBlockingQueue.put(SINGLE_POISIN_PILL);
return;
}
// Put the normalized events on the new Queue
String result = doNormalize (line);
if (result!=null) {
secondBlockingQueue.put(result);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
} 至于SinglerConsumer,它应该计算消费者发送的“我完成了处理”消息,或者我正在使用它作为SINGLE_POISON_PILL。并在该计数器到达队列1中的消费者数量时终止。
while (true) {
try {
//Retrieve the lines
String line = secondBlockingQueue.take();
if (line==SINGLE_POISIN_PILL) {
setCounter(getCounter()+1);
if (getCounter()== threadNumber) {
System.out.println("All "+getCounter()+" threads have finished. \n Stopping..");
return;
}
}
try {
if (line != SINGLE_POISIN_PILL) {
System.out.println(line);
bufferedWriter.write(line+"\n");
}
} catch (IOException e) {
e.printStackTrace();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}对于我的第二个问题,显然我所要做的就是补充:
if (line==SINGLE_POISIN_PILL) {
setCounter(getCounter()+1);
if (getCounter()== threadNumber) {
System.out.println("All "+getCounter()+" threads have finished. \n Stopping..");
try {
if (bufferedWriter != null)
{
bufferedWriter.flush();
bufferedWriter.close();
}
} catch (IOException e) {
e.printStackTrace();
}
return;
}
}一旦我冲过并关闭了缓冲区,缓冲区就开始写入。
希望得到你的反馈。
https://stackoverflow.com/questions/50745284
复制相似问题