首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Nodejs fast-csv和promises

Nodejs fast-csv和promises
EN

Stack Overflow用户
提问于 2021-03-10 21:52:00
回答 2查看 210关注 0票数 0

我觉得这里的一切都乱了套。我想建立到mySQL数据库的连接。然后我想读入一个文件。我想逐行获取名称并运行查询。我假设返回promise的sqlSelectQuery函数会等待promise解析,然后再转到下一行。这里我漏掉了什么?

代码语言:javascript
复制
const mysql = require('mysql');
const fs = require('fs');
const path = require('path');
const csv = require('fast-csv');
const config = require('./config')

const connection = mysql.createConnection({
    user: config.user,
    password: config.password,
    database: config.database,
    host: config.host
});

connection.connect((err) => {
    if(err){
      console.log('Error connecting to Db');
      return;
    }
    console.log('Connection established');
  });

fs.createReadStream(path.resolve(__dirname,'data.csv'))
    .pipe(csv.parse({ headers: true }))
    .on('error', error => console.error("error", error))
    .on('data', row => { // need to get this to block
        sqlSelectQuery(row).then(result => console.log("result: ", result))
    })
    .on('end', rowCount => console.log(`Parsed ${rowCount} rows`));




const sqlSelectQuery = (row) => {
    return new Promise((resolve, reject) => {
        console.log("inside promise");
        const selectQuery = 'SELECT * FROM loans where business_name = ?;';
        connection.query(selectQuery, [row.BorrowerName], (err,rows) => {
            let result = {};
            if(err) reject(err);
            if (rows.length === 1){
                let res = rows[0];
                result = {
                    business_name: res.business_name,
                    loan_range: res.loan_range,
                    loan_amount: row.InitialApprovalAmount,
                    count: 1
                };
                resolve(result);
            } else {
                result = {
                    business_name: row.BorrowerName,
                    loan_range: "",
                    loan_amount: "",
                    unique: rows.length
                };
                resolve(result);
            }
        });
    })
}

my console looks like this
inside promise
inside promise  //20 times (I have 20 rows)
Parsed 20 rows
Connection established
result:  {....}
result: {...}....
EN

回答 2

Stack Overflow用户

发布于 2021-03-10 22:21:26

我找到了这个答案。我需要添加暂停和恢复nodejs async await inside createReadStream

代码语言:javascript
复制
.on('data', async (row) => { // need to get this to block
        stream.pause();
        await sqlSelectQuery(row).then(result => console.log("result: ", result))
        stream.resume();
    })

现在的问题是我的.on('end')在最后一行之前运行。

票数 1
EN

Stack Overflow用户

发布于 2021-03-10 22:15:37

您可以将每一行添加到rowsToProcess数组中,然后,在读取文件数据后,逐一处理每一行:

代码语言:javascript
复制
const mysql = require('mysql');
const fs = require('fs');
const path = require('path');
const csv = require('fast-csv');
const config = require('./config')

const connection = mysql.createConnection({
    user: config.user,
    password: config.password,
    database: config.database,
    host: config.host
});

connection.connect((err) => {
    if (err) {
        console.error('Error connecting to Db:', err);
        return;
    } 
    console.log('Connection established');
    const rowsToProcess = [];
    fs.createReadStream(path.resolve(__dirname,'data.csv'))
        .pipe(csv.parse({ headers: true }))
        .on('error', error => console.error("error", error))
        .on('data', row => {
            // Add row to process.
            rowsToProcess.push(row);
        })
        .on('end', async rowCount => { 
            await processRows(rowsToProcess);
            console.log("processRows: complete.")
        })
});

async function processRows(rowsToProcess) {
    console.log(`Read ${rowsToProcess.length} row(s) from csv file...`)
    for (let i = 0; i < rowsToProcess.length; i++) {
        console.log(`processing row ${i+1} of ${rowsToProcess.length}...`);
        let result = await sqlSelectQuery(rowsToProcess[i])
        console.log(`row ${i+1} result:`, result);
    }
}

const sqlSelectQuery = (row) => {
    return new Promise((resolve, reject) => {
        console.log("Processing row:", row);
        const selectQuery = 'SELECT * FROM loans where business_name = ?;';
        connection.query(selectQuery, [row.BorrowerName], (err,rows) => {
            let result = {};
            if(err) reject(err);
            if (rows.length === 1){
                let res = rows[0];
                result = {
                    business_name: res.business_name,
                    loan_range: res.loan_range,
                    loan_amount: row.InitialApprovalAmount,
                    count: 1
                };
                resolve(result);
            } else {
                result = {
                    business_name: row.BorrowerName,
                    loan_range: "",
                    loan_amount: "",
                    unique: rows.length
                };
                resolve(result);
            }
        });
    })
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/66566132

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档