我想读取一个大文件,处理每一行并将结果插入数据库。我的目标是并行处理行,因为每个进程都是一个长时间的任务。因此,我希望一个线程继续读取,多个线程继续处理,一个线程继续在块中插入到db。
我把它分解如下:
1)逐行读取文件(容易)
2)将每一行发送到线程池(3个线程),因为处理是一个长期运行的任务。在线程池繁忙时阻止进一步的行读取。
3)将每个处理过的行从每个头池写入StringBuffer
4)监视缓冲区大小,并将结果以块形式写入数据库(例如每1000个条目)。
ExecutorService executor = Executors.newFixedThreadPool(3);
StringBuffer sb = new StringBuffer();
String line;
AtomicInteger count = new AtomicInteger(0);
while ((line = reader.read()) != null) {
count.getAndIncrement();
Future<String> future = executor.submit(() -> {
return processor.process(line);
});
//PROBLEM: this blocks until the future returns
sb.append(future.get());
if (count.get() == 100) {
bufferChunk = sb;
count = new AtomicInteger(0);
sb = new StringBuffer();
databaseService.batchInsert(bufferChunk.toString());
}
}问题:
future.get()将始终阻塞读取器,直到将来返回结果为止。可能我做得不对。但我怎样才能做到这一点?
Sidenote:文件大小约为10 to,因此我无法首先将整个文件读入内存以准备并行任务。
发布于 2018-05-16 15:40:58
我发现下面的解决方案很优雅。它只是众多可能的其中之一,但在概念上很简单,而且
我只将实际的测试方法放在这里,并在专用GitHub回购中提供完整的测试设置和辅助数据结构。
private final AtomicInteger count = new AtomicInteger();
private final Consumer<String> processor = (value) -> {
count.incrementAndGet();
};
@Test
public void onlyReadWhenExecutorAvailable() throws Exception {
Executor executor = Executors.newCachedThreadPool();
CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
for (Semaphore semaphore = new Semaphore(CONCURRENCY_LEVEL); ; ) {
String value = reader.read();
if (value == null) {
break;
}
semaphore.acquire();
CompletableFuture<Void> future = CompletableFuture.completedFuture(value)
.thenAcceptAsync(v -> {
processor.accept(v);
semaphore.release();
}, executor);
done = done.thenCompose($ -> future);
}
done.get();
assertEquals(ENTRIES, count.get());
}发布于 2018-05-16 15:09:43
发布于 2018-05-17 11:42:11
经过更深入的研究,我发现在这个答案中给出的BlockingExecutor最接近我想要达到的目标:
https://stackoverflow.com/a/43109689/1194415
它基本上是extends ThreadPoolExecutor与一个Semaphore锁相结合的。
https://stackoverflow.com/questions/50374271
复制相似问题