我有一个关于RabbitMq消费者确认的查询,我阅读了RabbitMq上的文档,声明确认消息应该在消费者接收到的同一个通道上。但是,由于某种原因,在收到消息之后,由于某种原因消费者进程被停止,并且没有确认Rabbitmq,当消费者进程重新启动时,消费者开始从RabbitMq获得未被确认的消息,但是在这里,使用者不能向这些消息发送应答,因为我收到了一个通道异常,声明标记不属于该通道。因此,我的问题是如何处理这个场景,以及如何在我的使用者进程读取完消息之后确认应该删除该消息?
发布于 2018-11-22 09:13:00
正如你所说的,确认必须在同一个信道上发送。
确认必须在它接收到的相同的通道上发送。尝试使用不同的信道进行确认将导致信道级协议异常。
这样做的简单方法是使用autoack=true,因此消息一旦被使用就会自动确认。
boolean autoAck = true; // acknowledgment is covered below
channel.basicConsume(TASK_QUEUE_NAME, autoAck, consumer);编辑
如果auto_ack不适合您,您可以使用channel_consumer.basicCancel(consumerTag);
就像这样:
final Consumer consumer = new DefaultConsumer(channel_consumer) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
try {
channel_consumer.basicCancel(consumerTag);
System.out.println(" [x] stopping" + message + "'");
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(" [x] elaborated getting ack" + message + "'");
channel_consumer.basicAck(envelope.getDeliveryTag(), false);
} finally {
System.out.println(" [x] Done");
}
}
};
boolean autoAck = false; // acknowledgment is covered below
channel_consumer.basicConsume("test", autoAck, consumer);https://stackoverflow.com/questions/53426824
复制相似问题