我有以下格式的数据,
SIP|2405463430|4115474257|8.205142580136622E12|Tue 11月08 :58:58 IST 2016邀请RTP|2405463430|4115474257|8.205142580136622E12|Tue 11月08 :58:58 IST 2016 0 RTP|2405463430|4115474257|8.205142580136622E12|Tue 11月08 :58:58 IST 2016 RTP|2405463430|4115474257|8.205142580136622E12|Tue 16:58 16:58 IST 2016 2 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 16:58:58 IST 2016 3 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 :58:58 IST 2016 4 RTP|2405463430|4115474257|8.205142580136622E12|Tue Nov 08 :58:58 IST 20162016年11月8日16:58 RTP|2405463430|4115474257|8.205142580136622E12|Tue 2016年11月8日16:58 2016年11月8日16:58 IST 2016 SIP|2405463430|4115474257|8.205142580136622E12|Tue 16:58
我希望我的窗口在遇到SIP-INVITE消息时启动,当遇到SIP-BYE消息时触发一个事件,执行一些聚合。
我该怎么做?对于给定的用户,SIP-INVITE消息在任何时候都会出现,我还可能为多个用户同时发送多个SIP-INVITE消息。
发布于 2016-11-09 10:16:21
我认为您可以用用户键控的全局窗口解决您的用例。全局窗口收集每个键的所有数据,并将触发和清除窗口的责任推给用户定义的Trigger函数。
全局窗口的定义如下:
val input: DataStream[(String, Int, String)] = ??? // (userId, value, marker)
val agg = input
// one global window per user (handles overlapping SIP-INVITE events).
.keyBy(_._1)
// collect all data for each user until the trigger fires and purges the window.
.window(GlobalWindows.create())
// you have to implement a custom trigger which reacts on the marker.
.trigger(new YourCustomTrigger())
// the window function computes your aggregation.
.apply(new YourWindowFunction())我认为执行以下操作的触发器应该工作(假设SIP-INVITE事件总是启动会话)。Trigger.onElement()方法应该检查SIP-BYE字段并触发窗口评估并清除窗口,即返回TriggerResult.FIRE_AND_PURGE。这将调用评估函数并删除窗口状态。
注意,如果要支持无序事件,则需要特别注意(在这种情况下,您应该将事件时间计时器设置为关闭元素的时间戳,以确保收到时间戳之前的所有数据)。如果有数据应该被丢弃,因为它不是SIP-INVITE和SIP-BYE之间的数据,那么您也需要处理这个问题。
https://stackoverflow.com/questions/40501245
复制相似问题