技术:
2.2.7
问题场景:夫妇kafka集群节点崩溃,kafka开始报告异常,而不是接受我们的提交。默认异步回调只是将这些错误打印到日志中。投票仍在继续。消息处理得很好(但没有提交)。10分钟后,prod支持在日志中发现错误。卡夫卡集群恢复了,苦难进程被反弹。由于提交被代理删除了一段时间,该进程处理了大约200万条重复消息。
问题:
(我们希望保持批处理和异步,因为没有批处理和异步,整个程序几乎无法使每秒产生大约1000条消息,这是很慢的)。
我将非常感谢你的建议/想法!谢谢!
理想情况下,我们希望它能够自我恢复(我们在scala +akka流的微服务中实现了自我恢复,因此我知道这是可行的)。
发布于 2021-03-08 15:43:32
2.2.x不再受支持。
现在有几种用于批处理错误处理程序的选项。
SeekToCurrentBatchErrorHandler (自2.1起--在2.3中添加的重试之间延迟的能力)-无限期地重放整个批,直到成功。
RetryingBatchErrorHandler (自2.3.7以来)-暂停使用者和重试(并退出);当重试用尽时,将每个记录发送给恢复程序(如DeadLetterPublishingRecoverer )。
RecoveringBatchErrorHandler (自2.5起)-如果侦听器抛出一个BatchListenerFailedException (指示批处理中的哪个记录失败),则在提交该记录之前对记录进行偏移,然后重新传递其余的记录(直到失败的记录被发送到恢复程序时,重试结束为止)。对于所有其他异常,它将返回到SeekToCurrentBatchErrorHandler,而不会后退。
但是,在使用异步提交时,不会引发任何异常,因此不会调用错误处理程序。
最好确保您的复制因子足以处理掉一些代理。
https://stackoverflow.com/questions/66513031
复制相似问题