首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何并行处理文件行?

如何并行处理文件行?
EN

Stack Overflow用户
提问于 2018-05-16 15:02:09
回答 3查看 2.2K关注 0票数 8

我想读取一个大文件,处理每一行并将结果插入数据库。我的目标是并行处理行,因为每个进程都是一个长时间的任务。因此,我希望一个线程继续读取,多个线程继续处理,一个线程继续在块中插入到db。

我把它分解如下:

1)逐行读取文件(容易)

2)将每一行发送到线程池(3个线程),因为处理是一个长期运行的任务。在线程池繁忙时阻止进一步的行读取。

3)将每个处理过的行从每个头池写入StringBuffer

4)监视缓冲区大小,并将结果以块形式写入数据库(例如每1000个条目)。

代码语言:javascript
复制
ExecutorService executor = Executors.newFixedThreadPool(3);

StringBuffer sb = new StringBuffer();

String line;
AtomicInteger count = new AtomicInteger(0);
while ((line = reader.read()) != null) {
    count.getAndIncrement();
    Future<String> future = executor.submit(() -> {
        return processor.process(line);
    });

    //PROBLEM: this blocks until the future returns
    sb.append(future.get());

    if (count.get() == 100) {
        bufferChunk = sb;
        count = new AtomicInteger(0);
        sb = new StringBuffer();

        databaseService.batchInsert(bufferChunk.toString());
    }
}

问题:

  • future.get()将始终阻塞读取器,直到将来返回结果为止。
  • 缓冲区“监视”可能做得不对。

可能我做得不对。但我怎样才能做到这一点?

Sidenote:文件大小约为10 to,因此我无法首先将整个文件读入内存以准备并行任务。

EN

回答 3

Stack Overflow用户

发布于 2018-05-16 15:40:58

我发现下面的解决方案很优雅。它只是众多可能的其中之一,但在概念上很简单,而且

  • 它抑制了阅读,
  • 只积累在最后准备报告的最小状态量。
  • 不需要显式处理线程。

我只将实际的测试方法放在这里,并在专用GitHub回购中提供完整的测试设置和辅助数据结构。

代码语言:javascript
复制
private final AtomicInteger count = new AtomicInteger();

private final Consumer<String> processor = (value) -> {
    count.incrementAndGet();
};

@Test
public void onlyReadWhenExecutorAvailable() throws Exception {

    Executor executor = Executors.newCachedThreadPool();

    CompletableFuture<Void> done = CompletableFuture.completedFuture(null);
    for (Semaphore semaphore = new Semaphore(CONCURRENCY_LEVEL); ; ) {
        String value = reader.read();
        if (value == null) {
            break;
        }

        semaphore.acquire();

        CompletableFuture<Void> future = CompletableFuture.completedFuture(value)
            .thenAcceptAsync(v -> {
                processor.accept(v);
                semaphore.release();
            }, executor);

        done = done.thenCompose($ -> future);
    }
    done.get();

    assertEquals(ENTRIES, count.get());
}
票数 2
EN

Stack Overflow用户

发布于 2018-05-16 15:09:43

  1. 读取文件大小。(File.length()方法)并将其拆分为线程数。
  2. 使用RandomAccessFile搜索在@1. https://docs.oracle.com/javase/7/docs/api/java/io/RandomAccessFile.html中找到的you索引之前的任何新行字符。
  3. 向每个线程发送新的索引/偏移集+ RandomAccessFile,并对每个线程进行读访问。
  4. 子类InputStream,在RandomAccessFile之上创建一个新的InputStream并开始读取。
票数 0
EN

Stack Overflow用户

发布于 2018-05-17 11:42:11

经过更深入的研究,我发现在这个答案中给出的BlockingExecutor最接近我想要达到的目标:

https://stackoverflow.com/a/43109689/1194415

它基本上是extends ThreadPoolExecutor与一个Semaphore锁相结合的。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50374271

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档