首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >SpringBoot微服务@StreamListener在抛出RunTimeException时不限次数地重试

SpringBoot微服务@StreamListener在抛出RunTimeException时不限次数地重试
EN

Stack Overflow用户
提问于 2018-08-27 23:47:08
回答 3查看 2.7K关注 0票数 1

我有一个@StreamListener方法,它将执行REST调用。当REST调用返回异常时,@StreamListener方法抛出RunTimeException并重试。@StreamListener方法抛出RuntimeException时,重试次数不限

Spring云流重试配置:

代码语言:javascript
复制
spring.cloud.stream.kafka.bindings.inputChannel.consumer.enableDlq=true
spring.cloud.stream.bindings.inputChannel.consumer.maxAttempts=3
spring.cloud.stream.bindings.inputChannel.consumer.concurrency=3
spring.cloud.stream.bindings.inputChannel.consumer.backOffInitialInterval=300000
spring.cloud.stream.bindings.inputChannel.consumer.backOffMaxInterval=600000

SpringBoot微服务依赖项版本:

代码语言:javascript
复制
Spring Boot 2.0.3
Spring Cloud Stream Elmhurst.RELEASE
Kafka broker 1.1.0
EN

回答 3

Stack Overflow用户

发布于 2019-11-05 15:36:48

使用RetryTemplate或增加maxAttempts属性有一个限制,即重试应该在max.poll.interval.ms内完成,否则Kafka broker会认为消费者停机,并将分区重新分配给另一个消费者(如果可用)。

另一种选择是让监听器使用consumer.seek方法重新读取来自Kafka的相同消息。

代码语言:javascript
复制
@StreamListener("events")
public void handleEvent(@Payload String eventString, @Header(KafkaHeaders.CONSUMER) Consumer<?, ?> consumer,
                             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionId,
                             @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                             @Header(KafkaHeaders.OFFSET) String offset) {
   try {
       //do the logic (example: REST call)
   } catch (Exception e) { // Catch only specific exceptions that can be retried
        consumer.seek(new TopicPartition(topic, Integer.parseInt(partitionId)), Long.parseLong(offset));
   }
}
票数 1
EN

Stack Overflow用户

发布于 2018-08-28 20:08:27

当然,您可以将尝试次数(maxAttempts属性)增加到类似于Integer.MAX_VALUE的值,也可以提供您自己的RetryTemplate bean的一个实例,该实例可以根据您的需要进行配置。这里是你可以获得更多信息的地方https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_retry_template

票数 0
EN

Stack Overflow用户

发布于 2018-09-06 15:31:45

经过几次试错,我们发现kafka配置:max.poll.interval.ms默认为5分钟。由于我们的消费者重试机制,对于最坏的情况,我们的整个重试过程将需要15分钟。

因此,在第一条消息被消费5分钟后,kafka分区确定消费者没有提供任何响应,进行自动平衡,并将相同的消息分配给另一个分区。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52042849

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档