管理异步性并在节点中执行多个 MySQL 查询

Managing asynchronicity and performing multiple MySQL queries in node

本文关键字:执行 MySQL 查询 节点 异步 管理      更新时间:2023-09-26

我在处理nodejs的异步性质时遇到了一些问题。我正在使用nodejs-mysql连接器来获取数据。实质上,下面的代码正在执行查询以获取file_url并将信息发送到 myApi。 第二个查询应该在之后立即运行,以使用从 api(inputId 返回的结果更新数据库。但是,以前的异步方式不起作用(连接在处理所有内容之前关闭)。使用 async 模块执行以下操作的最佳方法是什么?

var mysql = require("mysql");
var async = require("async");
var Q = require('q'),
    myApi = require('myApi')('xxxxx');
//DB
var con = mysql.createConnection({
    host: "localhost",
    user: "root",
    password: "root",
    database: "test"
});

//1ST Query - Fetch Urls, then upload file to api
con.query('SELECT file_url, file_id FROM myfiles', function(err, rows) {
    if (err) throw err;
    for (var i = 0; i < rows.length; i++) {
        // File URL for upload
        var fileUrl = rows[i].file_url,
            createInputPromise, createEncodingProfilePromise;
        var fileId = rows[i].file_id;
        // Create myApi Input
        createInputPromise = myApi.input.create(fileUrl);
        //WHERE THE ACTION OCCURS!
        Q.all([createInputPromise, createEncodingProfilePromise]).then(
            function(result) {
                console.log('Successfully uploaded file: ' + result[0].url + ' inputId: ' + result[0].inputId);
                //2ND Query - Save the inputId return by api for each file
                con.query(
                    'UPDATE myfiles SET myApi_input_id = ?  WHERE file_id = ?', [result[0].inputId, fileId],
                    function(err, result) {
                        if (err) throw err;
                        console.log('Changed ' + result.changedRows + ' rows');
                    }
                );
            },
            function(error) {
                console.log('Error while uploading file to api:', error);
            }
        );
    };
});
con.end(function(err) {});

发布的答案是不错的选择,但我最终想通过使用async来完成它。我遇到了瀑布模式:

串联运行函数数组,每个函数将其结果传递给 数组中的下一个。但是,如果任何函数传递错误 对于回调,下一个函数不执行,主函数 立即调用回调并显示错误。

我结束了以下实施:

var pool = mysql.createPool({...});
function fn(callback) {
    var getConnection = function(callback) {...};
    async.waterfall([
        doQuery.bind(null, getConnection),
        doSomethingAsyncWithResult,
        doUpdate(null, getConnection)
    ], function(err, result) {
        getConnection.end();
        callback(err, result);
    });
}
function doQuery(connection, callback) {
    connection.query(sql, callback);
}
function doSomethingAsyncWithResult(result, callback) {
    ...something...
    callback(null, anotherResult);
}
function doUpdate(connection, result, callback) {
    connection.update(sql, [result], callback);
}

在很大程度上取决于您如何处理这些:createInputPromisecreateEncodingProfilePromise。与这两个相关的更多代码将有助于获得一个好的答案。您究竟在代码中哪里遇到问题?console.log('Successfully uploaded file: ' + result[0].url + ' inputId: ' + result[0].inputId);输出到控制台,还是在此之前有错误?

现在,你确定myApi.input.create(fileUrl)在回报承诺吗?您需要查看该 API 的文档以确保它是。如果没有,那么您可以使用 Q 库创建一个:

var inputPromise = Q.defer();
var encodingProfilePromise = Q.defer();
inputPromise.resolve(myApi.input.create(fileUrl));
encodingProfilePromise.resolve(/*encoding profile code*/);
Q.all([inputPromise.promise, encodingProfilePromise.promise]).then(function(result) {
  /*blah blah blah*/
});

有了更多信息,我可以给出更准确的答案。

而不是异步模块,我会以不同的方式看待它。让我们去发电机。

let co = require('co');
let parallel = require('co-parallel');
let sequelize = require('sequelize'); // also read documentation to provide connection details (this is just pseudocode)

let gen = function*() {
  let inputId = yield fetch_file_url(); // this function should be generator or should return promise
  let res = yield sequelize.query(`INSERT INTO... ${inputId} `);
}
co(function*() {
  let files = ['f1', 'f2', 'f3'];
  let gens = files.map(file => gen(file));
  let arrayOrResults = yield parallel(gens, 5);  // 5 threads
    // done
}).catch(function(err) {
  // catch errors
});