首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用于CSV解析的Highland.js

用于CSV解析的Highland.js
EN

Stack Overflow用户
提问于 2015-04-01 23:52:46
回答 1查看 1.6K关注 0票数 3

我正在尝试写一种非常实用的方式。我们正在使用Highland.js来管理流处理,然而,因为我是如此新手,我想我真的对如何处理这种独特的情况感到困惑。

这里的问题是文件流中的所有数据都不一致。文件中的第一行通常是头文件,我们希望将其存储到内存中,然后压缩流中的所有行。

这是我的第一个尝试:

代码语言:javascript
复制
var _      = require('highland');
var fs     = require('fs');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');

var headers = [];

var through = _.pipeline(
    _.split(),
    _.head(),
    _.doto(function(col) {
        headers = col.split(',');
        return headers;
    }),

    ......

    _.splitBy(','),
    _.zip(headers),
    _.wrapCallback(process)
);

_(stream)
    .pipe(through)
    .pipe(output);

管道中的第一个命令是按行拆分文件。下一个抓取头部,doto将其声明为全局变量。问题是流中接下来的几行不存在,所以进程是blocked...likely的,因为它上面有head()命令。

我已经尝试了其他一些变体,但我觉得这个例子可以让你感觉到我需要用它去做什么。

任何关于这方面的指导都将是有帮助的--它还提出了一个问题,如果我的每一行都有不同的值,我如何在许多不同长度/复杂性的流操作之间拆分进程流。

谢谢。

编辑:我产生了一个更好的结果,但我质疑它的效率--有没有办法优化它,这样在每次运行时我都不会检查标题是否被记录下来了?这仍然让人感觉草率。

代码语言:javascript
复制
var through = _.pipeline(
    _.split(),
    _.filter(function(row) {
        // Filter out bogus values
        if (! row || headers) {
            return true;
        }
        headers = row.split(',');
        return false;
    }),
    _.map(function(row) {
        return row.split(',')
    }),
    _.batch(500),
    _.compact(),
    _.map(function(row) {
        return JSON.stringify(row) + "\n";
    })
);

_(stream)
    .pipe(through)
EN

回答 1

Stack Overflow用户

发布于 2015-07-01 23:46:52

您可以使用Stream.observe()Stream.fork()来拆分流。

代码语言:javascript
复制
var _      = require('highland');
var fs     = require('fs');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');
var through = highland.pipeline(function(s) {
    var headerStream, headers;
    // setup a shared variable to store the headers
    headers = [];
    // setup the csv processing
    s = s
        // split input into lines
        .split()
        // remove empty lines
        .compact()
        // split lines into arrays
        .map(function(row) {
            return row.split(',');
        });
    // create a new stream to grab the header
    headerStream = s.observe();
    // pause the original stream
    s.pause();
    // setup processing of the non-header rows
    s = s
        // drop the header row
        .drop(1)
        // convert the rest of the rows to objects
        .map(function(row) {
            var obj = headers.reduce(function(obj, key, i) {
                obj[key] = row[i];
                return obj;
            }, {});
            return JSON.stringify(obj) + "\n";
        });
    // grab the first row from the header stream
    // save the headers and then resume the normal stream
    headerStream.head().toArray(function(rows) {
        headers = rows[0];
        s.resume();
    });
    return s;
});
_(stream)
    .pipe(through)
    .pipe(output);

也就是说,您的csv解析不会考虑值中的换行和逗号转义。通常,在csv文件中,这是通过用双引号括起值来实现的。然后通过将两个引号放在一起来转义双引号。要做到这一点有点棘手,所以我建议使用一个可以处理它的包,比如fast-csv

然后,您的代码可能如下所示:

代码语言:javascript
复制
var _      = require('highland');
var fs     = require('fs');
var csv    = require('fast-csv');
var stream = fs.createReadStream('./data/gigfile.txt');
var output = fs.createWriteStream('output.txt');

_(stream.pipe(csv({headers: true, ignoreEmpty: true})))
    .map(function(row) {
        return JSON.stringify(row) + "\n";
    })
    .pipe(output);
票数 4
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/29395358

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档