首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在Kafka Stream Topology中可以多次使用topic吗?

在Kafka Stream Topology中可以多次使用topic吗?
EN

Stack Overflow用户
提问于 2020-04-23 22:21:02
回答 1查看 169关注 0票数 0

让我们假设groupby函数在kafka streams中不可用。我是否可以执行以下操作来获取字数并在其上构建一个KTable?请注意,我在拓扑中使用了两次"word-count-topic“。我有一个用例,我想要迭代地构建一些东西,对于下一个流事件,我想要查找以前的值并基于事件更新它。我希望在构建Ktable的同一主题中保持最新的值。

代码语言:javascript
复制
KTable<String,Long> wordCountTable = builder.table("word-count-topic",Consumed.with(Serdes.String(), Serdes.Long()));

KStream<String,String> wordsStream = builder.stream("words-topic",Consumed.with(Serdes.String(), Serdes.String()));

KStream<String,String> msgStream = wordsStream
                                   .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
                                   .selectKey((k,v) -> v);

msgStream.leftJoin(kTable, (word,count) -> {
                                             if( count == null) return new WordCount(word, Long.valueOf(1));
                                             else return new WordCount(word, count + 1);
                                           })
            .mapValues((k,v)-> v.getCount())
            .to("word-count-topic", Produced.with(Serdes.String(), Serdes.Long()));

streams = new KafkaStreams(builder.build(), props);
streams.start();
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2020-04-27 03:20:42

这应该行得通。为什么不直接运行代码呢?

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61389500

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档