我有一个@StreamListener方法,它将执行REST调用。当REST调用返回异常时,@StreamListener方法抛出RunTimeException并重试。@StreamListener方法抛出RuntimeException时,重试次数不限
Spring云流重试配置:
spring.cloud.stream.kafka.bindings.inputChannel.consumer.enableDlq=true
spring.cloud.stream.bindings.inputChannel.consumer.maxAttempts=3
spring.cloud.stream.bindings.inputChannel.consumer.concurrency=3
spring.cloud.stream.bindings.inputChannel.consumer.backOffInitialInterval=300000
spring.cloud.stream.bindings.inputChannel.consumer.backOffMaxInterval=600000SpringBoot微服务依赖项版本:
Spring Boot 2.0.3
Spring Cloud Stream Elmhurst.RELEASE
Kafka broker 1.1.0发布于 2019-11-05 15:36:48
使用RetryTemplate或增加maxAttempts属性有一个限制,即重试应该在max.poll.interval.ms内完成,否则Kafka broker会认为消费者停机,并将分区重新分配给另一个消费者(如果可用)。
另一种选择是让监听器使用consumer.seek方法重新读取来自Kafka的相同消息。
@StreamListener("events")
public void handleEvent(@Payload String eventString, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
@Header(KafkaHeaders.OFFSET) String offset) {
try {
//do the logic (example: REST call)
} catch (Exception e) { // Catch only specific exceptions that can be retried
consumer.seek(new TopicPartition(topic, Integer.parseInt(partitionId)), Long.parseLong(offset));
}
}发布于 2018-08-28 20:08:27
当然,您可以将尝试次数(maxAttempts属性)增加到类似于Integer.MAX_VALUE的值,也可以提供您自己的RetryTemplate bean的一个实例,该实例可以根据您的需要进行配置。这里是你可以获得更多信息的地方https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_retry_template
发布于 2018-09-06 15:31:45
经过几次试错,我们发现kafka配置:max.poll.interval.ms默认为5分钟。由于我们的消费者重试机制,对于最坏的情况,我们的整个重试过程将需要15分钟。
因此,在第一条消息被消费5分钟后,kafka分区确定消费者没有提供任何响应,进行自动平衡,并将相同的消息分配给另一个分区。
https://stackoverflow.com/questions/52042849
复制相似问题