我正在写一个后端应用程序接口在node.js中,需要为用户能够上传文件与数据,然后调用存储过程插入数据到MySQL的功能。我正在考虑使用fast-csv作为解析器,但是我正在努力解决如何在csv流中设置对存储过程的调用。这个想法是这样的:
var fs = require("fs");
var csv = require("fast-csv");
var stream1 = fs.createReadStream("files/testCsvFile.csv");
csv
.fromStream(stream2, { headers: true })
.on("data", function(data) {
//CALL TO SP with params from "data"//
numlines++;
})
.on("end", function() {
console.log("done");
});在应用程序的其他部分中,我设置了如下路由:
auth.post("/verified", async (req, res) => {
var user = req.session.passwordless;
if (user) {
const rawCredentials = await admin.raw(getUserRoleCredentials(user));
const { user_end, role } = await normalizeCredentials(rawCredentials);
const user_data = { user_end, role };
res.send(user_data);
} else {
res.sendStatus(401);
}
});..that is路由是以异步/等待的方式编写的,查询(所有都是调用的存储过程)被定义为Promises..我想在每个行函数的upload/parse csv/call SP中遵循此模式
发布于 2018-08-10 00:02:08
正如评论中提到的,我的scramjet可以轻松处理这样的用例……如果我理解错了,请纠正我,但我知道您希望为测试中的每个CSV行调用两个等待行。
如果是这样,您的代码将如下所示(更新以匹配您的注释/答案):
var fs = require("fs");
var csv = require("fast-csv");
var stream1 = fs.createReadStream("files/testCsvFile.csv");
var {DataStream} = require("scramjet");
DataStream
// the following line will convert any stream to scramjet.DataStream
.from(csv.fromStream(stream2, { headers: true }))
// the next lines controls how many simultaneous operations are made
// I assumed 16, but if you're fine with 40 or you want 1 - go for it.
.setOptions({maxParallel: 16})
// the next line will call your async function and wait until it's completed
// and control the back-pressure of the stream
.do(async (data) => {
const query = await queryBuilder({
schema,
routine,
parameters,
request
}); //here we prepare query for calling the SP with parameters from data
winston.info(query + JSON.stringify(data));
const rawResponse = await session.raw(query); //here the query gets executed
return data; // push each row - for testing only)
})
// next line will run the stream until end and return a promise
.toArray()
.then(fileRows => {
console.log(fileRows);
fs.unlinkSync(form.FileName); // remove temp file
//process "fileRows" and respond
res.end(JSON.stringify(fileRows)); // - for testing
})
.catch(e => {
res.writeHead(500); // some error handling
res.end(e.message);
})
;
// you may want to put an await statement before this, or call then to check
// for errors, which I assume is your use case.
;要回答您的注释问题-如果您要在on("data")事件中使用异步函数-您将需要创建一个promises数组,并在流end上等待该数组的Promise.all -但这需要同步完成-因此事件处理程序中的异步函数不会执行此操作。
在scramjet中,这是在幕后进行的,所以你可以使用函数。
发布于 2018-08-10 18:12:18
这是为我做的工作--你能描述一下如何用你的框架实现这一点吗--我相信它应该以某种方式完成,我只需要正确地配置它
//use fast-csv to stream data from a file
csv
.fromPath(form.FileName, { headers: true })
.on("data", async data => {
const query = await queryBuilder({
schema,
routine,
parameters,
request
}); //here we prepare query for calling the SP with parameters from data
winston.info(query + JSON.stringify(data));
const rawResponse = await session.raw(query); //here the query gets executed
fileRows.push(data); // push each row - for testing only
})
.on("end", function() {
console.log(fileRows);
fs.unlinkSync(form.FileName); // remove temp file
//process "fileRows" and respond
res.end(JSON.stringify(fileRows)) // - for testing
});https://stackoverflow.com/questions/51751062
复制相似问题