我正在尝试在无序流事件的FLINK上实现CEP模式。我的溪流就是这样构建的:
DataStream<DataInput> input = inputStream.flatMap(
new FlatMapFunction<String, DataInput>() {
@Override
public void flatMap(String value, Collector<DataInput> out) throws Exception {
for(DataInput input : JsonUtilsJackson.getInstance().initTrackingDataFromJson(value)) {
//One input can generate multiple DataInput
out.collect(input);
}
}
})
// Elements can be lately sent
.assignTimestampsAndWatermarks(WatermarkStrategy.Tracking>forBoundedOutOfOrderness(Duration.ofSeconds(10))
//Timestamp is not based on Kinesis but on data timestamp
.withTimestampAssigner((event, timestamp) -> event.getGeneratedDate().toEpochSecond()))
//CEP by KEY
.keyBy(requestId -> requestId.getTrackingData().getEntityReference());我的模式通过以下代码链接到我的流:
SingleOutputStreamOperator<DataOutput> enterStream = CEP.pattern(
input,
PatternStrategy.getPattern()
).process(new SpecificProcess());我对forBoundedOutOfOrderness的理解是,如果一个元素在11:01:00使用generatedDate字段= 10:00:00注入,它将接受在09:59:50到10:00之间使用generatedDate字段的所有元素,并且它将以升序模式排序。
我不明白的是如何管理水印的定期检查。因为这个不依赖于我的动态时间戳读数(我的例子是11:01:00),Flink将如何触发他不再需要等待的事实,那个链接到水印的周期性生成+无序吗?
在我的测试中,该模式只启动一次,之后从未启动。通过调试,我在CepOperator.onEventTime中看到了事件的良好缓冲,但它们的时间戳始终是<= timerService.currentWaterMark()。
所以,如果有人有解释的话,这会对我有帮助。谢谢。
顺便说一句,有没有办法让KeyedStream有水印,我的不同的实体有不同的生命周期,我错过了一些事件。
发布于 2022-01-28 16:20:21
你的问题还不完全清楚,但也许下面的信息会对你有所帮助。
水印所起的作用是,它们位于流中的某个特定位置,并在该点标记一个表示完整性的时间戳--在流中的那个点,没有比水印中的时间戳更少的事件了。
水印不对流进行排序,但它们可以用于排序。这是CEP在事件时间模式中使用时所做的事情。
forBoundedOutOfOrderness是一种水印策略,它周期性地生成水印(默认情况下,每200毫秒生成一次水印)。但是,只有在自上一次水印以来出现了新事件之后,水印才会提前,这些事件可用作较大水印(即,至少有一个具有较大时间戳的事件)的正当理由。
Flink不支持密钥水印.但FlinkKinesisConsumer支持每片水印,这可能会有所帮助。这将导致滞后最严重的碎片阻碍水印的形成,这将避免出现如此多的后期事件。如果您对每个密钥使用一个单独的碎片,那么您将得到类似于per水印的内容。
https://stackoverflow.com/questions/70896332
复制相似问题