如果ZMQ路由器繁忙,我如何让它引发错误

how do I get a ZMQ Router to raise an error if it is busy?

本文关键字:错误 路由器 ZMQ 如果      更新时间:2023-09-26

我有一个REQ -> ROUTER -> [DEALER,DEALER... DEALER]设置,其中REQ是客户端,路由器是队列,经销商套接字是处理数据并将其发送回路由器的工作人员,路由器将其发送回REQ。当有足够的经销商来处理这项工作时,工作很好。但是,如果我减慢经销商的速度,路由器永远不会告诉我它得到的工作超过了它的处理能力。

医生说:

路由器套接字确实有一种有点残酷的方式来处理它们不能发送到任何地方的消息:它们默默地丢弃它们。这种态度在工作代码中是有意义的,但它使调试变得困难。发送身份作为第一帧这种方法非常棘手,以至于我们在学习过程中经常出错,而当我们搞砸时,ROUTER的沉默也不是很有建设性。

因为ØMQ v3.2有一个套接字选项,你可以设置来捕获这个错误:ZMQ_ROUTER_MANDATORY。在ROUTER套接字上设置它,然后当你在send调用中提供不可路由标识时,套接字将发出EHOSTUNREACH错误信号。

老实说,我不确定这是不是我所看到的同样的问题。石头般的沉默肯定符合我所看到的。

下面是设置的代码:
var argsToString, buildSocket, client, q;
buildSocket = function(desc, socketType, port) {
  var socket;
  log("creating socket: " + (argsToString(Array.apply(null, arguments))));
  socket = zmq.socket(socketType);
  socket.identity = "" + desc + "-" + socketType + "-" + process.pid + "-" + port;
  return socket;
};
argsToString = function(a) {
  return a.join(', ');
};
client = buildSocket("client", 'req', clientPort);
q = buildSocket("q", "router", qPort);
q.setsockopt(zmq.ZMQ_ROUTER_MANDATORY, 1);
q.on('error', function() {
  return log('router error ' + argsToString(Array.apply(null, arguments)));
});

如果需要,我可以发布更多的代码。问题是,当REQ套接字在一秒钟内发送10条消息,但经销商需要2秒才能完成他们的工作时,路由器就会忽略传入的消息,而不考虑ZMQ_ROUTER_MANDATORY。我已经发送了1000条消息,从未见过任何套接字抛出错误(.on 'error')。

有关于ZMQ_HWM的讨论,但是节点驱动程序似乎不支持经销商或路由器。

我如何管理一个路由器,运行完的地方发送消息到?

首先,如果你正在实现一个特定的模式(正如我从你之前的问题中所知道的,你正在实现Paranoid Pirate),那么说出来总是有帮助的,因为它将为你的代码提供上下文。

你所说的在《偏执海盗》中没有特别提到。你可以通过跳过指南到泰坦尼克号模式来看到这一点……当您处理零星的连接性问题时,或者在您的情况下,由于新消息到达时您的工人仍在工作,因此您必须维护队列中工人的状态,以了解您需要对该消息做什么……要么把它发送给一个可用的worker,要么把它存储在某个地方,这样当一个worker可用时,你就可以把它拉出来发送。

如果您尽可能严格地这样做,您将破坏ZMQ的"队列"性质,但您避免了HWM中固有的不确定性,丢弃消息,而不是使系统崩溃。

你可以维护一个缓冲区,不断向队列中添加消息,直到你意识到你已经进入了HWM的40%(这取决于消息的大小)…这将在您开始保存消息之前为您提供一个缓冲区,但最终的过程是相同的。

这是ZMQ将责任转移给应用程序设计人员的一个领域,因为没有一种"正确"的方法可以为所有场景做事情。


编辑回应评论:

下面是我如何在node.js中处理这个问题的基本要点:

var worker_count = 0;
var job_count = 0;
// ...
q.on('message', function() {
    // ...
    if (msg.toString() == 'ready') worker_count++;
    else job_count--;
    // ...
    // this could use some TLC, but here's the basic gist of the logic...
    if (job_count >= worker_count) {
        // we'll assume the message was cached when it was received from
        // the req socket, if so nothing else to do here
    }
    else {
        // figure out if there is a cached message ready to go, if so, then...
        q.send(job);
        job_count++;
    }
});

也许DONT_WAIT标志可以帮助您。当接收方因为队列已满而无法再接收消息时,这将抛出一个错误:

http://api.zeromq.org/4-1 zmq-send

ZMQ_DONTWAIT对于套接字类型(DEALER, PUSH),当没有可用的对等体(或所有对等体都有完整的高水位标记)时阻塞,指定该操作应在非阻塞模式下执行。如果消息不能在套接字上排队,zmq_send()函数可以如果errno设置为EAGAIN,则失败

. . . . . . . . . . .(管理一个饱和的ZeroMQ-primitive元素)

not to be ?. . . . . . . . . . . . . . . . . . . . . . . . .(避免任何饱和)

而问题将问题定义为:

"我怎么得到…",

任何严肃的设计,在默认情况下,都应采取一切措施防止(避免)饱和。因此,端到端FlowControl &任何对任何信号控制平面应精心设计& &;实现饱和避免策略的考虑因素。

ZeroMQ scaleable Formal Communications Patterns框架利用了对后者的大量洞察,而不是前者。

非常感谢Martin Sustrik的&


使用ZeroMQ的隐藏功能vs.试图让ZeroMQ-primitive做它们不适合的工作

作为一个更一般的说明,这个经验使它更明显,抽象丰富的正式通信模式的概念是最令人兴奋和强大的"黑暗质量"。意外"behind"零/纳米,并不总是被看到,理解& &;利用这些伟大的工具来达到最好的效果。

绝对值得花时间&努力读完整本书,首先只拍下图中主要观点的照片。在之后,只有下到高级设计草图,在最后,编写源代码。

传输不可知,故障弹性并行负载平衡根本无法从单独的源代码行中理解。


您的场景可能受益于REQ -> ROUTER -> [DEALER,DEALER... DEALER]设置中不仅仅是.connect()的ZeroMQ基本元素&将它们放入更抽象的通信模式中,既满足应用程序需求,又能很好地满足实际使用中的负载平衡和故障恢复方面。

您可能还需要传播<<em>状态>将[worker]-client(s)返回到工作单元的[dispatcher],无论是单个还是大量负载均衡/按需分叉的ZeroMQ-primitive(s)池。

对于试图用ZeroMQ编码的前几件事来说,这可能听起来很复杂,但如果您至少跳到第1卷(PDF)的 265页,如果不是一步一步地阅读的话。

最快的学习曲线是首先在图上有一个未曝光的视图。60重新发布更新图62HA克隆服务器对,以获得可能的高可用性方法,然后返回根,元素和细节。