我们有以下高级DSL处理拓扑:
TimeWindows timeWindow = TimeWindows.of(windowDurationMs).advanceBy(windowAdvanceMs).until(retensionTimeMs);
KTable<Windowed<K>, Long> table1 = stream1.groupByKey().count(timeWindow, "Stream_1_Count_Store");
KTable<Windowed<K>, Long> table2 = stream2.groupByKey().count(timeWindow, "Stream_2_Count_Store");
KTable<Windowed<K>, Pair<Long,Long> joined = table1.leftJoin(table2, someValueJoiner, joinSerde, "Join_Store");
KTable<Windowed<SmallerKey>, Tuple<Long,Long,Long>> grouped = joined.groupBy(someSelector);
KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> aggregated = grouped.aggregate(initializer, adder, subtractor, aggValueSerde, "Agg_Store_Name")简而言之,我们上面所做的是:
其想法是创建窗口事件计数,并将这些加窗口的键用于连接和聚合操作(在KTable中,这些操作没有窗口)
问题是:连接和聚合操作的状态存储没有保留机制,并导致磁盘(RocksDB)中的空间爆炸。
更具体地说:(跳转)窗口会导致密钥上的笛卡儿产品,并且没有删除旧窗口的机制。
如果KTable键没有加窗,但只有足够数量的唯一键,也会出现同样的问题。
请注意,支持table1和table2的状态存储没有空间问题,这是因为它们是由管理丢弃旧窗口的DSL提供的窗口存储。在联接和聚合中,我们将加窗的键视为“任何旧键”,DSL也会这样做,并使用非窗口的KeyValueStore。
这个问题涉及以下几个方面:卡夫卡-4212,卡夫卡-4273,汇合论坛问题
这里有什么误解的概念吗?是否有一种使用DSL实现此拓扑的简单方法?如果不是,建议使用低级别API实现它的方法是什么?
发布于 2017-11-23 22:42:50
我想你可以这样做:
StreamsBuilder builder = new StreamBuilder();
KStream<K,V> streams = builder.stream(/* pattern for both streams */);
KStream<SmallerKey,Tuple<Long,V,String>> enrichedStream = stream.transform(
/* custom Transformer that set the weaker grouping key right here
and puts the extracted component into the value before the aggregation;
additionally (that's why we need a Transformer) get the topic name from
context object and enrich the value accordingly (ie, third String argument in the output Tuple */);
KTable<Windowed<SmallerKey>, Map<Long, Pair<Long,Long>>> = stream.groupByKey.aggregate(
timeWindow,
/* initializer: return an empty Map;
aggregator:
for each input record, check if Map contains entry for Long key already (ie, extracted component, first argument from input Tuple<Long,V,String>;
if not, add new map entry with Pair(0,0)
take the corresponding Pair from the Map and increase one
counter depending on the original topic that
is encoded in the input value (ie, Pair.first is counter for first topic and Pair.second is counter for second topic) */);示例
假设两个输入流s1和s2具有以下记录(<TS,key,value>):
s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>在您最初的程序中,您首先要分别计算这两个流(假设大小为5的翻滚窗口)得到(省略TS):
<W0<k1>, 1> | <W0<k2>, 1> | <W0<k1>, 2> | <W1<k2>, 1> | <W2<k2>, 1>
and
<W0<k1>, 1> | <W0<k2>, 1> | <W0<k2>, 2> | <W2<k2>, 1> 在您的左加入之后,您将得到(在所有记录被处理后的结果,省略中间部分):
<<W0<k1>, <2,1>> | <W0<k2>, <1,2>> | <W1<k2>, <1,null>> | <W2<k2>, <1,1>>现在,您使用“弱键”重新分组,将一个键部分提取到值中,并将所有条目放到一个映射中,以提取的键部分为基础。让我们假设我们根据"char“和”k1“来拆分密钥(即k1被拆分为k,因为smallerKey和1是提取到值中的Long )。在聚合之后得到(我将映射表示为(k1 -> v1, k2 - v2) )
<<W0<k>, (1 -> <2,1>, 2 -> <1,2>> | <W1<k>, (2 -> <1,null>)> | <W2<k>, (2 -> <1,1>)>如果这是一个正确的例子(我可能没有理解你的问题描述)。您可以使用上面描述的transform/groupBy/aggregate进行同样的操作。所提供的资料如下:
s1: <1,k1,v1> | <2,k2,v2> | <3,k1,v3> | <6,k2,v4> | <12,k2,v5>
s2: <1,k1,va> | <2,k2,vb> | <3,k2,vc> | <11,k2,vd>transform的结果是(包括TS):
<1, k, <1,v1,s1>> | <2, k, <2,v2,s1>> | <3, k, <1,v3,s1>> | <6, k, <2,v4,s1>> | <12, k, <2,v5,s1>>
and
<1, k, <1,va,s2>> | <2, k, <2,vb,s2>> | <3, k, <2,vc,s2>> | <11, k, <2,vd,s2>>请注意,
Transform实际上将这两个流处理为“一个流”,因为我们使用了模式订阅--因此,输出只是一个流,其中包含两个原始流的交错记录。
现在应用相同的窗口和聚合结果(TS省略了) --我们通过交替处理每个原始输入流的一个记录来显示结果,作为inputRecord ==> outputRecord。
<1, k, <1,v1,s1>> ==> <W0<k>, (1 -> <1,null>)>
<1, k, <1,va,s2>> ==> <W0<k>, (1 -> <1,1>>
<2, k, <2,v2,s1>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1, null>)>
<2, k, <2,vb,s2>> ==> <W0<k>, (1 -> <1,1>, 2 -> <1,1>)>
<3, k, <1,v3,s1>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1, null>)>
<3, k, <2,vc,s2>> ==> <W0<k>, (1 -> <2,1>, 2 -> <1,2>)>
<6, k, <2,v4,s1>> ==> <W1<k>, (2 -> <1,null>)>
<11, k, <2,vd,s2>> ==> <W2<k>, (2 -> <null, 1>)>
<12, k, <2,v5,s1>> ==> <W2<k>, (2 -> <1,1>)>如果您将此结果的每个键的最新记录与上面的结果进行比较,您会发现两者是相同的。
https://stackoverflow.com/questions/47439855
复制相似问题