我正在编写一个多线程程序,它实现了生产者/消费者模型。通常,我希望使用一个Producer,它从文件中读取行并将它们放入BlockingQueue中,并让多个Consumers在从BlockingQueue检索行后进行一些处理,并将结果存储在一个新文件中。
请给我一些反馈意见,我应该考虑什么来实现高性能。我花了几周时间阅读关于并发和同步的文章,因为我不想错过任何东西,但是我正在寻找一些外部反馈,特别是:
BlockingQueue实现?我不能使用固定大小的BlockingQueue,因为我们不知道文件有多少行。或者我应该使用它,即使Producer将被锁定?(如果队列已满)f()是生产者用来处理文件行的方法;知道我使用的是BlockingQueue,那么应该同步f()吗?如果是的话,这不会影响我的申请吗?因为其他Consumers将不得不等待锁的释放。这是我的代码:
class Producer implements Runnable {
private String location;
private BlockingQueue<String> blockingQueue;
private float numline=0;
protected transient BufferedReader bufferedReader;
protected transient BufferedWriter bufferedWriter;
public Producer (String location, BlockingQueue<String> blockingQueue) {
this.location=location;
this.blockingQueue=blockingQueue;
try {
bufferedReader = new BufferedReader(new FileReader(location));
// Create the file where the processed lines will be stored
createCluster();
} catch (FileNotFoundException e1) {
e1.printStackTrace();
}
}
@Override
public void run() {
String line=null;
try {
while ((line = bufferedReader.readLine()) != null) {
// Count the read lines
numline++;
blockingQueue.put(line);
}
} catch (IOException e) {
System.out.println("Problem reading the log file!");
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void createCluster () {
try {
String clusterName=location+".csv";
bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
bufferedWriter.write("\n");
} catch (IOException e) {
e.printStackTrace();
}
}
}这就是使用者,多个线程将从BlockingQueue获取结果并执行一些处理(f()),然后将结果存储在一个新文件中:
class Consumer implements Runnable {
private String location;
private BlockingQueue<String> blockingQueue;
protected transient BufferedWriter bufferedWriter;
private String clusterName;
public Consumer (String location, BlockingQueue<String> blockingQueue) {
this.blockingQueue=blockingQueue;
this.location=location;
clusterName=location+".csv";
}
@Override
public void run() {
while (true) {
try {
//Retrieve the lines
String line = blockingQueue.take();
// Call result=f(line)
// TO DO
//
//bufferedWriter = new BufferedWriter(new FileWriter(clusterName, true));
//BufferedWriter.write(result+ "\n");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}以及我的主要类中的代码,它使用了1个生产者和3个消费者:
BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
Producer readingThread = new Producer(location, queue);
new Thread(readingThread).start();
Consumer normalizers = new Consumer(location,queue);
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 1; i <= 3; i++) {
executor.submit(normalizers);
}
System.out.println("Stopped");
executor.shutdown();最后,这个帖子真的把我弄糊涂了。它建议,如果消费者将结果存储在一个文件中,就会减慢这个过程。这可能是个问题,因为我想要性能和速度。
发布于 2018-06-11 11:42:36
首先,使用固定大小的阻塞队列是完全可以的。是的,最终您的生产者可能会等待队列中可用的空间,但这正是这样的情况:如果消费者“足够快”来保持队列基本为空,那么您的生产者就可以全速运行。如果使用者以最大容量运行,但仍不能保持队列,则等待他们,而不是冒内存溢出的风险。
关于速度:这不重要。只要涉及I/O,从CPU的角度来看,这个过程是如此缓慢,因此您将永远不会度量队列实现之间的显著差异。
关于整个过程:正如您所提到的帖子的答复中所提到的,使用多个线程来编写单个文件是个坏主意,而且纯粹是为了找麻烦。因此,如果您的f()函数是CPU密集型的,并且受益于多个处理器,那么应该使用第二个队列将输出发送给单个使用者。
Producer (reads file)
|
[Queue 1]
|
V
multiple consumers (in memory processing)
|
[Queue 2]
|
V
Single Consumer (write output file)该单一使用者应保持文件打开,直到进程完成,以最大限度地受益于I/O缓冲。但是,请注意,在这样的设置中,发出进程结束的信号实际上可能是一个挑战。当输入文件已被完全读取,队列为空并且所有处理单元都处于空闲状态时,可能很难“知道”。
请注意:正如您多次提到性能(这是初学者常见的错误),请搜索“过早优化是万恶之源”(古鲁·库思关于这个问题的话和许多讨论网站)。基本上:让它首先工作,测量,检查你的算法,只是作为最后的手段,引入复杂的东西,如多处理。你真的有表演问题吗?它是否与函数f()中的次优算法有关?(Codereview在这里可能会有所帮助;-)如果没有,请坚持使用单个线程,或者使用并行流处理流。如果没有测量来支持你的观点,就不要为某些想象中的问题做这件事。
https://codereview.stackexchange.com/questions/196233
复制相似问题