我觉得这里的一切都乱了套。我想建立到 MySQL 数据库的连接,然后读入一个文件,逐行获取名称并运行查询。我原以为返回 promise 的 sqlSelectQuery 函数会等 promise 解析完成之后,再处理下一行。这里我漏掉了什么?
const mysql = require('mysql');
const fs = require('fs');
const path = require('path');
const csv = require('fast-csv');
const config = require('./config')
// Create the MySQL connection using the settings exported by ./config.
const connection = mysql.createConnection({
user: config.user,
password: config.password,
database: config.database,
host: config.host
});
// Start connecting. The callback only logs the outcome; nothing below waits for it.
connection.connect((err) => {
if(err){
console.log('Error connecting to Db');
return;
}
console.log('Connection established');
});
// Stream data.csv (resolved relative to this script's directory) through the
// fast-csv parser. This statement runs right after connect() is called, not
// inside its callback.
fs.createReadStream(path.resolve(__dirname,'data.csv'))
// NOTE(review): `headers: true` presumably makes fast-csv key each row by the
// CSV header names (sqlSelectQuery reads row.BorrowerName) - confirm in the docs.
.pipe(csv.parse({ headers: true }))
.on('error', error => console.error("error", error))
.on('data', row => { // need to get this to block
// The promise is neither awaited nor returned, so this handler returns
// immediately and nothing pauses the stream while the query is running.
// There is also no rejection handler on this chain.
sqlSelectQuery(row).then(result => console.log("result: ", result))
})
// 'end' reports the parsed row count; it does not wait for the queries started above.
.on('end', rowCount => console.log(`Parsed ${rowCount} rows`));
/**
 * Look up a CSV row's borrower in the `loans` table.
 *
 * Wraps the callback-style `connection.query` in a Promise. The borrower name
 * is bound through the `?` placeholder rather than concatenated into the SQL.
 *
 * @param {{BorrowerName: string, InitialApprovalAmount: *}} row - Parsed CSV row.
 * @returns {Promise<Object>} When exactly one loan matches:
 *   `{ business_name, loan_range, loan_amount, count: 1 }` (names/range from the
 *   database row, amount from the CSV row). Otherwise:
 *   `{ business_name, loan_range: "", loan_amount: "", unique: <match count> }`.
 *   Rejects with the driver error if the query fails.
 */
const sqlSelectQuery = (row) => {
  return new Promise((resolve, reject) => {
    console.log("inside promise");
    const selectQuery = 'SELECT * FROM loans where business_name = ?;';
    connection.query(selectQuery, [row.BorrowerName], (err, rows) => {
      if (err) {
        // Stop here: on failure `rows` is undefined, so falling through to
        // `rows.length` would throw inside the driver callback.
        reject(err);
        return;
      }
      if (rows.length === 1) {
        const [match] = rows;
        resolve({
          business_name: match.business_name,
          loan_range: match.loan_range,
          loan_amount: row.InitialApprovalAmount,
          count: 1,
        });
        return;
      }
      // Zero or several matches: report how many were found instead of loan data.
      resolve({
        business_name: row.BorrowerName,
        loan_range: "",
        loan_amount: "",
        unique: rows.length,
      });
    });
  });
};
My console output looks like this:
inside promise
inside promise //20 times (I have 20 rows)
Parsed 20 rows
Connection established
result: {....}
result: {...}....
发布于 2021-03-10 22:21:26
我找到了这个答案:“nodejs async await inside createReadStream”。我需要在处理每一行时先暂停(pause)流,处理完再恢复(resume):
.on('data', async (row) => { // need to get this to block
stream.pause();
await sqlSelectQuery(row).then(result => console.log("result: ", result))
stream.resume();
})
现在的问题是:我的 .on('end') 会在最后一行处理完之前运行。
发布于 2021-03-10 22:15:37
您可以将每一行添加到rowsToProcess数组中,然后,在读取文件数据后,逐一处理每一行:
const mysql = require('mysql');
const fs = require('fs');
const path = require('path');
const csv = require('fast-csv');
const config = require('./config')
// Create the MySQL connection using the settings exported by ./config.
const connection = mysql.createConnection({
  user: config.user,
  password: config.password,
  database: config.database,
  host: config.host,
});

// Read the CSV only after the connection attempt has succeeded. Rows are
// buffered while the file streams and are processed one at a time once the
// stream has ended.
connection.connect((err) => {
  if (err) {
    console.error('Error connecting to Db:', err);
    return;
  }
  console.log('Connection established');

  const rowsToProcess = [];
  fs.createReadStream(path.resolve(__dirname, 'data.csv'))
    .pipe(csv.parse({ headers: true }))
    .on('error', (error) => console.error("error", error))
    .on('data', (row) => {
      // Just collect the row; the queries run after the whole file is read.
      rowsToProcess.push(row);
    })
    .on('end', async () => {
      // An event emitter ignores the promise returned by an async listener,
      // so a rejection must be handled here or it becomes an unhandled rejection.
      try {
        await processRows(rowsToProcess);
        console.log("processRows: complete.");
      } catch (error) {
        console.error('processRows failed:', error);
      } finally {
        // Close the connection so the process can exit when the work is done.
        connection.end();
      }
    });
});
/**
 * Run the loan lookup for each buffered CSV row, strictly one after another.
 *
 * Each row's query is awaited before the next one is started, and progress
 * plus each result are logged to the console.
 *
 * @param {Array<Object>} rowsToProcess - Rows collected from the CSV parser.
 * @returns {Promise<void>} Resolves after the last row has been processed.
 */
async function processRows(rowsToProcess) {
  console.log(`Read ${rowsToProcess.length} row(s) from csv file...`);
  for (const [index, currentRow] of rowsToProcess.entries()) {
    const rowNumber = index + 1;
    console.log(`processing row ${rowNumber} of ${rowsToProcess.length}...`);
    const outcome = await sqlSelectQuery(currentRow);
    console.log(`row ${rowNumber} result:`, outcome);
  }
}
/**
 * Look up a CSV row's borrower in the `loans` table.
 *
 * Wraps the callback-style `connection.query` in a Promise. The borrower name
 * is bound through the `?` placeholder rather than concatenated into the SQL.
 *
 * @param {{BorrowerName: string, InitialApprovalAmount: *}} row - Parsed CSV row.
 * @returns {Promise<Object>} When exactly one loan matches:
 *   `{ business_name, loan_range, loan_amount, count: 1 }` (names/range from the
 *   database row, amount from the CSV row). Otherwise:
 *   `{ business_name, loan_range: "", loan_amount: "", unique: <match count> }`.
 *   Rejects with the driver error if the query fails.
 */
const sqlSelectQuery = (row) => {
  return new Promise((resolve, reject) => {
    console.log("Processing row:", row);
    const selectQuery = 'SELECT * FROM loans where business_name = ?;';
    connection.query(selectQuery, [row.BorrowerName], (err, rows) => {
      if (err) {
        // Stop here: on failure `rows` is undefined, so falling through to
        // `rows.length` would throw inside the driver callback.
        reject(err);
        return;
      }
      if (rows.length === 1) {
        const [match] = rows;
        resolve({
          business_name: match.business_name,
          loan_range: match.loan_range,
          loan_amount: row.InitialApprovalAmount,
          count: 1,
        });
        return;
      }
      // Zero or several matches: report how many were found instead of loan data.
      resolve({
        business_name: row.BorrowerName,
        loan_range: "",
        loan_amount: "",
        unique: rows.length,
      });
    });
  });
};
// https://stackoverflow.com/questions/66566132
复制相似问题