首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >卡夫卡流-获得KTable和KStream相同主题的最佳方式?

卡夫卡流-获得KTable和KStream相同主题的最佳方式?
EN

Stack Overflow用户
提问于 2017-02-17 19:37:56
回答 1查看 3.4K关注 0票数 8

我对Kafka Streams有个问题(0.10.1.1)。我正在尝试就同一主题创建一个KStream和一个KTable

我尝试的第一种方法是简单地在同一主题上调用流和表的KStreamBuilder方法。这导致了

代码语言:javascript
复制
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic <topicName> has already been registered by another source.

好吧,这似乎是一些限制-卡夫卡流。

我的第二种方法是最初创建一个KTable并在其上使用toStream()方法。这有一个问题,就是KTables做了一些内部缓冲/刷新,所以如果一个键发生多次,输出流就不会反映所有的输入元素。这是一个问题,因为我正在计算一个键的出现情况。

看起来可行的方法是最初创建一个KStream,按键对其进行分组,然后通过丢弃旧的聚合并保留新的值来“减少”它。我对这种方法并不满意,因为它看起来非常复杂,b) Reducer接口没有指定哪个是已经聚合的值,哪个是新的值。我参加了会议,保留了第二次,但是.嗯。

所以问题归结为:有更好的方法吗?我是不是漏掉了一些显而易见的东西?

请记住,我不是在处理一个正确的用例--这只是让我了解Streams-API。

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-02-17 23:30:49

关于添加两次主题:这是不可能的,因为Kafka流应用程序是一个单一的“使用者组”,因此只能为一个主题提交一次偏移,而添加两次主题将表明主题的使用者获得了两次(以及独立的进度)。

对于KTable#toStream()方法,可以通过StreamsConfig参数cache.max.bytes.buffering == 0禁用缓存。但是,这是一个全局设置,并禁用所有KTable (cf )的缓存/去重复。http://docs.confluent.io/current/streams/developer-guide.html#memory-management)。

更新:自卡夫卡0.11以来,可以通过Materialized参数单独禁用每个KTable的缓存。

groupBy方法也可以工作,即使它需要一些样板。我们考虑将KStream#toTable()添加到API中以简化此转换。是的,reduce中的第二个参数是新值--因为reduce是用来组合两个值的,所以API没有“旧”和“新”的概念,因此参数没有这样的命名。

票数 7
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/42306086

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档