首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >KTable状态存储无限保留

KTable状态存储无限保留
EN

Stack Overflow用户
提问于 2017-11-22 16:42:11
回答 1查看 2.9K关注 0票数 8

我们有以下高级DSL处理拓扑:

代码语言:javascript
复制
TimeWindows timeWindow = TimeWindows.of(windowDurationMs).advanceBy(windowAdvanceMs).until(retensionTimeMs);

KTable<Windowed<K>, Long> table1 = stream1.groupByKey().count(timeWindow, "Stream_1_Count_Store");
KTable<Windowed<K>, Long> table2 = stream2.groupByKey().count(timeWindow, "Stream_2_Count_Store");


KTable<Windowed<K>, Pair<Long,Long> joined = table1.leftJoin(table2, someValueJoiner, joinSerde, "Join_Store");

KTable<Windowed<SmallerKey>, Tuple<Long,Long,Long>> grouped = joined.groupBy(someSelector);

KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> aggregated = grouped.aggregate(initializer, adder, subtractor, aggValueSerde, "Agg_Store_Name")

简而言之,我们上面所做的是:

  1. 使用跳转窗口计算事件
  2. 在结果的KTables之间执行左联接(左是因为业务逻辑)
  3. 分组和更改键和值:获取键的一个组件(长)并移动到值
  4. 将生成的KTable聚合到最终的KTable,聚合对象是从T到步骤1中的两个连接计数器的映射。注意,映射的大小不超过600,而且通常要小得多。

其想法是创建窗口事件计数,并将这些加窗口的键用于连接和聚合操作(在KTable中,这些操作没有窗口)

问题是:连接和聚合操作的状态存储没有保留机制,并导致磁盘(RocksDB)中的空间爆炸。

更具体地说:(跳转)窗口会导致密钥上的笛卡儿产品,并且没有删除旧窗口的机制。

如果KTable键没有加窗,但只有足够数量的唯一键,也会出现同样的问题。

请注意,支持table1和table2的状态存储没有空间问题,这是因为它们是由管理丢弃旧窗口的DSL提供的窗口存储。在联接和聚合中,我们将加窗的键视为“任何旧键”,DSL也会这样做,并使用非窗口的KeyValueStore。

这个问题涉及以下几个方面:卡夫卡-4212卡夫卡-4273汇合论坛问题

这里有什么误解的概念吗?是否有一种使用DSL实现此拓扑的简单方法?如果不是,建议使用低级别API实现它的方法是什么?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-11-23 22:42:50

我想你可以这样做:

代码语言:javascript
复制
StreamsBuilder builder = new StreamBuilder();
KStream<K,V> streams = builder.stream(/* pattern for both streams */);

KStream<SmallerKey,Tuple<Long,V,String>> enrichedStream = stream.transform(
    /* custom Transformer that set the weaker grouping key right here
       and puts the extracted component into the value before the aggregation;
       additionally (that's why we need a Transformer) get the topic name from
       context object and enrich the value accordingly (ie, third String argument in the output Tuple */);

KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> = stream.groupByKey.aggregate(
    timeWindow,
    /* initializer: return an empty Map;
       aggregator:
       for each input record, check if Map contains entry for Long key already (ie, extracted component, first argument from input Tuple<Long,V,String>;
         if not, add new map entry with Pair(0,0)
       take the corresponding Pair from the Map and increase one
       counter depending on the original topic that
       is encoded in the input value (ie, Pair.first is counter for first topic and Pair.second is counter for second topic) */);

示例

假设两个输入流s1s2具有以下记录(<TS,key,value>):

代码语言:javascript
复制
s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>

在您最初的程序中,您首先要分别计算这两个流(假设大小为5的翻滚窗口)得到(省略TS):

代码语言:javascript
复制
<W0<k1>, 1> | <W0<k2>, 1> | <W0<k1>, 2> | <W1<k2>, 1> | <W2<k2>, 1>  
and
<W0<k1>, 1> | <W0<k2>, 1> | <W0<k2>, 2> | <W2<k2>, 1>  

在您的左加入之后,您将得到(在所有记录被处理后的结果,省略中间部分):

代码语言:javascript
复制
<<W0<k1>, <2,1>> | <W0<k2>, <1,2>> | <W1<k2>, <1,null>> | <W2<k2>, <1,1>>

现在,您使用“弱键”重新分组,将一个键部分提取到值中,并将所有条目放到一个映射中,以提取的键部分为基础。让我们假设我们根据"char“和”k1“来拆分密钥(即k1被拆分为k,因为smallerKey1是提取到值中的Long )。在聚合之后得到(我将映射表示为(k1 -> v1, k2 - v2) )

代码语言:javascript
复制
<<W0<k>, (1 -> <2,1>, 2 -> <1,2>> | <W1<k>, (2 -> <1,null>)> | <W2<k>, (2 -> <1,1>)>

如果这是一个正确的例子(我可能没有理解你的问题描述)。您可以使用上面描述的transform/groupBy/aggregate进行同样的操作。所提供的资料如下:

代码语言:javascript
复制
s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>

transform的结果是(包括TS):

代码语言:javascript
复制
<1, k, <1,v1,s1>> | <2, k, <2,v2,s1>> | <3, k, <1,v3,s1>> | <6, k, <2,v4,s1>> | <12, k, <2,v5,s1>>
and
<1, k, <1,va,s2>> | <2, k, <2,vb,s2>> | <3, k, <2,vc,s2>> | <11, k, <2,vd,s2>>

请注意,Transform实际上将这两个流处理为“一个流”,因为我们使用了模式订阅--因此,输出只是一个流,其中包含两个原始流的交错记录。

现在应用相同的窗口和聚合结果(TS省略了) --我们通过交替处理每个原始输入流的一个记录来显示结果,作为inputRecord ==> outputRecord

代码语言:javascript
复制
<1, k, <1,v1,s1>> ==> <W0<k>, (1 -> <1,null>)>
<1, k, <1,va,s2>> ==> <W0<k>, (1 -> <1,1>>
<2, k, <2,v2,s1>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1, null>)>
<2, k, <2,vb,s2>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1,1>)>
<3, k, <1,v3,s1>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1, null>)>
<3, k, <2,vc,s2>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1,2>)>
<6, k, <2,v4,s1>> ==> <W1<k>, (2 -> <1,null>)>
<11, k, <2,vd,s2>> ==> <W2<k>, (2 -> <null, 1>)>
<12, k, <2,v5,s1>> ==> <W2<k>, (2 -> <1,1>)>

如果您将此结果的每个键的最新记录与上面的结果进行比较,您会发现两者是相同的。

票数 10
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/47439855

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档