我一直在尝试为Spring cloud stream kafka实现重试逻辑,以便在向主题sample-topic生成事件时抛出异常,它会再重试两次。
我在application.properties文件中添加了以下配置
spring.cloud.stream.bindings.processSampleEvent.destination=sample-topic
spring.cloud.stream.bindings.processSampleEvent.content-type=application/json
spring.cloud.stream.bindings.processSampleEvent.consumer.maxAttempts=2我已经编写了lister代码,它只是记录接收到的消息并抛出一个NullPointerException,这样我就可以测试重试。
@StreamListener(ListenerBind.SAMPLE_CHANNEL)
public void processSampleEvent(String productEventDto) {
System.out.println("Entering listener: " + productEventDto);
throw new NullPointerException();
}但是当通过向sample-topic生成一个事件进行测试时,我在日志中看到该事件已经重试了20次,但我在属性中指定了仅重试两次,而且当我更改它3次时,会发生一件奇怪的事情,它会重试30次。
我是Spring cloud streams的新手,在这方面的任何帮助都会非常有帮助。提前感谢
发布于 2021-07-06 23:28:55
侦听器容器中的默认错误处理程序现在是具有10次传递尝试的SeekToCurrentErrorHandler。
您可以禁用绑定器中的重试,并使用所需的重试语义配置STCEH,或者在绑定器中使用重试,并使用简单的LoggingErrorHandler替换默认的错误处理程序。
要配置容器的错误处理程序,请添加一个ListenerContainerCustomizer<AbstractKafkaListenerContainerFactory> @Bean。
发布于 2022-01-13 15:48:27
我也面临着同样的问题。我的工作解决方案是创建一个ListenerContainerCustomizer Bean,赋予它所需的最大尝试次数,并设置消费者绑定maxAttempts: 1
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?,?>> listenerContainerCustomizer(){
return (container, dest, group) ->
container.setErrorHandler(containerAwareErrorHandler());
}
public SeekToCurrentErrorHandler containerAwareErrorHandler(){
return new SeekToCurrentErrorHandler(new FixedBackOff(0, maxAttempts-1);
}https://stackoverflow.com/questions/68269486
复制相似问题