我有一个要求将偏移量重置为一个数字。
详细需求:我的应用程序正在使用来自kafka主题的消息并将其转储到DB中,假设DB在处理所使用的消息(offset=10)时下降,直到DB关闭时,应用程序使用的消息一直到偏移量20。
现在DB在处理第20偏移量消息时再次出现,现在我希望将偏移量重新设置为10,这样就可以将数据保存在数据库中。
我可以通过编程(spring引导)实现这个目标吗?我用的是春云流活页夹卡夫卡。
发布于 2021-10-19 17:20:15
如果DB关闭,为什么继续接收消息?
只需配置侦听器容器就可以继续重试当前记录,直到DB再次可用为止。
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> listenerContainerCustomizer() {
return (container, topic, group) -> {
container.setErrorHandler(...);
};
}请参阅https://docs.spring.io/spring-kafka/docs/current/reference/html/#seek-to-current
https://stackoverflow.com/questions/69634967
复制相似问题