首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何让RabbitMQ重新发送未确认的消息?

如何让RabbitMQ重新发送未确认的消息?
EN

Stack Overflow用户
提问于 2017-10-10 04:33:27
回答 2查看 1.3K关注 0票数 0

我将严格遵循以下教程:https://www.rabbitmq.com/tutorials/tutorial-two-java.html

我这样启动RabbitMQ服务器:

代码语言:javascript
复制
docker pull rabbitmq
docker run -d --hostname my-rabbit-host --name my-rabbit -p 5672:5672 rabbitmq:3

来自本教程:

使用此代码我们可以确保,即使您在处理消息时使用CTRL+C杀死一个工作人员,也不会有任何损失。工作进程死后不久,所有未确认的消息都将被重新传递。

我产生了两个消费者,当我对其中一个消费者执行CTRL+C操作时,另一个正在运行的消费者不会收到最初发往前一个消费者的消息。如何在其中一个消费者发出CTRL+C‘’ing后重新发送消息?

编辑:我现在正在通过'brew‘安装RabbitMQ,但我仍然看到同样的问题。

代码语言:javascript
复制
brew update
brew install rabbitmq
/usr/local/sbin/rabbitmq-server &
EN

回答 2

Stack Overflow用户

发布于 2017-10-10 14:47:06

没有必要将睡眠或类似的东西放在消费者代码中。在您提供的链接上,搜索以手动消息确认开头的段落,并查看其中的代码。关键是不要承认这条消息。如果您将autoACK标志设置为true,那么您可以调用任何您想调用的内容,消息一收到就会得到确认。所以简单地不要设置该标志,而且为了测试,您可以注释掉行channel.basicAck(envelope.getDeliveryTag(), false);,以便也不进行手动确认。因此,当使用者退出时,消息仍将在队列中。

票数 0
EN

Stack Overflow用户

发布于 2019-03-25 02:40:05

奇怪的是,RabbitMQ为我开箱即用。

Step 1,我启动了RabbitMQ:

代码语言:javascript
复制
$ docker run -d --hostname my-rabbit-host --name my-rabbit -p 5672:5672 rabbitmq:3
$ docker ps
CONTAINER ID        IMAGE               COMMAND                  CREATED             STATUS              PORTS                                                   NAMES
e0c3257b8b49        rabbitmq:3          "docker-entrypoint.s…"   18 minutes ago      Up 14 minutes       4369/tcp, 5671/tcp, 25672/tcp, 0.0.0.0:5672->5672/tcp   my-rabbit

第2步,我发布了一条消息(顺便说一句,我尝试使用Node.js。源代码见下面附录):

代码语言:javascript
复制
$ node src/producer.js
Publisher:  TODO 1st

Step 3,我一个接一个地启动了两个消费者(我的消费者是为了测试目的而设计的,不是用来确认的,所以RabbitMQ永远不会将消息出队)。

消费者1将收到消息,而消费者2不会。

消费者1:

代码语言:javascript
复制
$ node src/consumer.js 
Consumer:  TODO 1st

消费者2:

代码语言:javascript
复制
$ node src/consumer.js 

Step 4,当我通过'Ctrl + c‘停止消费者1时,消费者2将立即收到来自RabbitMQ的消息:

消费者2:

代码语言:javascript
复制
$ node src/consumer.js 
Consumer:  TODO 1st

结论:基本上,在设置消费者时,我们需要告诉RabbitMQ在收到消费者的确认之前不要将消息出队。因此,如果消费者1在它有机会确认消息之前由于任何原因而停止,RabbitMQ将把消息重新传递给消费者2。

附录

src/producer.js

代码语言:javascript
复制
var q = 'tasks'

function bail (err) {
  console.error(err)
  process.exit(1)
}

// Publisher
function publisher (conn) {
  conn.createChannel(onOpen)
  function onOpen (err, ch) {
    if (err != null) bail(err)
    ch.assertQueue(q)
    const msg = 'TODO 1st'
    ch.sendToQueue(q, Buffer.from(msg), { persistent: true })
    console.log('Publisher: ', msg)
  }
}

require('amqplib/callback_api')
  .connect('amqp://guest:guest@localhost', function (err, conn) {
    if (err != null) bail(err)
    publisher(conn)
  })

src/consumer.js

代码语言:javascript
复制
var q = 'tasks'

function bail (err) {
  console.error(err)
  process.exit(1)
}

// Consumer
function consumer (conn) {
  conn.createChannel(onOpen)
  function onOpen (err, ch) {
    if (err != null) bail(err)
    ch.assertQueue(q)
    ch.consume(q, function (msg) {
      if (msg !== null) {
        console.log('Consumer: ', msg.content.toString())
        // Commented out the line below, so RabbitMQ never dequeues a message
        // ch.ack(msg)
      }
    }, { noAck: false })
  }
}

require('amqplib/callback_api')
  .connect('amqp://guest:guest@localhost', function (err, conn) {
    if (err != null) bail(err)
    consumer(conn)
  })
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46654531

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档