首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >滑动时间窗口的Flink性能问题

滑动时间窗口的Flink性能问题
EN

Stack Overflow用户
提问于 2018-08-23 02:58:04
回答 1查看 738关注 0票数 3

我正在尝试一些网络监控工作的flink。我的目标是计算每个src_ip的不同src_ip

下面的代码可以工作,但是性能确实很差。似乎每个滑动窗口重新计算所有事件,但这不应该是必要的。

例如,我们有活动准时第二次1- 600.Flink可以得到每秒钟的累加器,所以我们每秒钟有600个累加器。当第一个滑动窗口过期时,flink只合并1到300的累加器,并销毁第二个1的累加器。此窗口还可以在最后一秒钟前将1-299进行预合并。当第二个滑动窗口过期时,flink只合并2-301的累加器,破坏第二个的累加器。

这种方法比将事件分配给多个窗口和计算每个窗口的聚合效率要高得多。

支持flink吗?我能用flink获得类似的功能吗?

非常感谢!

代码语言:javascript
复制
public static class AverageAccumulator2 {
    String key;
    Set<String> target;
    AverageAccumulator2() {
        target = new HashSet<>();
    }
}

public static class Average2 implements AggregateFunction<ObjectNode, AverageAccumulator2, Tuple3<String, Long, Set<String>>> {
    @Override
    public AverageAccumulator2 createAccumulator() {
        return new AverageAccumulator2();
    }

    @Override
    public AverageAccumulator2 add(ObjectNode value, AverageAccumulator2 accumulator) {
        accumulator.key = value.get("value").get("src_ip").asText();
        accumulator.target.add(value.get("value").get("dst_ip").asText());
        return accumulator;
    }
    @Override
    public Tuple3<String, Long, Set<String>> getResult(AverageAccumulator2 accumulator) {
        return new Tuple3<>(accumulator.key, (long) accumulator.target.size(), accumulator.target);
    }

    @Override
    public AverageAccumulator2 merge(AverageAccumulator2 a, AverageAccumulator2 b) {
        a.target.addAll(b.target);
        return a;
    }
}

final SingleOutputStreamOperator<Tuple3<String, Long, Set<String>> > process2 =
stream.keyBy(value -> value.get("value").get("sip").asText())
                    .timeWindow(Time.seconds(300),Time.seconds(1))
                    .aggregate(new Average2());
EN

回答 1

Stack Overflow用户

发布于 2018-08-24 12:17:41

正如您所观察到的,Flink并不试图优化滑动窗口。这确实变得非常昂贵的细粒度滑动。

您可以做的是使用ProcessFunction实现自己的处理状态和计时器的逻辑--您可以按照您所描述的那样实现这一点。您将拥有一个processElement方法,对于每个传入记录,更新用于积累结果的数据结构,还有一个onTimer方法,该方法每秒触发一次,将部分结果合并在一起,并将结果发送到下游。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/51977741

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档