首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何设置卡夫卡KeyValueStore的值反/序列化器?

如何设置卡夫卡KeyValueStore的值反/序列化器?
EN

Stack Overflow用户
提问于 2019-07-03 08:18:09
回答 1查看 493关注 0票数 1

我正在将来自卡夫卡主题的消息存储在KeyValueStore中,以便以后查询它们。我创建一个KTable如下所示:

@StreamListener public void process(@Input("input") KTable<String,MyMessage> myMessages) {

我将application.yml中的使用者配置为:

更新的反序列化程序包

spring.cloud.stream.kafka.streams.bindings.input: consumer: materializedAs: all-messages key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: com.me.MyMessageDeserializer key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: com.me.MyMessageSerializer

但是,当我从KeyValueStore读取键时,键作为String正确地返回,但是返回的值是Byte数组,而不是MyMessage。由于某些原因,我的自定义反序列化器没有被使用。我试图自己反序列化消息,但是反序列化器崩溃了,出现了异常。我在序列化程序上放置了一个断点,它从未被调用过。我很清楚,我的序列化程序和反序列化器都没有被使用。

我缺少什么配置,以便使用自定义值反/序列化程序?反/序列化器是否需要在要查找的特定包中?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-07-03 12:00:54

application.yml中使用了错误的配置键。而不是键反序列化:它应该是keySerde:而不是值反序列化:它应该是valueSerde。以下是正确的配置:

spring.cloud.stream.kafka.streams.bindings.input: consumer: materializedAs: all-messages keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde valueSerde: com.me.MyMessageSerde producer: keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde valueSerde: com.me.MyMessageSerde

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/56865774

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档