首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Kafka Streams DSL的两步窗口聚合

使用Kafka Streams DSL的两步窗口聚合
EN

Stack Overflow用户
提问于 2017-07-13 21:15:20
回答 1查看 3.4K关注 0票数 1

假设我有一个流" stream -1“,每秒包含1个数据点,我想要计算一个派生的流" stream -5”,它包含使用5秒的跳跃窗口的总和,另一个流" stream - 10“基于"stream-5”,包含使用10秒的跳跃窗口的总和。聚合需要分别为每个键完成,我希望能够在不同的进程中运行每个步骤。如果stream-5和stream-10包含相同键/时间戳的更新(所以我不一定需要How to send final kafka-streams aggregation result of a time windowed KTable?),只要最后一个值是正确的,这本身就不是问题。

有没有一种(简单的)方法可以使用高级Kafka Streams DSL解决这个问题?到目前为止,我还没有看到一种优雅的方法来处理由于聚合而在stream-5上产生的中间更新。

我知道中间更新可以通过cache.max.bytes.bufferingcommit.interval.ms设置来控制,但我不认为任何设置都能保证在所有情况下都不会产生中间值。我也可以尝试在读取时使用键的时间戳部分将"stream-5“转换为KTable,但是KTable似乎不支持像KStreams那样的窗口操作。

这就是我到目前为止所拥有的,由于stream-5上的中间聚合值而失败

代码语言:javascript
复制
Reducer<DataPoint> sum = new Reducer<DataPoint>() {                                                                           
    @Override                                                                                                                 
    public DataPoint apply(DataPoint x, DataPoint y) {                                                                        
        return new DataPoint(x.timestamp, x.value + y.value);                                                                 
    }                                                                                                                         
 };                                                                                                                           

 KeyValueMapper<Windowed<String>, DataPoint, String> strip = new 
           KeyValueMapper<Windowed<String>, DataPoint, String>() {      
      @Override                                                                                                               
      public String apply(Windowed<String> wKey, DataPoint arg1) {                                                            
          return wKey.key();                                                                                                  
      }                                                                                                                       
 };                                                                                                                           

KStream<String, DataPoint> s1 = builder.stream("stream-1");                                                                      

s1.groupByKey()                                                                                                               
       .reduce(sum, TimeWindows.of(5000).advanceBy(5000))                                                                     
       .toStream()                                                                                                            
       .selectKey(strip)                                                                                                      
       .to("stream-5");                                                                                                          

KStream<String, DataPoint> s5 = builder.stream("stream-5");                                                                      

s5.groupByKey()                                                                                                               
       .reduce(sum, TimeWindows.of(10000).advanceBy(10000))                                                                   
       .toStream()                                                                                                            
       .selectKey(strip)                                                                                                      
       .to("stream-10");      

现在,如果stream-1包含intput( key就是KEY)

代码语言:javascript
复制
KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":1000,"value":1.0}
KEY {"timestamp":2000,"value":1.0}
KEY {"timestamp":3000,"value":1.0}
KEY {"timestamp":4000,"value":1.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":6000,"value":1.0}
KEY {"timestamp":7000,"value":1.0}
KEY {"timestamp":8000,"value":1.0}
KEY {"timestamp":9000,"value":1.0}

stream-5包含正确的(最终)值:

代码语言:javascript
复制
KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":2.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":4.0}
KEY {"timestamp":0,"value":5.0}
KEY {"timestamp":5000,"value":1.0}
KEY {"timestamp":5000,"value":2.0}
KEY {"timestamp":5000,"value":3.0}
KEY {"timestamp":5000,"value":4.0}
KEY {"timestamp":5000,"value":5.0}

但是stream-10是错误的(最终值应该是10.0),因为它还考虑了stream-5上的中间值:

代码语言:javascript
复制
KEY {"timestamp":0,"value":1.0}
KEY {"timestamp":0,"value":3.0}
KEY {"timestamp":0,"value":6.0}
KEY {"timestamp":0,"value":10.0}
KEY {"timestamp":0,"value":15.0}
KEY {"timestamp":0,"value":21.0}
KEY {"timestamp":0,"value":28.0}
KEY {"timestamp":0,"value":36.0}
KEY {"timestamp":0,"value":45.0}
KEY {"timestamp":0,"value":55.0}
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-07-14 01:50:05

问题是所有聚合的结果都是KTables的,这意味着生成到其输出主题的记录表示更改日志。但是,当您随后将它们作为流加载时,下游聚合将重复计算。

相反,您需要将中间主题加载为表,而不是流。但是,您将无法对它们使用窗口聚合,因为这些聚合仅在流上可用。

您可以使用以下模式在表而不是流上完成窗口聚合:

https://cwiki.apache.org/confluence/display/KAFKA/Windowed+aggregations+over+successively+increasing+timed+windows

如果您想在单独的进程中运行每个步骤,您可以对其进行调整,只需记住使用builder.table()而不是builder.stream()加载中间表。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/45081747

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档