This question shows how to sort an out-of-order stream with Flink, but I would rather use the DataStream API. One solution would be a ProcessFunction that buffers events in a PriorityQueue until the watermark indicates they are no longer out of order, but this performs poorly with the RocksDB state backend (the problem is that every access to the PriorityQueue requires ser/de of the entire PriorityQueue). How can I do this efficiently, regardless of which state backend is in use?
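The buffering approach described in the question can be sketched in plain Java. Here a bare PriorityQueue stands in for the ValueState<PriorityQueue<...>> a ProcessFunction would hold; EventBuffer, onEvent, and onWatermark are hypothetical names for illustration, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Sketch of the logic the question describes: events are queued until the
// watermark passes their timestamps. When this queue lives in RocksDB-backed
// state, the ENTIRE queue is deserialized and reserialized on every access,
// which is the performance problem the question is asking about.
public class EventBuffer {
    // For simplicity the "events" are just their timestamps.
    private final PriorityQueue<Long> queue = new PriorityQueue<>();

    public void onEvent(long timestamp) {
        queue.add(timestamp); // in Flink: read state, mutate, write state back
    }

    // Called when a watermark arrives: release, in order, all events whose
    // timestamps the watermark has passed.
    public List<Long> onWatermark(long watermark) {
        List<Long> sorted = new ArrayList<>();
        while (!queue.isEmpty() && queue.peek() <= watermark) {
            sorted.add(queue.poll());
        }
        return sorted;
    }
}
```

With the heap-backed state backend this is cheap; the RocksDB problem comes purely from serializing the whole queue per element, which motivates the MapState answer below.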
Posted on 2019-12-24 11:29:09
A better approach (more or less what Flink's SQL and CEP libraries do internally) is to buffer the out-of-order stream in MapState, as follows:
If you are sorting each key independently, then first key the stream. Otherwise, for a global sort, key the stream by a constant so that you can use a KeyedProcessFunction to implement the sorting.
In the open method of that process function, instantiate a MapState object, where the key is a timestamp and the value is a list of the stream elements that share that timestamp.
In the processElement method:
If the event is late, drop it or send it to a side output.
Otherwise, append the event to the map entry for its timestamp, and register an event-time timer for that timestamp.
When onTimer is called, the map entry for that timestamp is ready to be released as part of the sorted stream, because the current watermark now indicates that all earlier events should already have been processed. After emitting the events downstream, don't forget to clear the entry from the map.
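The steps above can be modeled in plain Java. A TreeMap stands in for MapState<Long, List<T>>, and advancing the watermark plays the role of the event-time timers firing; SortBuffer and its method names are hypothetical, not Flink API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Models the MapState-based sorter: key = timestamp, value = all events with
// that timestamp. With RocksDB, only the touched map entry is ser/de'd per
// access, instead of the entire buffer as with a PriorityQueue in ValueState.
public class SortBuffer {
    private final TreeMap<Long, List<Integer>> buffer = new TreeMap<>();
    private long currentWatermark = Long.MIN_VALUE;

    // processElement: drop late events, otherwise buffer by timestamp.
    // (A real KeyedProcessFunction would also register a timer for it here.)
    public void processElement(long timestamp, int value) {
        if (timestamp <= currentWatermark) {
            return; // late event: drop it (or route it to a side output)
        }
        buffer.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(value);
    }

    // onTimer: once the watermark passes a timestamp, its entry can be
    // released in order and cleared from the map.
    public List<Integer> advanceWatermark(long watermark) {
        currentWatermark = watermark;
        List<Integer> sorted = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.firstKey() <= watermark) {
            sorted.addAll(buffer.pollFirstEntry().getValue());
        }
        return sorted;
    }
}
```

The key design point is that state is touched one timestamp at a time, both on write (one entry appended) and on release (one entry drained and removed), which is what keeps the RocksDB backend efficient.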
Posted on 2022-07-17 12:57:41
Unfortunately, the timer-based solution did not work for us: checkpointing failed because of the huge number of timers created. As an alternative, we implemented an approach using tumbling windows:
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.stream.StreamSupport;

public class EventSortJob {
    private static final Duration ALLOWED_LATENESS = Duration.ofMillis(2);
    private static final Duration SORT_WINDOW_SIZE = Duration.ofMillis(5);

    private static final Logger LOGGER = LoggerFactory.getLogger(EventSortJob.class);

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<Integer> source = env
            .fromElements(0, 1, 2, 10, 9, 8, 3, 5, 4, 7, 6)
            .assignTimestampsAndWatermarks(
                new WatermarkStrategy<Integer>() {
                    @Override
                    public WatermarkGenerator<Integer> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
                        return new WatermarkGenerator<Integer>() {
                            private long watermark = Long.MIN_VALUE;

                            // punctuated watermarks are used here for demonstration purposes only!!!
                            @Override
                            public void onEvent(Integer event, long eventTimestamp, WatermarkOutput output) {
                                long potentialWatermark = event - ALLOWED_LATENESS.toMillis(); // delay watermark behind latest timestamp
                                if (potentialWatermark > watermark) {
                                    watermark = potentialWatermark;
                                    output.emitWatermark(new Watermark(watermark));
                                    LOGGER.info("watermark = {}", watermark);
                                }
                            }

                            // normally, periodic watermarks should be used
                            @Override
                            public void onPeriodicEmit(WatermarkOutput output) {}
                        };
                    }

                    @Override
                    public TimestampAssigner<Integer> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
                        return (element, recordTimestamp) -> element; // for simplicity, element values are also timestamps (in millis)
                    }
                }
            );

        OutputTag<Integer> lateEventsTag = new OutputTag<Integer>("lateEventsTag") {};

        SingleOutputStreamOperator<Integer> sorted = source
            .keyBy(v -> 1)
            .window(TumblingEventTimeWindows.of(Time.milliseconds(SORT_WINDOW_SIZE.toMillis())))
            .sideOutputLateData(lateEventsTag)
            .process(new ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>() {
                @Override
                public void process(
                    Integer integer,
                    ProcessWindowFunction<Integer, Integer, Integer, TimeWindow>.Context context,
                    Iterable<Integer> elements,
                    Collector<Integer> out
                ) {
                    StreamSupport.stream(elements.spliterator(), false)
                        .sorted()
                        .forEachOrdered(out::collect);
                }
            });

        source.keyBy(v -> 1).map(v -> String.format("orig: %d", v)).addSink(new PrintSinkFunction<>());
        sorted.addSink(new PrintSinkFunction<>());
        sorted.getSideOutput(lateEventsTag).keyBy(v -> 1).map(v -> String.format("late: %d", v)).addSink(new PrintSinkFunction<>());

        env.execute();
    }
}

https://stackoverflow.com/questions/59468154