我使用Flink和FlinkCEP来检测数据流上的复杂事件。出于研究目的,我只需要测量识别时间。
我正在使用Flink / FlinkCEP - 1.7.1。我使用env.fromCollection()函数在Flink环境中创建流。在此之后,我将使用FlinkCEP:CEP.pattern(....)以及其他select和print函数。
我只找到了这篇文章:Measure job execution time in flink,它帮了我很多。它提出了一种返回流环境进程的执行时间的解决方案。这不完全是我要找的。
我注意到返回值包含了其他运算符的时间,比如.assignAscendingTimestamps(x => x.TimeStamp()),因此我不能使用它。
有没有办法只测量CEP.pattern进程的时间?我也找不到在这种情况下对我有帮助的指标,除非我遗漏了一些东西……
发布于 2019-09-04 03:43:38
您可以为每条记录添加一个时间戳字段,并在CEP之前使用mapFunction将当前时间放入该字段。然后使用它在RichMapFunction中立即计算CEP内经过的时间--然后您可以通过自定义指标报告该时间,或将其发送到接收器。这会增加一些开销,但不会太多。只要您能够避免这两个函数之间的任何keyBy或重新平衡调用,所涉及的一切都将通过函数调用链接在一起,而不会产生任何序列化或网络开销。
https://stackoverflow.com/questions/57775407
复制相似问题