我必须在NodeJS中解析一个非常大的CSV文件,并将其保存在一个数据库中(异步操作),该数据库一次最多允许500个条目。由于内存的限制,我不得不流式传输CSV文件,并希望使用PapaParse来解析CSV文件(因为这在我的例子中工作得最好)。
因为PapaParse使用回调风格的方法来解析Node.js流,所以我没有看到一个容易将highland (用于批处理和数据转换)和PapaParse结合起来的方法。因此,我尝试使用ParseThrough流将数据写入和读取该流,并使用高地进行批处理:
const csv = require('papaparse');
const fs = require('fs');
const highland = require('highland');
const { PassThrough } = require('stream');
const passThroughStream = new PassThrough({ objectMode: true });
csv.parse(fileStream, {
step: function(row) {
// Write data to stream
passThroughStream.write(row.data[0]);
},
complete: function() {
// Somehow "end" the stream
passThroughStream.write(null);
},
});
highland(passThroughStream)
.map((data) => {
// data transform
})
.batch(500)
.map((data) => {
// Save up to 500 entries in database (async call)
});显然,这并不能按原样工作,也不会真正做任何事情。有没有可能甚至是更好的方式来解析非常大的CSV文件并将行保存到数据库中(以批处理的方式保存,最多500行)?
编辑:使用csv包(https://www.npmjs.com/package/csv)可能是这样的(fast-csv也是如此):
highland(fileStream.pipe(csv.parse()))
.map((data) => {
// data transform
})
.batch(500)
.map((data) => {
// Save up to 500 entries in database (async call)
});但不幸的是,这两个NPM包并不能在所有情况下正确解析CSV文件。
发布于 2018-02-08 01:04:26
在快速浏览了papaparse之后,我决定用scramjet实现CSV解析器。
fileStream.pipe(new scramjet.StringStream('utf-8'))
.csvParse(options)
.batch(500)
.map(items => db.insertArray('some_table', items))我希望这对你有用。:)
https://stackoverflow.com/questions/48632646
复制相似问题