我想通过request_time计算健康检查数据的状态代码,从当前时间起有一个1分钟的窗口。
据了解,健康检查每分钟发送大约60个请求。所以结果应该是{environment: "XX", host: "XX", 200: 60, 300: 0, 400: 0, 500: 0}
但是实际的结果是{environment: "XX", host: "XX", 200: 50000, 300: 0, 400: 0, 500: 0},它计算了许多以前的数据。
我的代码就像
env.fromSource(kafkasource).filter().flatMap()
.assignTimeStampAndWaterMarks(
WaterMarkStrategy.<OnjectNode>forboundedOutOfOrderness(DurationOfSeconds(60).withTimeStampAssigner(assigner))
.keyBy("env","host")
.window(1m)
.reduce()有人知道缺了什么或者我在逻辑上错了吗?
发布于 2022-05-26 08:09:49
也许您应该在reduce()和flatmap()中提供更详细的代码信息。根据当前可用的信息,此问题可能是reduce中的状态未被重置。您可以检查reduce中的代码或是否正确地设置了ttl。
https://stackoverflow.com/questions/72382115
复制相似问题