首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在同一数据上Flink多个Windows

在同一数据上Flink多个Windows
EN

Stack Overflow用户
提问于 2019-01-30 12:11:06
回答 2查看 844关注 0票数 2

我的flink应用程序执行以下操作

  1. 资料来源:阅读卡夫卡记录形式的数据。
  2. split:基于某些标准
  3. 窗口:10秒的时间窗口,将其聚合为一个大容量记录
  4. 接收器:将这些大容量记录转储到elasticsearch

我面临的问题是flink使用者无法保存数据10秒,并抛出以下异常:

由: java.util.concurrent.ExecutionException: java.io.IOException引起的:状态的大小大于允许的最大内存支持状态。Size=18340663,maxSize=5242880

我不能应用countWindow,因为如果记录的频率太慢,那么elasticsearch接收器可能会被推迟很长时间。

我的问题是:

可以应用TimeWindow和CountWindow的OR函数吗?

代码语言:javascript
复制
> if ( recordCount is 500 OR 10 seconds have elapsed)
>           then dump data to flink
EN

回答 2

Stack Overflow用户

发布于 2019-01-30 13:00:35

不是直接的。但是您可以使用带有自定义触发逻辑的GlobalWindow。查看计数触发器这里的源代码。

你的触发逻辑会像这样。

代码语言:javascript
复制
private final ReducingStateDescriptor<Long> stateDesc = 
    new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
private long triggerTimestamp = 0;

@Override
public TriggerResult onElement(String element, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {

    ReducingState<Long> count = triggerContext.getPartitionedState(stateDesc);

    // Increment window counter by one, when an element is received
    count.add(1L); 

    // Start the timer when the first packet is received
    if (count.get() == 1) {
        triggerTimestamp = triggerContext.getCurrentProcessingTime() + 10000; // trigger at 10 seconds from reception of first event
        triggerContext.registerProcessingTimeTimer(triggerTimestamp); // Override the onProcessingTime method to trigger the window at this time
    }

    // Or trigger the window when the number of packets in the window reaches 500
    if (count.get() >= 500) {
        // Delete the timer, clear the count and fire the window   
        triggerContext.deleteProcessingTimeTimer(triggerTimestamp);
        count.clear();
        return TriggerResult.FIRE;
    }

    return TriggerResult.CONTINUE;
}
票数 1
EN

Stack Overflow用户

发布于 2019-01-30 15:18:35

您也可以使用RocksDB状态后端,但是自定义触发器的性能会更好。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54440361

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档