@StreamListener("input")
@SendTo("output")
public KStream<?, MyObject> process(KStream<Object, IncomingObject> input) {
KTable table = input.flatMapValues(value -> this.getMylogic(value));
return table.toStream();
}我试图将KStream转换为KTable,然后再转换为KStream,但是无法从KStream转换为KTable
价值是json。敬请帮助,我也如何使用聚合?
{
"name":"test",
address{
"localAddress":"myaddress",
"businessAddress":"testAddress"
}
}在神话的方法,我只拿地址发送到另一个主题。请帮帮忙
发布于 2018-07-23 21:08:26
您需要应用聚合函数,例如count作为结果获取KTable。否则,就没有办法做KStream -> KTable -> KStream了。
您需要并可以做的是KStream.count() (例如) -> KTable -> KStream。因此,农业的基本结果将被发布到KStream,这也可能被发表在卡夫卡的主题。
https://stackoverflow.com/questions/51473289
复制相似问题