首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >用于非连续数据的Flink和Kinesis流应用程序

用于非连续数据的Flink和Kinesis流应用程序
EN

Stack Overflow用户
提问于 2022-01-07 01:42:05
回答 1查看 63关注 0票数 0

我们已经构建了一个Flink应用程序来处理Kinesis流中的数据。该应用程序的执行流程包含基本操作,用于根据已注册类型过滤数据,根据事件时间戳分配水印,在5分钟的数据窗口上应用地图、进程和聚合函数,如下所示:

代码语言:javascript
复制
    final SingleOutputStreamOperator<Object> inputStream = env.addSource(consumer)
            .setParallelism(..)
            .filter(..)
            .assignTimestampsAndWatermarks(..);

    // Processing flow
    inputStream
            .map(..)
            .keyBy(..)
            .window(..)
            .sideOutputLateData(outputTag)
            .aggregate(aggregateFunction, processWindowFunction);

    // store processed data to external storage
    AsyncDataStream.unorderedWait(...);

我的水印分配程序的参考代码:

代码语言:javascript
复制
    @Override
public void onEvent(@NonNull final MetricSegment metricSegment,
                    final long eventTimestamp,
                    @NonNull final WatermarkOutput watermarkOutput) {
    if (eventTimestamp > eventMaxTimestamp) {
        currentMaxTimestamp = Instant.now().toEpochMilli();
    }
    eventMaxTimestamp = Math.max(eventMaxTimestamp, eventTimestamp);
}

@Override
public void onPeriodicEmit(@NonNull final WatermarkOutput watermarkOutput) {
    final Instant maxEventTimestamp = Instant.ofEpochMilli(eventMaxTimestamp);
    final Duration timeElaspsed = Duration.between(Instant.ofEpochMilli(lastCurrentTimestamp), Instant.now());
    if (timeElaspsed.getSeconds() >= emitWatermarkIntervalSec) {
        final long watermarkTimestamp = maxEventTimestamp.plus(1, ChronoUnit.MINUTES).toEpochMilli();
        watermarkOutput.emitWatermark(new Watermark(watermarkTimestamp));
    }
}

现在,这个应用程序的性能很好(按几秒钟的延迟顺序计算)。然而,最近,上游系统的帖子发生了变化,在Kinesis流中的数据被以突发的形式发布到流中(每天只发布2-3小时)。在进行此更改后,我们已经看到应用程序的延迟出现了很大的峰值(使用flink gauge方法测量,在第一个过滤器方法中记录开始时间,然后在异步方法中通过计算从开始时间点开始的时间差来发出度量值)。想知道是否有任何问题在使用Flink应用程序与动态流的突发流量/非连续数据流?

EN

回答 1

Stack Overflow用户

发布于 2022-01-07 10:27:33

由于输入流现在处于长时间空闲状态,这可能会造成水印被阻塞的情况。如果是这样的话,那么我预计延迟会有很大的变化,因为它(可能)只会是每个脉冲的最后窗口,其结果会被延迟到下一个脉冲到达。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70615663

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档