我正在将来自卡夫卡主题的消息存储在KeyValueStore中,以便以后查询它们。我创建一个KTable如下所示:
@StreamListener public void process(@Input("input") KTable<String,MyMessage> myMessages) {
我将application.yml中的使用者配置为:
更新的反序列化程序包
spring.cloud.stream.kafka.streams.bindings.input: consumer: materializedAs: all-messages key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: com.me.MyMessageDeserializer key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: com.me.MyMessageSerializer
但是,当我从KeyValueStore读取键时,键作为String正确地返回,但是返回的值是Byte数组,而不是MyMessage。由于某些原因,我的自定义反序列化器没有被使用。我试图自己反序列化消息,但是反序列化器崩溃了,出现了异常。我在序列化程序上放置了一个断点,它从未被调用过。我很清楚,我的序列化程序和反序列化器都没有被使用。
我缺少什么配置,以便使用自定义值反/序列化程序?反/序列化器是否需要在要查找的特定包中?
发布于 2019-07-03 12:00:54
application.yml中使用了错误的配置键。而不是键反序列化:它应该是keySerde:而不是值反序列化:它应该是valueSerde。以下是正确的配置:
spring.cloud.stream.kafka.streams.bindings.input: consumer: materializedAs: all-messages keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde valueSerde: com.me.MyMessageSerde producer: keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde valueSerde: com.me.MyMessageSerde
https://stackoverflow.com/questions/56865774
复制相似问题