我对Kafka Streams有个问题(0.10.1.1)。我正在尝试就同一主题创建一个KStream和一个KTable。
我尝试的第一种方法是简单地在同一主题上调用流和表的KStreamBuilder方法。这导致了
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: Topic <topicName> has already been registered by another source.好吧,这似乎是一些限制-卡夫卡流。
我的第二种方法是最初创建一个KTable并在其上使用toStream()方法。这有一个问题,就是KTables做了一些内部缓冲/刷新,所以如果一个键发生多次,输出流就不会反映所有的输入元素。这是一个问题,因为我正在计算一个键的出现情况。
看起来可行的方法是最初创建一个KStream,按键对其进行分组,然后通过丢弃旧的聚合并保留新的值来“减少”它。我对这种方法并不满意,因为它看起来非常复杂,b) Reducer接口没有指定哪个是已经聚合的值,哪个是新的值。我参加了会议,保留了第二次,但是.嗯。
所以问题归结为:有更好的方法吗?我是不是漏掉了一些显而易见的东西?
请记住,我不是在处理一个正确的用例--这只是让我了解Streams-API。
发布于 2017-02-17 23:30:49
关于添加两次主题:这是不可能的,因为Kafka流应用程序是一个单一的“使用者组”,因此只能为一个主题提交一次偏移,而添加两次主题将表明主题的使用者获得了两次(以及独立的进度)。
对于KTable#toStream()方法,可以通过StreamsConfig参数cache.max.bytes.buffering == 0禁用缓存。但是,这是一个全局设置,并禁用所有KTable (cf )的缓存/去重复。http://docs.confluent.io/current/streams/developer-guide.html#memory-management)。
更新:自卡夫卡0.11以来,可以通过
Materialized参数单独禁用每个KTable的缓存。
groupBy方法也可以工作,即使它需要一些样板。我们考虑将KStream#toTable()添加到API中以简化此转换。是的,reduce中的第二个参数是新值--因为reduce是用来组合两个值的,所以API没有“旧”和“新”的概念,因此参数没有这样的命名。
https://stackoverflow.com/questions/42306086
复制相似问题