我们已经构建了一个Flink应用程序来处理Kinesis流中的数据。该应用程序的执行流程包含基本操作,用于根据已注册类型过滤数据,根据事件时间戳分配水印,在5分钟的数据窗口上应用地图、进程和聚合函数,如下所示:
final SingleOutputStreamOperator<Object> inputStream = env.addSource(consumer)
.setParallelism(..)
.filter(..)
.assignTimestampsAndWatermarks(..);
// Processing flow
inputStream
.map(..)
.keyBy(..)
.window(..)
.sideOutputLateData(outputTag)
.aggregate(aggregateFunction, processWindowFunction);
// store processed data to external storage
AsyncDataStream.unorderedWait(...);我的水印分配程序的参考代码:
@Override
public void onEvent(@NonNull final MetricSegment metricSegment,
final long eventTimestamp,
@NonNull final WatermarkOutput watermarkOutput) {
if (eventTimestamp > eventMaxTimestamp) {
currentMaxTimestamp = Instant.now().toEpochMilli();
}
eventMaxTimestamp = Math.max(eventMaxTimestamp, eventTimestamp);
}
@Override
public void onPeriodicEmit(@NonNull final WatermarkOutput watermarkOutput) {
final Instant maxEventTimestamp = Instant.ofEpochMilli(eventMaxTimestamp);
final Duration timeElaspsed = Duration.between(Instant.ofEpochMilli(lastCurrentTimestamp), Instant.now());
if (timeElaspsed.getSeconds() >= emitWatermarkIntervalSec) {
final long watermarkTimestamp = maxEventTimestamp.plus(1, ChronoUnit.MINUTES).toEpochMilli();
watermarkOutput.emitWatermark(new Watermark(watermarkTimestamp));
}
}现在,这个应用程序的性能很好(按几秒钟的延迟顺序计算)。然而,最近,上游系统的帖子发生了变化,在Kinesis流中的数据被以突发的形式发布到流中(每天只发布2-3小时)。在进行此更改后,我们已经看到应用程序的延迟出现了很大的峰值(使用flink gauge方法测量,在第一个过滤器方法中记录开始时间,然后在异步方法中通过计算从开始时间点开始的时间差来发出度量值)。想知道是否有任何问题在使用Flink应用程序与动态流的突发流量/非连续数据流?
发布于 2022-01-07 10:27:33
由于输入流现在处于长时间空闲状态,这可能会造成水印被阻塞的情况。如果是这样的话,那么我预计延迟会有很大的变化,因为它(可能)只会是每个脉冲的最后窗口,其结果会被延迟到下一个脉冲到达。
https://stackoverflow.com/questions/70615663
复制相似问题