我正在尝试一些网络监控工作的flink。我的目标是计算每个src_ip的不同src_ip。
下面的代码可以工作,但是性能确实很差。似乎每个滑动窗口重新计算所有事件,但这不应该是必要的。
例如,我们有活动准时第二次1- 600.Flink可以得到每秒钟的累加器,所以我们每秒钟有600个累加器。当第一个滑动窗口过期时,flink只合并1到300的累加器,并销毁第二个1的累加器。此窗口还可以在最后一秒钟前将1-299进行预合并。当第二个滑动窗口过期时,flink只合并2-301的累加器,破坏第二个的累加器。
这种方法比将事件分配给多个窗口和计算每个窗口的聚合效率要高得多。
支持flink吗?我能用flink获得类似的功能吗?
非常感谢!
public static class AverageAccumulator2 {
String key;
Set<String> target;
AverageAccumulator2() {
target = new HashSet<>();
}
}
public static class Average2 implements AggregateFunction<ObjectNode, AverageAccumulator2, Tuple3<String, Long, Set<String>>> {
@Override
public AverageAccumulator2 createAccumulator() {
return new AverageAccumulator2();
}
@Override
public AverageAccumulator2 add(ObjectNode value, AverageAccumulator2 accumulator) {
accumulator.key = value.get("value").get("src_ip").asText();
accumulator.target.add(value.get("value").get("dst_ip").asText());
return accumulator;
}
@Override
public Tuple3<String, Long, Set<String>> getResult(AverageAccumulator2 accumulator) {
return new Tuple3<>(accumulator.key, (long) accumulator.target.size(), accumulator.target);
}
@Override
public AverageAccumulator2 merge(AverageAccumulator2 a, AverageAccumulator2 b) {
a.target.addAll(b.target);
return a;
}
}
final SingleOutputStreamOperator<Tuple3<String, Long, Set<String>> > process2 =
stream.keyBy(value -> value.get("value").get("sip").asText())
.timeWindow(Time.seconds(300),Time.seconds(1))
.aggregate(new Average2());发布于 2018-08-24 12:17:41
正如您所观察到的,Flink并不试图优化滑动窗口。这确实变得非常昂贵的细粒度滑动。
您可以做的是使用ProcessFunction实现自己的处理状态和计时器的逻辑--您可以按照您所描述的那样实现这一点。您将拥有一个processElement方法,对于每个传入记录,更新用于积累结果的数据结构,还有一个onTimer方法,该方法每秒触发一次,将部分结果合并在一起,并将结果发送到下游。
https://stackoverflow.com/questions/51977741
复制相似问题