为了抓取页面,我一直试图设置一个通风机/工人/水槽模式,但我从未通过测试阶段。我的装置的一个特殊之处是水槽与通风机生活在同一个过程中。所有节点都使用ipc://传输。目前只交换测试消息。呼吸机发送任务,工作人员接收它们,然后等待,然后发送确认到水槽。
症状:一段时间后(通常不到5分钟),水槽停止接收确认信息,尽管呼吸机继续发送任务,而工作人员继续接收确认信息并发送确认消息。
我知道确认是发送的,因为如果我重新启动接收器,它会在启动时获得所有丢失的消息。
我以为ZeroMQ处理的是自动重新连接。
通风机/水槽
var push = zmq.socket('push');
var sink = zmq.socket('pull');
var pi = 0;
setInterval(function() {
push.send(['ping', pi++], zmq.ZMQ_SNDMORE);
push.send('end');
}, 2000);
push.bind('ipc://crawl.ipc');
sink.bind('ipc://crawl-sink.ipc');
sink.on('message', function() {
var args = [].slice.apply(arguments).map(function(e) {return e.toString()});
console.log('got message', args.join(' '));
});worker.js
var pull = zmq.socket('pull');
var sink = zmq.socket('push');
sink.connect(opt.sink);
pull.connect(opt.push);
pull.on('message', function() {
var args = [].slice.apply(arguments).map(function(e) {return e.toString()});
console.log('got job ', args.join(' '));
setTimeout(function() {
console.log('job done ', args.join(' '));
sink.send(['job done', args.join(' ')]);
}, Math.random() * 5 * 1000);
});编辑我尝试将接收器移动到另一个进程,它看起来很有效。不过,我真的希望它生活在相同的进程中,并且在每个进程处理多个zmq套接字时,无论使用何种模式,我都观察到了类似的行为。
编辑我正在使用这个模块https://github.com/JustinTulloss/zeromq.node
发布于 2013-02-05 17:16:11
我不一定认为这个答案会被接受,但我把它放在这里作为参考。有一个非常忠实的纯节点模块叫做轴突,它是受ZeroMQ启发的。
注意:、ZMQ和Axon不能互操作。
发布于 2013-03-05 20:46:53
不确定您使用的是基本的AMQP客户端,还是在幕后使用它的包,我对RabbitMQ也有类似的问题。实际进程仍在运行(setInterval工作)
我正在运行我的服务/工人通过cluster.fork从主要进程..。主进程中有侦听器在退出时重新启动工人/服务.在我的工作人员内部,我有一个每X秒运行一次的setInterval,如果在这段时间内没有完成任何工作,我就有我的worker process.exit (主进程侦听器将在这里启动一个新的分叉)。这对我来说是足够的弹性了。通过运行多个工作人员(侦听队列),仍然可以完成工作。
正如另一个建议的那样,我一直在考虑切换到Axon,因为我与MQ的所有接口都正在通过Node。我的其他系统通过NodeJS驱动的API服务进行接口。就这一点而言,通过API服务公开您可能需要的内容可能并不困难。
https://stackoverflow.com/questions/14709398
复制相似问题