首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在春云流DefaultKafkaHeaderMapper中无法解码json类型

在春云流DefaultKafkaHeaderMapper中无法解码json类型
EN

Stack Overflow用户
提问于 2020-12-14 07:11:01
回答 1查看 1.8K关注 0票数 4

我们正在使用春云流,并计划升级我们的卡夫卡版本。

我们的应用程序使用spring-cloud-stream:2.0.0 (Spring-Kafka2.1.7)和apache服务器1.0.1,并使用spring-cloud-sleuth:2.0.0进行跟踪。

我们将把卡夫卡服务器升级到版本2.3.0,因此它需要用spring-cloud-sleuth:2.2.0spring-cloud-stream:3.0.3 (Horsham.SR3)升级到spring-boot 2.2.x (Hoxton)

我们有200个应用程序使用Kafka,因此升级将逐步进行,因此,作为中间状态,我们将有新版本的生产者和使用旧版本的消费者。

我们的消费者正在使用@StreamListener

在测试过程中,我们遇到了使用String类型解析大多数标头的问题,并获得了以下内容:

代码语言:javascript
复制
ERROR 27448 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$4  : Could not decode json type: ecb89ccb3e79418b for key: X-B3-TraceId
com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'ecb89ccb3e79418b': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"ecb89ccb3e79418b"; line: 1, column: 33]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723) ~[jackson-core-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:4141) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4000) ~[jackson-databind-2.9.6.jar:2.9.6]
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3091) ~[jackson-databind-2.9.6.jar:2.9.6]
    at org.springframework.kafka.support.DefaultKafkaHeaderMapper.lambda$toHeaders$1(DefaultKafkaHeaderMapper.java:233) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at java.lang.Iterable.forEach(Iterable.java:75) ~[na:1.8.0_221]
    at org.springframework.kafka.support.DefaultKafkaHeaderMapper.toHeaders(DefaultKafkaHeaderMapper.java:216) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$4.toHeaders(KafkaMessageChannelBinder.java:554) ~[spring-cloud-stream-binder-kafka-2.0.0.RELEASE.jar:2.0.0.RELEASE]
    at org.springframework.kafka.support.converter.MessagingMessageConverter.toMessage(MessagingMessageConverter.java:106) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:229) ~[spring-kafka-2.1.7.RELEASE.jar:2.1.7.RELEASE]
...

类型标头是:

代码语言:javascript
复制
{spanTraceId=java.lang.String, spanId=java.lang.String, spanParentSpanId=java.lang.String, nativeHeaders=org.springframework.util.LinkedMultiValueMap, X-B3-SpanId=java.lang.String, X-B3-ParentSpanId=java.lang.String, scst_partition=java.lang.Integer, X-B3-Sampled=java.lang.String, X-B3-TraceId=java.lang.String, spanSampled=java.lang.String, contentType=java.lang.String}

例如,Sleuth添加的ecb89ccb3e79418b类型为string,其值为:X-B3-SpanId,它不是JSON字符串,因此ObjectMapper 在转换为String对象时失败

代码语言:javascript
复制
headers.put(h.key(), getObjectMapper().readValue(h.value(), type))

看起来,当我们有字符串类型时,它不应该使用ObjectMapper,因此我们的旧用户失败了。

在使用新生产者和旧消费者时,是否有办法防止这一问题?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-12-14 15:13:01

您可以将DefaultKafkaHeaderMapper配置为与旧版本兼容:

代码语言:javascript
复制
    /**
     * Set to true to encode String-valued headers as JSON ("..."), by default just the
     * raw String value is converted to a byte array using the configured charset. Set to
     * true if a consumer of the outbound record is using Spring for Apache Kafka version
     * less than 2.3
     * @param encodeStrings true to encode (default false).
     * @since 2.3
     */
    public void setEncodeStrings(boolean encodeStrings) {
        this.encodeStrings = encodeStrings;
    }

还请参见https://docs.spring.io/spring-cloud-stream-binder-kafka/docs/3.0.10.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#_kafka_binder_properties

spring.cloud.stream.kafka.binder.headerMapperBeanName

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/65284940

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档