首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用队列馈送无限流?

如何使用队列馈送无限流?
EN

Stack Overflow用户
提问于 2015-04-23 05:46:16
回答 2查看 1.3K关注 0票数 3

我想要几个能读取文件的threads。这些文件是包含多个文本文件本身的ZIP文件。因此,每个文件都必须逐行读取。

文件的任何内容都应该发送到某种类型的queue。队列本身应该从工作线程无限地处理。

如果可能的话,如何实现这样的场景呢?我想出了一些伪代码,但我真的不知道如何实现:

代码语言:javascript
复制
Queue<String> queue;

//multiple threads:
BufferedReader br;
queue.add(br.readLine());

//processing thread for the queue:
queue.stream().parallel().forEach(line -> convertAndWrite(line));

//worker function:
private void convertAndWrite(String line) {
    //convert the line to an output format,
    //and write each line eg to an output file or perist in DB, whatever
}
EN

回答 2

Stack Overflow用户

发布于 2016-09-06 03:59:29

让我们看看How to interconect non-paralel stream with parallel stream(one producer multiple consumers)的答案。对于此问题,使用无法并行化的流填充阻塞队列。实现了一个可并行化的拆分器来排空此队列。如果您希望文件是连续的,那么您可能有一个正在填充队列的读取器。

然后使用StreamSupport从拆分器创建一个流。阻塞队列支持并发修改,因此拆分器实现可以并行化,从而可以并行化您的流。如果你的下游编写器是可并行化的,那么你的整个消费者端就可以并行化。

如果您的阅读器遇到异常,那么您可以将一个流结束标记(可能是作为最后一个子句的一部分)推入BlockingQueue并重新抛出。由于只有一个tryAdvance调用方(请参阅AbstractSpliterator),因此一个流结束标记就足以终止所有并行流。

票数 1
EN

Stack Overflow用户

发布于 2016-09-07 00:02:13

要实现监视文件夹中的新文件外观的任务,我将使用Java,如this article中所示

通过WatchService注册文件夹更新

代码语言:javascript
复制
Path path = Paths.get(".");
WatchService watchService =  path.getFileSystem().newWatchService();
path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE);

获取文件夹更新并通过流进行处理:

代码语言:javascript
复制
WatchKey watchKey = null;
while (true) {
    watchKey = watchService.poll(10, TimeUnit.MINUTES);
    if(watchKey != null) {
        watchKey.pollEvents().stream().forEach(event -> System.out.println(event.context()));
    }
    watchKey.reset();
}

在队伍中

代码语言:javascript
复制
watchKey.pollEvents().stream().forEach(event -> System.out.println(event.context()));

我想你可以使用并行流,这里的event.context()是一个新创建的文件的Path实例。因此,您可以通过其他流操作(如map等)继续处理其内容。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/29809470

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档