我有一个非常简单的使用者,通过它我创建了一个物化视图。我已经启用了对我的值对象的验证(对无效的json数据抛出Constraintviolationexception )。当我收到验证失败的值时,会将值显示给日志&使用者应该像LogAndContinueExceptionHandler启用那样读取下一个偏移量。
但是,从来没有调用过LogAndContinueExceptionHandler,并且consumePojo状态从PENDING_ERROR到错误的转换
代码
@Bean
public Consumer<KTable<String, Pojo>> consume() {
return values->
values
.filter((key, value) -> Objects.nonNull(key))
.mapValues(value-> value, Materialized.<String, Pojo>as(Stores.inMemoryKeyValueStore("POJO_STORE_NAME"))
.withKeySerde(Serdes.String())
.withValueSerde(SerdeUtil.pojoSerde())
.withLoggingDisabled())
.toStream()
.peek((key, value) -> log.debug("Receiving Pojo from topic with key: {}, and UUID: {}", key, value == null ? 0 : value.getUuid()));
}为什么在KTable的情况下不调用LogAndContinueExceptionHandler?
注意:如果代码被更改为KStreams,那么我会看到日志记录和记录被跳过,但是KTable没有!!
发布于 2022-11-09 08:43:26
为了处理不是由Kafka使用KafkaStreams.setUncaughtExceptionHandler方法和StreamsUncaughtExceptionHandler实现处理的异常,需要返回3个可用枚举中的一个:
在您的例子中,REPLACE_THREAD是最好的选择,正如您在基普-671中所看到的
REPLACE_THREAD:当前线程处于关闭状态,并传输到死状态。如果Kafka流客户端处于状态运行或再平衡状态,则启动一个新线程。对于全局线程,此选项将记录错误并恢复到关闭客户端,直到添加了该选项
在Spring中,可以将默认的StreamsUncaughtExceptionHandler替换为StreamsBuilderFactoryBean
@Autowired
void setMyStreamsUncaughtExceptionHandler(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
streamsBuilderFactoryBean.setStreamsUncaughtExceptionHandler(exception -> StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.REPLACE_THREAD);
}发布于 2022-11-09 13:54:00
在仔细查看日志之后,我能够解决这个问题,我发现Pojo的valueSerde显示了useNativeDecoding (默认是JsonSerde),因为这个DeserializationExceptionHandler没有被调用和线程终止。
当我在application.properties中修复application.properties时,问题就消失了
https://stackoverflow.com/questions/74363204
复制相似问题