目前,我正在尝试使用storm进行消息处理。我发现滑动窗口功能很有趣,并尝试让它工作。
但是,即使我将间隔设置为5秒,窗口后面的计算也会更频繁地完成。似乎每一条新消息都会执行元组窗口的execute-method。
builder.setBolt("messageCountBolt",
new MessageCountBolt()
.withWindow(
new BaseWindowedBolt.Duration(20, TimeUnit.SECONDS),
new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS))
.withMessageIdField("id")
.withTimestampField("timeStamp")
.withLag(new BaseWindowedBolt.Duration(5, TimeUnit.SECONDS)),
1).globalGrouping("spout");有人知道为什么吗?我希望计算等待5秒间隔内的所有消息。
发布于 2016-08-06 14:49:06
必须使用withTumblingWindow而不是withWindow。
withWindow在每个输入元组上执行,并交付包含最后一个输入消息的输入批次。但withTumblingWindow会将所有输入消息聚合在一个批处理中,并提供完整的消息。
发布于 2018-02-13 16:56:52
我认为原因是您使用的是SlidingWindow -它为该窗口中的每个入口和出口生成一个输出。如果您只想在窗口的末尾显示一个输出,那么理想情况下应该使用批处理窗口或翻滚窗口。总结一下:
https://stackoverflow.com/questions/37484130
复制相似问题