amqplib, checkQueue()崩溃我的应用程序时,错误命中

amqplib, checkQueue() crashing my app when error hit

本文关键字:错误 应用程序 崩溃 checkQueue amqplib 我的      更新时间:2023-09-26

我正在用express写一个服务投资者,接受请求并将它们传递给rabbit MQ。

我似乎无法优雅地绕过checkQueue()错误。每次我点击它们,我的整个应用都会因为这个错误而崩溃。

events.js:141
  throw er; // Unhandled 'error' event
  ^
Error: Channel closed by server: 404 (NOT-FOUND) with message "NOT_FOUND - no queue 'asdfasdfb' in vhost '/'"
  at Channel.C.accept (/app/node_modules/amqplib/lib/channel.js:406:17)
  at Connection.mainAccept [as accept] (/app/node_modules/amqplib/lib/connection.js:63:33)
  at Socket.go (/app/node_modules/amqplib/lib/connection.js:476:48)
  at emitNone (events.js:67:13)
  at Socket.emit (events.js:166:7)
  at emitReadable_ (_stream_readable.js:411:10)
  at emitReadable (_stream_readable.js:405:7)
  at readableAddChunk (_stream_readable.js:157:11)
  at Socket.Readable.push (_stream_readable.js:110:10)
  at TCP.onread (net.js:523:20)

这是让我悲伤的AMQP代码

.post(function(req, res) {
var promise =  new Promise(function(resolve, reject){
  var queue = req.body.queue;
  if ( typeof(queue) == 'undefined' || queue.length == 0 ) return reject("No Queue Specified");
  // need to add VALID JSON CHECK
  var message = req.body.message;
  if ( typeof(message) == 'undefined' || message.length == 0 ) reject("No Message Specified");
  // select the Awknoledgement of the queue.
  var noAck = req.body.ack || true;
  // select the durabiltiy of the queue. Custing issues in the below if different hten when created.
  var durableOpt = req.body.durable || false;
  var assertOpt = {durable: durableOpt};
  var sendOptions = {
    noAck: noAck,
    contentType: 'application/json' };
  amqp.connect( process.env.QUEUE_HOST , function(err, conn) {
    if (err) return reject(err);
    conn.createChannel(function(err, ch) {
      if (err) return reject(err);
      var check = new Promise(function(resolve, reject){
        ch.checkQueue(queue, function(err,ok){
          if (err != null){
            return reject(err);
          } else {
            return resolve(true);
          }
        })
      })
      var assert = new Promise(function(resolve, reject){
        ch.assertQueue( queue , assertOpt, function(err,ok){
          if (err != null){
            return reject(err);
          } else {
            return resolve(true);
          }
        })
      })
      Promise.all( [ check , assert ] ).then(function( values ){
        for (var i = 0; i < values.length; i++) {
          if (values[i] !== true){
            reject(values[i])
          }
        }
        if( ch.sendToQueue(queue, new Buffer( JSON.stringify( message ) ) , sendOptions) == true){
          return resolve('message added');
        }
      }) // end Promise.all
    }) // end conn.createChannel
  }) // end amqp.connect
}) // end of promise.
promise.then(
function(response){
  res.status(200).json(response);
},
function( error ){
  res.status(400).json(error);
})
})

如果我曾经在checkQueue中遇到错误,或者我实际上在AssertQueue中有错误。它总是崩溃并出现上述错误。通道总是关闭,并得到一个events.js:141错误。有什么方法可以避免呼叫错误导致我的频道中断吗?或者当错误发生时,我需要在运行中重新连接它吗?

我找到了导致问题的原因…conn抛出了一个错误,我没有"正确地"捕获,我尝试了一个标准的尝试/捕获,但它也不喜欢。

在文档中挖掘了一段时间后,我在"事件"下找到了它,我想这是有意义的,但是如果标记了错误处理就好了。

conn.on('error', function(handle){
  reject(handle);
})

我不确定,但似乎你把return reject(err);称为承诺之外。

试试这个代码

amqp.connect( process.env.QUEUE_HOST , function(err, conn) {
  if (err) return console.log(err);
  conn.createChannel(function(err, ch) {
    if (err) return console.log(err);
    var check = new Promise(function(resolve, reject){
      <<< SPECIFICALLY THIS >>>
      ch.checkQueue(queue, function(err,ok){
        if (err != null){
          return reject(err);
        } else{
          return resolve(true);
        }
      })
    })
    var assert = new Promise(function(resolve, reject){
      <<< AND THIS >>>
      ch.assertQueue( queue , assertOpt, function(err,ok){
        if (err != null){
          return reject(err);
        } else {
          return resolve(true);
        }
      });
    })
    Promise.all( [check,assert] ).then(function(values){
      console.log(values);
      if( ch.sendToQueue(queue, new Buffer( JSON.stringify( message ) ) , sendOptions) == true){
        return resolve('message added');
      }
    })
  });
})