首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何使用Flink对无序事件时间流进行排序

如何使用Flink对无序事件时间流进行排序
EN

Stack Overflow用户
提问于 2019-12-24 11:29:09
回答 2查看 2.1K关注 0票数 5

This question介绍了如何使用Flink对无序流进行排序,但我更愿意使用DataStream API。One solution将使用一个使用PriorityQueue来缓冲事件的ProcessFunction来实现这一点,直到水印表明它们不再是无序的,但是在RocksDB状态后端(问题是每次对PriorityQueue的访问都需要整个PriorityQueue的ser/de )时,它们的性能很差。无论使用的是哪种状态后端,我如何有效地做到这一点?

EN

回答 2

Stack Overflow用户

发布于 2019-12-24 11:29:09

一种更好的方法(或多或少是Flink的SQL和CEP库在内部所做的)是在MapState中缓冲无序流,如下所示:

如果您是独立地对每个键进行排序,那么首先输入流。否则,对于全局排序,将流按一个常量键,以便您可以使用KeyedProcessFunction实现排序。

在该进程函数的open方法中,实例化一个MapState对象,其中键是时间戳,值是流元素的列表,它们都具有相同的时间戳。

onElement方法中:

如果事件延迟,则将其删除或发送到侧output

  • Otherwise,,将事件附加到对应于其timestamp

  • Register的映射条目中--该事件的时间戳

的事件时间定时器

当调用onTimer时,这个时间戳的映射中的条目就可以作为排序流的一部分释放了--因为当前的水印现在表明所有早期的事件都应该已经被处理了。在将事件发送到下游后,不要忘记清除地图中的条目。

票数 10
EN

Stack Overflow用户

发布于 2022-07-17 12:57:41

不幸的是,使用计时器的解决方案对我们无效。由于产生了大量的计时器,导致检查点失效。作为另一种选择,我们做了一种有着翻滚窗户的方法:

代码语言:javascript
复制
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.stream.StreamSupport;

public class EventSortJob {
    private static final Duration ALLOWED_LATENESS = Duration.ofMillis(2);
    private static final Duration SORT_WINDOW_SIZE = Duration.ofMillis(5);

    private static final Logger LOGGER = LoggerFactory.getLogger(EventSortJob.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Integer> source = env
                .fromElements(0, 1, 2, 10, 9, 8, 3, 5, 4, 7, 6)
                .assignTimestampsAndWatermarks(
                        new WatermarkStrategy<Integer>() {
                            @Override public WatermarkGenerator<Integer> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                                return new WatermarkGenerator<Integer>() {
                                    private long watermark = Long.MIN_VALUE;

                                    // punctuated watermarks are used here for demonstration purposes only!!!
                                    @Override public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) {
                                        long potentialWatermark = event - ALLOWED_LATENESS.toMillis(); // delay watermark behind latest timestamp
                                        if (potentialWatermark > watermark) {
                                            watermark = potentialWatermark;
                                            output.emitWatermark(new Watermark(watermark));
                                            LOGGER.info("watermark = {}", watermark);
                                        }
                                    }

                                    // normally, periodic watermarks should be used
                                    @Override public void onPeriodicEmit(WatermarkOutput output) {}
                                };
                            }

                            @Override public TimestampAssigner<Integer> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                                return (element, recordTimestamp) -> element; // for simplicity, element values are also timestamps (in millis)
                            }
                        }
                );

        OutputTag<Integer> lateEventsTag = new OutputTag<Integer>("lateEventsTag") {};
        SingleOutputStreamOperator<Integer> sorted = source
                .keyBy(v -> 1)
                .window(TumblingEventTimeWindows.of(Time.milliseconds(SORT_WINDOW_SIZE.toMillis())))
                .sideOutputLateData(lateEventsTag)
                .process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
                    @Override public void process(
                            Integer integer,
                            ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>.Context context,
                            Iterable<Integer> elements,
                            Collector<Integer> out
                    ) {
                        StreamSupport.stream(elements.spliterator(), false)
                                .sorted()
                                .forEachOrdered(out::collect);
                    }
                });

        source.keyBy(v -> 1).map(v -> String.format("orig: %d", v)).addSink(new PrintSinkFunction<>());
        sorted.addSink(new PrintSinkFunction<>());
        sorted.getSideOutput(lateEventsTag).keyBy(v -> 1).map(v -> String.format("late: %d", v)).addSink(new PrintSinkFunction<>());

        env.execute();
    }
}
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/59468154

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档