首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >当JSONStream.parsed()数据通过es.map()和JSONStream.stringify()传输到文件流时,节点堆耗尽

当JSONStream.parsed()数据通过es.map()和JSONStream.stringify()传输到文件流时,节点堆耗尽
EN

Stack Overflow用户
提问于 2016-08-17 01:17:17
回答 1查看 1K关注 0票数 2

我正在尝试通过JSONStream.parse()传输一个输入流(从一个巨大的JSONStream.parse文件中创建),以便将流分解成对象,然后通过event-Stre.map()来转换对象,然后通过JSONStream.stringify()创建一个字符串,最后再转换为一个可写的输出流。当进程运行时,我可以看到节点的内存占用继续增长,直到它最终耗尽堆。下面是重新创建问题的最简单脚本(test.js):

代码语言:javascript
复制
const fs = require("fs")
const es = require("event-stream")
const js = require("JSONStream")

out = fs.createWriteStream("/dev/null")
process.stdin
    .pipe(js.parse("features.*"))
    .pipe(es.map( function(data, cb) { 
        cb(null, data);
        return;
    } ))
    .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
    .pipe(out)

一个小型bash脚本(barf.sh)将源源不断的JSON流注入节点的process.stdin,它将导致节点的堆逐渐增长:

代码语言:javascript
复制
#!/bin/bash

echo '{"type":"FeatureCollection","features":['
while :
do
    echo '{"type":"Feature","properties":{"name":"A Street"}, "geometry":{"type":"LineString"} },'
done

通过这样运行它:

代码语言:javascript
复制
barf.sh | node test.js

有几种奇怪的方法可以避开这个问题:

  • 删除fs.createWriteStream()并将最后一个管道阶段从".pipe(out)“更改为".pipe(process.stdout)”,然后将管道节点的stdout更改为/dev/null
  • 将异步es.map()更改为同步es.mapSync()

前两个操作中的任何一个都将允许脚本永久运行,节点的内存占用量低且不变。我使用的是节点v6.3.1、事件流v3.3.4和JSONStream 1.1.4,运行Ubuntu16.04的8台核心计算机上有8GB内存。

我希望有人能帮我纠正我认为是一个明显的错误。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2016-08-18 01:39:13

JSONStream不是streams2流,因此它不支持背压控制。(有一个关于streams2 这里的简要总结。)

这意味着数据将在parse事件中从data流中流出,并且不管消费流是否已经为它们准备好,流都会不断地将它们抽出来。如果在读和写东西的速度之间有什么不一致之处,就会有缓冲--这就是你所看到的。

您的barf.sh线束可以看到通过stdin注入的特性。如果您正在读取大量文件,则应该能够通过暂停该文件的读取流来管理该流。因此,如果要将一些pause/resume逻辑插入到map回调中,您应该能够让它处理一个大型文件;它只需稍长一点时间。我会做这样的实验:

代码语言:javascript
复制
let in = fs.createReadStream("/some/massive/file");
let out = fs.createWriteStream("/dev/null");
in
    .pipe(js.parse("features.*"))
    .pipe(es.map(function(data, cb) {
        // This is just an example; a 10-millisecond wait per feature would be very slow.
        if (!in.isPaused()) {
            in.pause();
            global.setTimeout(function () { in.resume(); }, 10);
        }
        cb(null, data);
        return;
    }))
    .pipe(js.stringify("{\n\"type\": \"FeatureCollection\", \"features\": [\n\t", ",\n\t", "\n]\n}"))
    .pipe(out);

顺便说一句,在我的计算机上使用mapSync几乎没有什么区别(这是旧的和慢的)。但是,除非在map中有一些异步操作要执行,否则我会使用mapSync

票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/38986744

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档