首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka 1.0.0 - Serialized.with()使用默认serde,而不是提供的serde

Kafka 1.0.0 - Serialized.with()使用默认serde,而不是提供的serde
EN

Stack Overflow用户
提问于 2018-05-17 04:55:05
回答 1查看 119关注 0票数 0

我们最近将我们的kafka版本从0.10更新到1.0,我正在更新过时的代码

代码语言:javascript
复制
KTable<Long, myClass> myKTable = this.streamBuilder
            .stream(Serdes.Long(), mySerde, sub_topic)
            .groupByKey(Serdes.Long(), mySerde)
            .reduce(myReducer, my_store);

到这个

代码语言:javascript
复制
KTable<Long, myClass> myKTable = this.streamBuilder
            .stream(sub_topic, Consumed.with(Serdes.Long(), mySerde))
            .groupByKey(Serialized.with(Serdes.Long(), mySerde))
            .reduce(myReducer, Materialized.as(my_store));

groupByKey中序列化时,我的流抛出一个错误。Serialized.with()不使用提供的keySerde,默认为byteArray。然后这个byteArray serde遇到我的密钥,它是一个Long,并抛出一个强制转换错误。

是否有人在kafka的1.0.0版本中遇到过此错误。使用过时版本的kafka的第一个代码运行良好。但是更新代码以使用Serialized.with()似乎不起作用。任何帮助都是非常感谢的。

EN

回答 1

Stack Overflow用户

发布于 2018-05-17 05:28:47

你能分享堆栈跟踪吗?实际上,我认为问题出在reduce()上--您需要再次通过Materialized指定Serdes

这是对新API的一种回归,最近在trunkhttps://github.com/apache/kafka/pull/4919中得到了修复,因此,即将发布的2.0版本将包含该修复。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50379736

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档