这里是Java & RabbitMQ。我需要实现某种毒丸模式,在处理特定消息时,使用者需要取消自己,停止接收/处理任何进一步的消息。停下来清理干净。消息杀死使用者并释放线程、内存等。
我看到消费者有一个handleCancel方法,他们可以实现这个方法来响应来自外部的取消命令,但是我如何处理消费者内部的一条毒丸消息,该消息告诉消费者死亡?
发布于 2022-08-17 12:25:48
我不认为RabbitMQ出于某种原因来处理这个场景。
我的解决方案似乎正在奏效:
Terminating
Processing (默认)和its (在Processing状态下),它消耗和处理队列之外的消息,就像往常一样。当它接收到神奇的毒药药丸(可能是消息头/属性中的值,或者消息本身中的特定值)时,它会将其状态设置为Terminating,而不处理该消息。它还使用异步事件总线向外部处理程序发送自定义ShutdownConsumerEvent。此事件通过发送给使用者的ShutdownConsumerEvent event = new ShutdownConsumerEvent(channel, consumerTag);)
consumerTag进行实例化(例如,使用者在Terminating状态下接收到的更多消息被重新发布到队列中,启用ACK以避免丢失ACK,并让pseudo-transactionality
ShutdownConsumerSubscriber (已注册的接收ShutdownConsumerEvents的处理程序)接收关闭使用者的命令,通过发出channel.basicCancel(consumerTag)来完成此操作。
https://stackoverflow.com/questions/73375828
复制相似问题