首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在reactor-kafka中重试失败的ConsumerRecord

如何在reactor-kafka中重试失败的ConsumerRecord
EN

Stack Overflow用户
提问于 2021-10-01 05:39:55
回答 1查看 189关注 0票数 3

我正在尝试reactor-kafka消费消息。其他一切都很好,但我想为失败的消息添加一个retry(2)。默认情况下,spring-kafka已经重试了3次失败的记录,我想用reactor-kafka来实现同样的效果。

我使用spring-kafka作为reactive-kafka的包装器。下面是我的消费者模板:

代码语言:javascript
复制
reactiveKafkaConsumerTemplate
                .receiveAutoAck()
                .map(ConsumerRecord::value)
                .flatMap(this::consumeWithRetry)
                .onErrorContinue((error, value)->log.error("something bad happened while consuming : {}", error.getMessage()))
                .retryWhen(Retry.backoff(30, Duration.of(10, ChronoUnit.SECONDS)))
                .subscribe();

让我们考虑一下us方法,如下所示

代码语言:javascript
复制
public Mono<Void> consume(MessageRecord message){
   return Mono.error(new RuntimeException("test retry"); //sample error scenario
}

我使用以下逻辑在失败时重试the方法。

代码语言:javascript
复制
public Mono<Void> consumeWithRetry(MessageRecord message){
   return consume(message)
          .retry(2);
}

如果当前的消费者记录出现异常而失败,我想重试消费消息。我试图用另一个retry(3)来包装the方法,但这并不能达到目的。最后一个retryWhen仅用于重试订阅kafka再平衡。

@simon-baslé@gary-russell

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-10-01 09:40:38

以前,在重试时,我使用了以下方法:

代码语言:javascript
复制
public Mono<Void> consumeWithRetry(MessageRecord message){
   return consume(message)
          .retry(2);
}

但这并不是重试。添加Mono.defer后,上面的代码可以工作,并添加了所需的重试。

代码语言:javascript
复制
public Mono<Void> consumeWithRetry(MessageRecord message){
   return Mono.defer(()->consume(message))
          .retry(2);
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69400971

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档