我们最近将我们的kafka版本从0.10更新到1.0,我正在更新过时的代码
KTable<Long, myClass> myKTable = this.streamBuilder
.stream(Serdes.Long(), mySerde, sub_topic)
.groupByKey(Serdes.Long(), mySerde)
.reduce(myReducer, my_store);到这个
KTable<Long, myClass> myKTable = this.streamBuilder
.stream(sub_topic, Consumed.with(Serdes.Long(), mySerde))
.groupByKey(Serialized.with(Serdes.Long(), mySerde))
.reduce(myReducer, Materialized.as(my_store));在groupByKey中序列化时,我的流抛出一个错误。Serialized.with()不使用提供的keySerde,默认为byteArray。然后这个byteArray serde遇到我的密钥,它是一个Long,并抛出一个强制转换错误。
是否有人在kafka的1.0.0版本中遇到过此错误。使用过时版本的kafka的第一个代码运行良好。但是更新代码以使用Serialized.with()似乎不起作用。任何帮助都是非常感谢的。
发布于 2018-05-17 05:28:47
你能分享堆栈跟踪吗?实际上,我认为问题出在reduce()上--您需要再次通过Materialized指定Serdes。
这是对新API的一种回归,最近在trunk:https://github.com/apache/kafka/pull/4919中得到了修复,因此,即将发布的2.0版本将包含该修复。
https://stackoverflow.com/questions/50379736
复制相似问题