我已经配置了一个errorHandler,它将记录发送到DLT
@Bean
public DeadLetterPublishingRecoverer recoverer(KafkaTemplate<String, List<T>> template) {
return new DeadLetterPublishingRecoverer(template);
}
@Bean
public ErrorHandler errorHandler(DeadLetterPublishingRecoverer recoverer) {
return new SeekToCurrentErrorHandler(recoverer, new FixedBackOff(0L, 2L));
}我使用这个DLT捕获由ErrorHandlingDeserializer2处理的序列化错误。
我面临的问题是DLT中的消息采用base64编码格式,而不是普通字符串。
生产者

DLT消费者

test message以dGVzdCBtZXNzYWdl的形式进入DLT
另一个问题是由于base64编码增加了请求大小。
我想让消息像DLT一样传出去。
你能不能给我指点。
发布于 2020-06-01 20:34:07
显示您的生产者工厂配置。
也许你使用的是一个普通的生产者工厂?
最好的猜测是您在那里使用的是JsonSerializer而不是ByteArraySerializer。
使用版本2.5,您可以覆盖每个KafkaTemplate中的生产者工厂配置。
https://stackoverflow.com/questions/62139970
复制相似问题