我的flink应用程序执行以下操作
我面临的问题是flink使用者无法保存数据10秒,并抛出以下异常:
由: java.util.concurrent.ExecutionException: java.io.IOException引起的:状态的大小大于允许的最大内存支持状态。Size=18340663,maxSize=5242880
我不能应用countWindow,因为如果记录的频率太慢,那么elasticsearch接收器可能会被推迟很长时间。
我的问题是:
可以应用TimeWindow和CountWindow的OR函数吗?
> if ( recordCount is 500 OR 10 seconds have elapsed)
> then dump data to flink发布于 2019-01-30 13:00:35
不是直接的。但是您可以使用带有自定义触发逻辑的GlobalWindow。查看计数触发器这里的源代码。
你的触发逻辑会像这样。
private final ReducingStateDescriptor<Long> stateDesc =
new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
private long triggerTimestamp = 0;
@Override
public TriggerResult onElement(String element, long l, GlobalWindow globalWindow, TriggerContext triggerContext) throws Exception {
ReducingState<Long> count = triggerContext.getPartitionedState(stateDesc);
// Increment window counter by one, when an element is received
count.add(1L);
// Start the timer when the first packet is received
if (count.get() == 1) {
triggerTimestamp = triggerContext.getCurrentProcessingTime() + 10000; // trigger at 10 seconds from reception of first event
triggerContext.registerProcessingTimeTimer(triggerTimestamp); // Override the onProcessingTime method to trigger the window at this time
}
// Or trigger the window when the number of packets in the window reaches 500
if (count.get() >= 500) {
// Delete the timer, clear the count and fire the window
triggerContext.deleteProcessingTimeTimer(triggerTimestamp);
count.clear();
return TriggerResult.FIRE;
}
return TriggerResult.CONTINUE;
}发布于 2019-01-30 15:18:35
您也可以使用RocksDB状态后端,但是自定义触发器的性能会更好。
https://stackoverflow.com/questions/54440361
复制相似问题