我很难理解如何解决这个问题,所以我在这里提出这个问题,希望其他人已经面临同样的问题。我们正在使用手动ack模式运行@KafkaListener,并运行一个死信恢复程序,其重试限制为3。由于业务逻辑,当给定某些情况(外部依赖关系)时,我们不会破坏消息并暂停5分钟,因此需要手动ack模式。
此外,我们确实需要死信队列来处理由于某些原因而不能处理的消息。
现在,手动ack模式的问题是,当侦听器/使用者到达重试限制并将其移动到dl队列时,他不会确认消息。
如果消费者服务将被重新启动,他将一次又一次地尝试使用这些消息,并将它们移动到dl队列中。
我们有什么办法解决这个问题吗?
感谢和问候汉堡!
发布于 2018-12-14 14:50:05
如果可能的话,我会尽量避免使用手动acks;也许可以通过增加max.poll.interval.ms来避免。
如果使用AckMode.MANUAL_IMMEDIATE,那么在错误处理程序中直接在Consumer上执行提交是安全的。
子类SeekToCurrentErrorHandler并重写handle(),如果super.handle()不抛出异常,这意味着超过重试,您可以在Consumer上提交偏移量。
发布于 2019-11-17 19:14:06
对于提供给commitRecovered的SeekToCurrentErrorHandler实例,可以将ContainerListenerFactory设置为true。
参考文献这里
公用空setCommitRecovered(布尔commitRecovered) 设置为true以提交恢复记录的偏移量。容器必须>用ContainerProperties.AckMode.MANUAL_IMMEDIATE配置。>commit是同步还是异步取决于容器的syncCommits属性。
https://stackoverflow.com/questions/53781327
复制相似问题