Node.js:执行多个异步操作的最佳方式,然后执行其他操作

Node.js: Best way to perform multiple async operations, then do something else?

本文关键字:执行 方式 然后 最佳 其他 操作 异步操作 js Node      更新时间:2023-09-26

在下面的代码中,我试图一次完成多个(大约10个)HTTP请求和RSS解析。

我在一个需要访问和解析结果的URI数组上使用标准的forEach构造。

代码:

var articles;
feedsToFetch.forEach(function (feedUri)
{   
        feed(feedUri, function(err, feedArticles) 
        {
            if (err)
            {
                throw err;
            }
            else
            {
                articles = articles.concat(feedArticles);
            }
        });
 });
 // Code I want to run once all feedUris have been visited

我知道,当调用一个函数一次时,我应该使用回调。然而,我能想到在本例中使用回调的唯一方法是调用一个函数,该函数计算它被调用的次数,并且只有当它被调用与feedsToFetch.length相同的次数时才继续,这似乎很糟糕。

所以我的问题是,在node.js.中处理这种情况的最佳方法是什么

最好不要有任何形式的阻塞!(我仍然想要那飞快的速度)。是承诺还是其他什么?

谢谢,Danny

无漏洞解决方案

承诺将包含在下一个JavaScript版本

流行的Promise库为这个确切的用例提供了一个.all()方法(等待一堆异步调用完成,然后做其他事情)。它与您的场景完美匹配

Bluebird还有.map(),它可以获取一个值数组并使用它来启动Promise链。

以下是使用Bluebird .map():的示例

var Promise = require('bluebird');
var request = Promise.promisifyAll(require('request'));
function processAllFeeds(feedsToFetch) {    
    return Promise.map(feedsToFetch, function(feed){ 
        // I renamed your 'feed' fn to 'processFeed'
        return processFeed(feed) 
    })
    .then(function(articles){
        // 'articles' is now an array w/ results of all 'processFeed' calls
        // do something with all the results...
    })
    .catch(function(e){
        // feed server was down, etc
    })
}
function processFeed(feed) { 
    // use the promisified version of 'get'
    return request.getAsync(feed.url)... 
}

还要注意,您不需要在这里使用闭包来累积结果。

Bluebird API文档也写得很好,有很多例子,所以它更容易上手。

一旦我学会了Promise模式,生活就变得轻松多了。我再怎么推荐也不为过。

此外,这是一篇关于使用promise、async模块和其他处理异步函数的不同方法的优秀文章

希望这能有所帮助!

无需破解

我建议使用异步模块,因为它可以让这类事情变得更容易。

async提供async.eachSeries作为arr.forEach的异步替换,并允许您在done回调函数完成时传递它。它将处理一个系列中的每个项目,就像forEach一样。此外,它还可以方便地将错误添加到回调中,这样您就不必在循环中使用处理程序逻辑。如果您想要/需要并行处理,可以使用async.each。

async.eachSeries调用和回调之间将不存在任何阻塞。

async.eachSeries(feedsToFetch, function(feedUri, done) {
  // call your async function
  feed(feedUri, function(err, feedArticles) {
    // if there's an error, "bubble" it to the callback
    if (err) return done(err);
    // your operation here;
    articles = articles.concat(feedArticles);
    // this task is done
    done();
  });
}, function(err) {
  // errors generated in the loop above will be accessible here
  if (err) throw err;
  // we're all done!
  console.log("all done!");
});

或者,您可以构建一个异步操作数组,并将它们传递给async.series。series将在系列(非并行)中处理结果,并在每个函数完成时调用回调。与async.eachSeries相比,使用此语法的唯一原因是您是否更喜欢熟悉的arr.forEach语法。

// create an array of async tasks
var tasks = [];
feedsToFetch.forEach(function (feedUri) {
  // add each task to the task array
  tasks.push(function() {
    // your operations
    feed(feedUri, function(err, feedArticles) {
      if (err) throw err;
      articles = articles.concat(feedArticles);
    });
  });
});
// call async.series with the task array and callback
async.series(tasks, function() {
 console.log("done !");
});

或者你可以自己滚动™

也许你觉得自己格外雄心勃勃,或者你不想依赖async。也许你和我一样无聊。无论如何,我特意复制了async.eachSeries的API,以便于理解它的工作原理。

一旦我们删除了这里的注释,我们只有9行代码,这些代码可以重用于我们想要异步处理的任何数组!它不会修改原始数组,错误可以发送到"短路"迭代,并且可以使用单独的回调。它还可以处理空数组。仅9行就有相当多的功能:)

// void asyncForEach(Array arr, Function iterator, Function callback)
//   * iterator(item, done) - done can be called with an err to shortcut to callback
//   * callback(done)       - done recieves error if an iterator sent one
function asyncForEach(arr, iterator, callback) {
  // create a cloned queue of arr
  var queue = arr.slice(0);
  // create a recursive iterator
  function next(err) {
    // if there's an error, bubble to callback
    if (err) return callback(err);
    // if the queue is empty, call the callback with no error
    if (queue.length === 0) return callback(null);
    // call the callback with our task
    // we pass `next` here so the task can let us know when to move on to the next task
    iterator(queue.shift(), next);
  }
  // start the loop;
  next();
}

现在,让我们创建一个示例异步函数来使用它。我们将在这里用500毫秒的setTimeout来伪造延迟。

// void sampleAsync(String uri, Function done)
//   * done receives message string after 500 ms
function sampleAsync(uri, done) {
  // fake delay of 500 ms
  setTimeout(function() {
    // our operation
    // <= "foo"
    // => "async foo !"
    var message = ["async", uri, "!"].join(" ");
    // call done with our result
    done(message);
  }, 500);
}

好的,让我们看看它们是如何工作的!

tasks = ["cat", "hat", "wat"];
asyncForEach(tasks, function(uri, done) {
  sampleAsync(uri, function(message) {
    console.log(message);
    done();
  });
}, function() {
  console.log("done");
});

输出(每次输出前延迟500毫秒)

async cat !
async hat !
async wat !
done

使用url列表的副本作为队列来跟踪到达情况,这很简单:(所有更改已注释)

var q=feedsToFetch.slice(); // dupe to censor upon url arrival (to track progress)
feedsToFetch.forEach(function (feedUri)
{   
        feed(feedUri, function(err, feedArticles) 
        {
            if (err)
            {
                throw err;
            }
            else
            {
                articles = articles.concat(feedArticles);
            }
            q.splice(q.indexOf(feedUri),1); //remove this url from list
            if(!q.length) done(); // if all urls have been removed, fire needy code
        });
 });
function done(){
  // Code I want to run once all feedUris have been visited
}

最后,这并没有承诺那么"肮脏",并且为您提供了重新加载未完成url的机会(单独的计数器不会告诉您哪一个失败)。对于这个简单的并行下载任务,它实际上会为您的项目实现Promises添加比简单队列更多的代码,而Promise.all()并不是最直观的地方。一旦你进入子查询,或者想要比火车残骸更好的错误处理,我强烈建议你使用Promises,但你不需要火箭发射器来杀死松鼠。。。