首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KTable & LogAndContinueExceptionHandler

KTable & LogAndContinueExceptionHandler
EN

Stack Overflow用户
提问于 2022-11-08 15:25:47
回答 2查看 29关注 0票数 0

我有一个非常简单的使用者,通过它我创建了一个物化视图。我已经启用了对我的值对象的验证(对无效的json数据抛出Constraintviolationexception )。当我收到验证失败的值时,会将值显示给日志&使用者应该像LogAndContinueExceptionHandler启用那样读取下一个偏移量。

但是,从来没有调用过LogAndContinueExceptionHandler,并且consumePojo状态从PENDING_ERROR到错误的转换

代码

代码语言:javascript
复制
@Bean
    public Consumer<KTable<String, Pojo>> consume() {
        return values-> 
                values
                .filter((key, value) -> Objects.nonNull(key))
                .mapValues(value-> value, Materialized.<String, Pojo>as(Stores.inMemoryKeyValueStore("POJO_STORE_NAME"))
                .withKeySerde(Serdes.String())
                .withValueSerde(SerdeUtil.pojoSerde())
                .withLoggingDisabled())
                .toStream()
                .peek((key, value) -> log.debug("Receiving Pojo from topic with key: {}, and UUID: {}", key, value == null ? 0 : value.getUuid()));
    }

为什么在KTable的情况下不调用LogAndContinueExceptionHandler?

注意:如果代码被更改为KStreams,那么我会看到日志记录和记录被跳过,但是KTable没有!!

EN

回答 2

Stack Overflow用户

发布于 2022-11-09 08:43:26

为了处理不是由Kafka使用KafkaStreams.setUncaughtExceptionHandler方法和StreamsUncaughtExceptionHandler实现处理的异常,需要返回3个可用枚举中的一个:

  • REPLACE_THREAD
  • SHUTDOWN_CLIENT
  • SHUTDOWN_APPLICATION

在您的例子中,REPLACE_THREAD是最好的选择,正如您在基普-671中所看到的

REPLACE_THREAD:当前线程处于关闭状态,并传输到死状态。如果Kafka流客户端处于状态运行或再平衡状态,则启动一个新线程。对于全局线程,此选项将记录错误并恢复到关闭客户端,直到添加了该选项

在Spring中,可以将默认的StreamsUncaughtExceptionHandler替换为StreamsBuilderFactoryBean

代码语言:javascript
复制
    @Autowired
    void setMyStreamsUncaughtExceptionHandler(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
        streamsBuilderFactoryBean.setStreamsUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
    }
票数 0
EN

Stack Overflow用户

发布于 2022-11-09 13:54:00

在仔细查看日志之后,我能够解决这个问题,我发现Pojo的valueSerde显示了useNativeDecoding (默认是JsonSerde),因为这个DeserializationExceptionHandler没有被调用和线程终止。

当我在application.properties中修复application.properties时,问题就消失了

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74363204

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档