我想要几个能读取文件的threads。这些文件是包含多个文本文件本身的ZIP文件。因此,每个文件都必须逐行读取。
文件的任何内容都应该发送到某种类型的queue。队列本身应该从工作线程无限地处理。
如果可能的话,如何实现这样的场景呢?我想出了一些伪代码,但我真的不知道如何实现:
Queue<String> queue;
//multiple threads:
BufferedReader br;
queue.add(br.readLine());
//processing thread for the queue:
queue.stream().parallel().forEach(line -> convertAndWrite(line));
//worker function:
private void convertAndWrite(String line) {
//convert the line to an output format,
//and write each line eg to an output file or perist in DB, whatever
}发布于 2016-09-06 03:59:29
让我们看看How to interconect non-paralel stream with parallel stream(one producer multiple consumers)的答案。对于此问题,使用无法并行化的流填充阻塞队列。实现了一个可并行化的拆分器来排空此队列。如果您希望文件是连续的,那么您可能有一个正在填充队列的读取器。
然后使用StreamSupport从拆分器创建一个流。阻塞队列支持并发修改,因此拆分器实现可以并行化,从而可以并行化您的流。如果你的下游编写器是可并行化的,那么你的整个消费者端就可以并行化。
如果您的阅读器遇到异常,那么您可以将一个流结束标记(可能是作为最后一个子句的一部分)推入BlockingQueue并重新抛出。由于只有一个tryAdvance调用方(请参阅AbstractSpliterator),因此一个流结束标记就足以终止所有并行流。
发布于 2016-09-07 00:02:13
要实现监视文件夹中的新文件外观的任务,我将使用Java,如this article中所示
通过WatchService注册文件夹更新
Path path = Paths.get(".");
WatchService watchService = path.getFileSystem().newWatchService();
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);获取文件夹更新并通过流进行处理:
WatchKey watchKey = null;
while (true) {
watchKey = watchService.poll(10, TimeUnit.MINUTES);
if(watchKey != null) {
watchKey.pollEvents().stream().forEach(event -> System.out.println(event.context()));
}
watchKey.reset();
}在队伍中
watchKey.pollEvents().stream().forEach(event -> System.out.println(event.context()));我想你可以使用并行流,这里的event.context()是一个新创建的文件的Path实例。因此,您可以通过其他流操作(如map等)继续处理其内容。
https://stackoverflow.com/questions/29809470
复制相似问题