Sequelize等待,直到循环通过回调结束

Sequelize wait until loop finished with callback

本文关键字:回调 结束 循环 等待 Sequelize      更新时间:2023-10-30

来自php背景,我正试图了解这些回调内容。

基本上,我想得到一些行,然后我想循环浏览这些行,并将它们与其他模型(不同的数据库)进行检查。我希望回拨电话,直到他们都接通并检查过为止。

在sequelize遍历所有结果之前调用回调。

基本上,我希望函数是"阻塞"的。我需要改变什么?

toexport.getlasttransactions = function(lower,upper,callback){
    var deferred = Q.defer();
    var transactionsToUpdate = [];
    ///////////////////////////
    // set import conditions //
    ///////////////////////////
    var lowerbound = (lower) ? lower.format() : moment.utc().subtract(10, 'minutes').format();
    var upperbound = (upper) ? upper.format() : moment.utc().format();
    ///////////////////////////////
    // get IDs From Failed syncs //
    ///////////////////////////////
    FailedSync.find({ limit: 100 })
    .then(function(res){
        var FailedIDs = [];
        _.each(res, function(value,index){
            FailedIDs.push(value.transaction_id);
        });
        // build condition
        var queryCondition = { where: { updated_at: { between: [lowerbound,upperbound] } }, limit: 3 };
        if(FailedIDs.length > 0){
            queryCondition = {
                where: Sequelize.and({ updated_at: { between: [lowerbound,upperbound] } },
                Sequelize.or(
                  { id: FailedIDs }
                ))
            }
        }
        //////////////////////////////
        // get Phoenix Transactions //
        //////////////////////////////
        PhoenixTransaction
        .findAll(queryCondition)
        .then(function(poenixTrx){
            _.each(poenixTrx, function(value, index){
                Transaction.findOne({ where: { id: value.id }})
                .then(function(result){
                    if(!result || result.length === 0){
                        transactionsToUpdate.push(value);
                        console.log('!result || result.length === 0')
                    }
                    else if(result && result.length === 1){
                        if(result.hash != value.hash){
                            transactionsToUpdate.push(value);
                            console.log('result.hash != poenixTrx[i].hash')
                        }
                    }


                })
                .catch(function(err) {
                  console.log(err)
                })

            })
            deferred.resolve(transactionsToUpdate);

        })
        .catch(function(err){
          throw new Error("Something went wrong getting PhoenixTransaction") 
        })
    })
    deferred.promise.nodeify(callback);
    return deferred.promise;    
}

您的代码中有很多新的promise用户的模式:

  • 你在不需要的时候使用延期
  • 您没有使用promise聚合方法
  • 你不是在合适的地方等待,而是在筑巢

承诺表示一段时间内的。您可以使用promise,并在稍后通过then访问其结果,而不是立即访问-Sequelize的promise基于bluebird,并提供丰富的API为您进行聚合。这是一个注释版本的清理代码-注意它不是嵌套的:

toexport.getlasttransactions = function(lower,upper){ // no need for callback
    var lowerbound = (lower || moment.utc().subtract(10, 'minutes')).format();
    var upperbound = (upper || moment.utc()).format();
    // use `map` over a `each` with a push.
    var failedIds = FailedSync.find({ limit: 100 }).map(function(value){ 
        return value.transaction_id;
    });
    // build condition.
    var queryCondition = {
        where: { updated_at: { between: [lowerbound,upperbound] } }, limit: 3 
    };
    var query = failedIds.then(function(ids){ // use promise as proxy
        if(ids.length === 0) return queryCondition;
        return { // You can return a value or a promise from `then`
            where: Sequelize.and({ updated_at: { between: [lowerbound,upperbound] } },
                   Sequelize.or({ id: ids});
        };
    });
    var pheonixTransactions = query.then(function(condition){
        return PhoenixTransaction.findAll(queryCondition); // filter based on result
    });
    return pheonixTransactions.map(function(value){ // again, map over each
        return Transaction.findOne({ where: { id: value.id }}); // get the relevant one
    }).filter(function(result){ // filter over if chain and push
        return (!result || result.length === 0) || 
               ((result && result.length === 1) && result.hash != value.hash);
    });
};

理想情况下,您可能希望使用类似Bluebird的reduce之类的东西和一系列promise,但我将提供异步系列实现,因为它更容易理解。

安装异步

npm install async

需要它在您的文件中

var async = require('async')

然后这样实现:

        //////////////////////////////
        // get Phoenix Transactions //
        //////////////////////////////
        PhoenixTransaction
        .findAll(queryCondition)
        .then(function(poenixTrx){
            var queryArray = poenixTrx.map(function(value){
                return function(callback){
                    Transaction.findOne({ where: { id: value.id }})
                    .then(function(result){
                        if(!result || result.length === 0){
                            transactionsToUpdate.push(value);
                            console.log('!result || result.length === 0')
                        }
                        else if(result && result.length === 1){
                            if(result.hash != value.hash){
                                transactionsToUpdate.push(value);
                                console.log('result.hash != poenixTrx[i].hash')
                            }
                        }
                        // trigger callback with any result you want
                        callback(null, result)
                    })
                    .catch(function(err) {
                      console.log(err)
                      // trigger  error callback
                      callback(err)
                    })
                }
            })
            // async.series will loop through he queryArray, and execute each function one by one until they are all completed or an error is thrown.
            // for additional information see https://github.com/caolan/async#seriestasks-callback
            async.series(queryArray, function(err, callback){
                // after all your queries are done, execution will be here
                // resolve the promise with the transactionToUpdate array
                deferred.resolve(transactionsToUpdate);
            })

        })
        .catch(function(err){
          throw new Error("Something went wrong getting PhoenixTransaction") 
        })

老实说,整个事情有点混乱。特别是承诺/回调的混淆可能会在某个时候给你带来问题。不管怎样,你在transactionsToUpdate上使用了deferred.resolve,它只是一个数组,所以它会立即调用回调。

如果保持该脚本的原样,则使用_。each之类的东西(https://github.com/caolan/async)在paradell中运行事务并将其用作回调。

它可能看起来像这样:

toexport.getlasttransactions = function(lower,upper,callback){
    var transactionsToUpdate = [];
    ///////////////////////////
    // set import conditions //
    ///////////////////////////
    var lowerbound = (lower) ? lower.format() : moment.utc().subtract(10, 'minutes').format();
    var upperbound = (upper) ? upper.format() : moment.utc().format();
    ///////////////////////////////
    // get IDs From Failed syncs //
    ///////////////////////////////
    FailedSync.find({ limit: 100 })
    .then(function(res){
        var FailedIDs = [];
        _.each(res, function(value,index){
            FailedIDs.push(value.transaction_id);
        });
        // build condition
        var queryCondition = { where: { updated_at: { between: [lowerbound,upperbound] } }, limit: 3 };
        if(FailedIDs.length > 0){
            queryCondition = {
                where: Sequelize.and({ updated_at: { between: [lowerbound,upperbound] } },
                Sequelize.or(
                  { id: FailedIDs }
                ))
            }
        }
        //////////////////////////////
        // get Phoenix Transactions //
        //////////////////////////////
        PhoenixTransaction
        .findAll(queryCondition)
        .then(function(poenixTrx){
            async.each(poenixTrx, function(value, next){
                Transaction.findOne({ where: { id: value.id }})
                .then(function(result){
                    if(!result || result.length === 0){
                        transactionsToUpdate.push(value);
                        console.log('!result || result.length === 0')
                    }
                    else if(result && result.length === 1){
                        if(result.hash != value.hash){
                            transactionsToUpdate.push(value);
                            console.log('result.hash != poenixTrx[i].hash')
                        }
                    }
                    next();
                })
                .catch(function(err) {
                  console.log(err)
                })

            }, function(err) {
              //Return the array transactionsToUpdate in your callback for further use
              return callback(err, transactionsToUpdate);
            });
        })
        .catch(function(err){
          throw new Error("Something went wrong getting PhoenixTransaction") 
        })
    })
}

这就是回调的方式。但你需要下定决心使用什么:回调或承诺。不要同时使用这两种方法(如:如果您的方法需要回调,则不应返回promise,或者如果返回promise则不应期望回调)。

另外,如果你使用回调,你不想抛出错误,你只需要调用回调并在回调中给出错误-无论谁使用你的方法,都可以从回调中检查错误并进行处理。

希望这对你来说有点道理,我知道整个回调和承诺的事情有点奇怪,如果你来自php之类的东西,它需要一些习惯:)

感谢您解释这些差异。我认为使用promise是前进的方向,因为它使代码看起来更好,避免了这种"回调地狱"。

例如:

PhoenixSyncTransactions.getlasttransactions(lastTimeSynced,null)
.then(function(res){
    return PersistTransaction.prepareTransactions(res).then(function(preparedTrx){
      return preparedTrx;
    })
}).then(function(preparedTrx){
    return PersistTransaction.persistToDB(preparedTrx).then(function(Processes){
      return Processes;
    })
})
.then(function(Processes){
    return PersistTransaction.checkIfMultiProcess(Processes).then(function(result){
      return result;
    })
})
.then(function(result){
  console.log('All jobs done');
})

整个代码更容易阅读。