我们使用flink-cep作为一个独立的库来查找事件列表中的模式。
鉴于下列事件清单:
val patientKey = "patient"
val hrKey = "hr"
// Event
val p1e1 = Event("hr", mapOf(patientKey to 1, hr to 1))
val p1e2 = Event("hr", mapOf(patientKey to 1, hr to 2))
val p2e1 = Event("hr", mapOf(patientKey to 2, hr to 1))
val p1e3 = Event("hr", mapOf(patientKey to 1, hr to 3))
val p2e2 = Event("hr", mapOf(patientKey to 2, hr to 2))
val p3e1 = Event("hr", mapOf(patientKey to 3, hr to 1))
val p2e3 = Event("hr", mapOf(patientKey to 2, hr to 3))
val p3e2 = Event("hr", mapOf(patientKey to 3, hr to 2))
val p3e3 = Event("hr", mapOf(patientKey to 3, hr to 3))我们想要编写一个以匹配的形式返回的模式:
第一场比赛: p1e1,p1e2,p1e3
第二场比赛: p2e1,p2e2,p2e3
第三场比赛: p3e1,p3e2,p3e3
因此,在有键流的flink环境中运行CEP似乎是可行的,但是如果没有键控流,我们该如何做呢?在受限设备上运行时,我们无法部署完整的flink env。
我们希望在5秒内为病人收集所有心率。
谢谢
发布于 2022-10-17 15:49:26
您可以将键控约束放入模式定义中,从而获得输入流的效果。如果使用SQL,则如下所示:
PATTERN (A B C) WITHIN INTERVAL '5' SECOND
DEFINE
A AS A.hr = 1
B AS B.patientKey = A.patientKey AND B.hr = 2
C AS C.patientKey = B.patientKey AND C.hr = 3如果不使用SQL,则应用相同的逻辑。(对于连续性,您需要指定followedBy而不是next,因为您不会按patientKey对流进行分区。)
无论它的价值是什么,我想不出任何运营或性能的好处,将来自于避免关键流。(事实上,CEP总是使用键控状态,即使没有显式地使用键控流。)键控流的使用使使用更大的Flink集群和并行操作成为可能,但不需要它。
https://stackoverflow.com/questions/74098605
复制相似问题