首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡流数据处理中的错误处理方法

卡夫卡流数据处理中的错误处理方法
EN

Stack Overflow用户
提问于 2020-08-27 10:08:13
回答 1查看 701关注 0票数 1

我正在使用编写一个Java应用程序。下面是我使用的functional片段:

代码语言:javascript
复制
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
    return input ->
        input.transform(
            () ->
                new Transformer<String, String, KeyValue<String, String>>() {

                  ProcessorContext context;

                  @Override
                  public void init(ProcessorContext context) {
                    this.context = context;
                  }

                  @Override
                  public void close() {}

                  @Override
                  public KeyValue<String, String> transform(String key, String value) {                       
                         String result = fetch_data_from_database(key, value);
                         return new KeyValue<>(key, result);
                  }
});

fetch_data_from_database()可以抛出异常。

在发生来自KStream()的异常时,如何停止对入站fetch_from_database(偏移量不应提交)的处理,并使其使用相同的偏移量数据重试处理?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-08-31 21:32:32

在这种情况下,您需要自己重新尝试逻辑。为此,您可以使用Spring的RetryTemplateThis answer有关于如何在卡夫卡流中使用RetryTemplate的详细信息。它不像您所使用的那样使用低级别的处理器API,但这是相同的想法。将数据库调用包装在重试模板中,并根据需求自定义重试。任何上游处理都将暂停,直到重试耗尽为止。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63613717

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档