我正在使用编写一个Java应用程序。下面是我使用的functional片段:
@Bean
public Function<KStream<String, String>, KStream<String, String>> process() {
return input ->
input.transform(
() ->
new Transformer<String, String, KeyValue<String, String>>() {
ProcessorContext context;
@Override
public void init(ProcessorContext context) {
this.context = context;
}
@Override
public void close() {}
@Override
public KeyValue<String, String> transform(String key, String value) {
String result = fetch_data_from_database(key, value);
return new KeyValue<>(key, result);
}
});fetch_data_from_database()可以抛出异常。
在发生来自KStream()的异常时,如何停止对入站fetch_from_database(偏移量不应提交)的处理,并使其使用相同的偏移量数据重试处理?
发布于 2020-08-31 21:32:32
在这种情况下,您需要自己重新尝试逻辑。为此,您可以使用Spring的RetryTemplate。This answer有关于如何在卡夫卡流中使用RetryTemplate的详细信息。它不像您所使用的那样使用低级别的处理器API,但这是相同的想法。将数据库调用包装在重试模板中,并根据需求自定义重试。任何上游处理都将暂停,直到重试耗尽为止。
https://stackoverflow.com/questions/63613717
复制相似问题