我将严格遵循以下教程:https://www.rabbitmq.com/tutorials/tutorial-two-java.html。
我这样启动RabbitMQ服务器:
docker pull rabbitmq
docker run -d --hostname my-rabbit-host --name my-rabbit -p 5672:5672 rabbitmq:3来自本教程:
使用此代码我们可以确保,即使您在处理消息时使用CTRL+C杀死一个工作人员,也不会有任何损失。工作进程死后不久,所有未确认的消息都将被重新传递。
我产生了两个消费者,当我对其中一个消费者执行CTRL+C操作时,另一个正在运行的消费者不会收到最初发往前一个消费者的消息。如何在其中一个消费者发出CTRL+C‘’ing后重新发送消息?
编辑:我现在正在通过'brew‘安装RabbitMQ,但我仍然看到同样的问题。
brew update
brew install rabbitmq
/usr/local/sbin/rabbitmq-server &发布于 2017-10-10 14:47:06
没有必要将睡眠或类似的东西放在消费者代码中。在您提供的链接上,搜索以手动消息确认开头的段落,并查看其中的代码。关键是不要承认这条消息。如果您将autoACK标志设置为true,那么您可以调用任何您想调用的内容,消息一收到就会得到确认。所以简单地不要设置该标志,而且为了测试,您可以注释掉行channel.basicAck(envelope.getDeliveryTag(), false);,以便也不进行手动确认。因此,当使用者退出时,消息仍将在队列中。
发布于 2019-03-25 02:40:05
奇怪的是,RabbitMQ为我开箱即用。
Step 1,我启动了RabbitMQ:
$ docker run -d --hostname my-rabbit-host --name my-rabbit -p 5672:5672 rabbitmq:3
$ docker ps
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
e0c3257b8b49 rabbitmq:3 "docker-entrypoint.s…" 18 minutes ago Up 14 minutes 4369/tcp, 5671/tcp, 25672/tcp, 0.0.0.0:5672->5672/tcp my-rabbit第2步,我发布了一条消息(顺便说一句,我尝试使用Node.js。源代码见下面附录):
$ node src/producer.js
Publisher: TODO 1stStep 3,我一个接一个地启动了两个消费者(我的消费者是为了测试目的而设计的,不是用来确认的,所以RabbitMQ永远不会将消息出队)。
消费者1将收到消息,而消费者2不会。
消费者1:
$ node src/consumer.js
Consumer: TODO 1st消费者2:
$ node src/consumer.js Step 4,当我通过'Ctrl + c‘停止消费者1时,消费者2将立即收到来自RabbitMQ的消息:
消费者2:
$ node src/consumer.js
Consumer: TODO 1st结论:基本上,在设置消费者时,我们需要告诉RabbitMQ在收到消费者的确认之前不要将消息出队。因此,如果消费者1在它有机会确认消息之前由于任何原因而停止,RabbitMQ将把消息重新传递给消费者2。
附录
src/producer.js
var q = 'tasks'
function bail (err) {
console.error(err)
process.exit(1)
}
// Publisher
function publisher (conn) {
conn.createChannel(onOpen)
function onOpen (err, ch) {
if (err != null) bail(err)
ch.assertQueue(q)
const msg = 'TODO 1st'
ch.sendToQueue(q, Buffer.from(msg), { persistent: true })
console.log('Publisher: ', msg)
}
}
require('amqplib/callback_api')
.connect('amqp://guest:guest@localhost', function (err, conn) {
if (err != null) bail(err)
publisher(conn)
})src/consumer.js
var q = 'tasks'
function bail (err) {
console.error(err)
process.exit(1)
}
// Consumer
function consumer (conn) {
conn.createChannel(onOpen)
function onOpen (err, ch) {
if (err != null) bail(err)
ch.assertQueue(q)
ch.consume(q, function (msg) {
if (msg !== null) {
console.log('Consumer: ', msg.content.toString())
// Commented out the line below, so RabbitMQ never dequeues a message
// ch.ack(msg)
}
}, { noAck: false })
}
}
require('amqplib/callback_api')
.connect('amqp://guest:guest@localhost', function (err, conn) {
if (err != null) bail(err)
consumer(conn)
})https://stackoverflow.com/questions/46654531
复制相似问题