串行地处理异步处理的消息队列

Serially processing a queue of messages whose processing is async

本文关键字:处理 消息 队列 异步      更新时间:2023-09-26

当处理消息的函数对消息进行异步操作时,如何按照到达的顺序处理传入函数的一系列消息?

一个例子:

var processMessage = function(msg) {
  switch (msg.action) {
    case "doFoo":
      this.processFoo(()=> {
        // `foo` is done processing
      });
    break;
    case "doBar":
      this.processBar(()=> {
        // `bar` is done processing
      });
    break;
    case "doBaz":
      this.processBaz(()=> {
        // `baz` is done processing
      });
     break;
  }
}
指出

  • 我当然可以将项目推入数组,然后使用async eachSeries来处理消息数组
  • 然而,消息不断到来,因此用更多要处理的项目填充数组,导致处理不稳定

对于这类问题有什么实际的/标准的解决方案吗?

一般方案:

  1. 当新消息到达时,检查一个标志,看看您是否已经在处理消息的中间。如果设置了标志,只需将消息添加到队列中。
  2. 如果没有设置标志,检查队列,如果队列中有消息,则从队列中删除该消息。
  3. 当您开始处理消息时,设置一个标志,表示您现在正在处理消息。
  4. 启动处理消息的异步操作
  5. 当异步消息完成的回调信号发生时,清除标志,表明你正在处理中间,并递归地调用从第2步开始的函数。

在两点触发对新消息的处理。首先,当一个新消息到达时,您还没有处理消息;其次,当一些其他消息的处理完成时,您检查在处理过程中是否有其他东西添加到队列中。

你维护一个inProcessing类型标志,这样你就不会在另一个消息已经在处理的时候无意中开始处理传入的消息(这会强制执行你请求的串行执行)。

必须严格处理错误条件,这样标志才不会卡住,队列处理才不会停止。

在伪代码中(假设这些是包含数组作为队列的队列对象上的方法):

addQueue: function(msg) {
    if (!this.inProcess) {
        // not currently processing anything so just process the message
        this.processMessage(msg);
    } else {
        this.queue.push(msg);
    }
},

processMessage: function(msg, completeFn) {
     var self = this;
     // must set this flag before going async
     self.inProcess = true;
     // asynchronously process this message
     someAsyncProcessing(msg, function(err) {
         self.inProcess = false;
         if (completeFn) {
             completeFn(err);
         }
         // see if anything else is in the queue to process
         if (self.queue.length) {
             // pull out oldest message and process it
             var msg = self.queue.shift();
             self.processMessage(msg);
         }
     });
}