我们使用https://github.com/reactor/reactor-kafka项目来实现Spring Reactive Kafka。但我们希望利用Kafka重试和恢复反应式Kafka的逻辑。有人能提供一些示例代码吗?
发布于 2021-09-21 07:19:01
由于您正在使用spring生态系统进行重试和恢复,因此可以使用spring-retry查看那里的文档spring -retry。在web上有足够的参考资料。
下面的一个示例是使用来自kafka主题和processing的消息。
使用的方法被标记为可重试,因此如果有异常处理,它将重试,如果重试不成功,则将调用相应的恢复方法。
public class KafkaListener{
@KafkaListener(topic="books-topic", id ="group-1")
@Retryable(maxAttempts = 3, value = Exception.class))
public void consuming(String message){
// To do message processing
// Whenever there is exception thrown from this method
// - it will retry 3 times in total
// - Even after retry we get exception then it will be handed of to below
// recover method recoverConsuming
}
@Recover
public void recoverConsuming(Exception exception, String message){
// Recovery logic
// you can implement your recovery scenario
}
}https://stackoverflow.com/questions/69263865
复制相似问题