首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >FLINK forBoundedOutOfOrderness + CEP

FLINK forBoundedOutOfOrderness + CEP
EN

Stack Overflow用户
提问于 2022-01-28 15:40:30
回答 1查看 368关注 0票数 0

我正在尝试在无序流事件的FLINK上实现CEP模式。我的溪流就是这样构建的:

代码语言:javascript
复制
DataStream<DataInput> input = inputStream.flatMap(
new FlatMapFunction<String, DataInput>() {
    @Override
    public void flatMap(String value, Collector<DataInput> out) throws Exception {                              
        for(DataInput input : JsonUtilsJackson.getInstance().initTrackingDataFromJson(value)) { 
            //One input can generate multiple DataInput
            out.collect(input); 
        }
    }
})
// Elements can be lately sent
.assignTimestampsAndWatermarks(WatermarkStrategy.Tracking>forBoundedOutOfOrderness(Duration.ofSeconds(10))          
//Timestamp is not based on Kinesis but on data timestamp        
.withTimestampAssigner((event, timestamp) ->  event.getGeneratedDate().toEpochSecond())) 
//CEP by KEY
.keyBy(requestId -> requestId.getTrackingData().getEntityReference());

我的模式通过以下代码链接到我的流:

代码语言:javascript
复制
SingleOutputStreamOperator<DataOutput> enterStream = CEP.pattern(
            input,
            PatternStrategy.getPattern()
    ).process(new SpecificProcess());

我对forBoundedOutOfOrderness的理解是,如果一个元素在11:01:00使用generatedDate字段= 10:00:00注入,它将接受在09:59:50到10:00之间使用generatedDate字段的所有元素,并且它将以升序模式排序。

我不明白的是如何管理水印的定期检查。因为这个不依赖于我的动态时间戳读数(我的例子是11:01:00),Flink将如何触发他不再需要等待的事实,那个链接到水印的周期性生成+无序吗?

在我的测试中,该模式只启动一次,之后从未启动。通过调试,我在CepOperator.onEventTime中看到了事件的良好缓冲,但它们的时间戳始终是<= timerService.currentWaterMark()。

所以,如果有人有解释的话,这会对我有帮助。谢谢。

顺便说一句,有没有办法让KeyedStream有水印,我的不同的实体有不同的生命周期,我错过了一些事件。

EN

回答 1

Stack Overflow用户

发布于 2022-01-28 16:20:21

你的问题还不完全清楚,但也许下面的信息会对你有所帮助。

水印所起的作用是,它们位于流中的某个特定位置,并在该点标记一个表示完整性的时间戳--在流中的那个点,没有比水印中的时间戳更少的事件了。

水印不对流进行排序,但它们可以用于排序。这是CEP在事件时间模式中使用时所做的事情。

forBoundedOutOfOrderness是一种水印策略,它周期性地生成水印(默认情况下,每200毫秒生成一次水印)。但是,只有在自上一次水印以来出现了新事件之后,水印才会提前,这些事件可用作较大水印(即,至少有一个具有较大时间戳的事件)的正当理由。

Flink不支持密钥水印.但FlinkKinesisConsumer支持每片水印,这可能会有所帮助。这将导致滞后最严重的碎片阻碍水印的形成,这将避免出现如此多的后期事件。如果您对每个密钥使用一个单独的碎片,那么您将得到类似于per水印的内容。

票数 1
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70896332

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档