我用Flink实现了一个模式,它匹配三个事件,比如A->B->C。在定义了我的模式之后,我生成一个
PatternStream<Event> patternStream = CEP.pattern(eventStream, pattern);
有了这样一个PatternSelectFunction
patternStream.select(new MyPatternSelectFunction()).print();
这就像一种魅力,但我对所有匹配事件的活动时间感兴趣。我知道传统的Flink流API提供了丰富的功能,允许您注册Flink的内部延迟跟踪器,如本问题所述。我还看到Flink 1.8增加了一个新的RichPatternSelectFunction。但不幸的是,我不能设置Flink 1.8与Flink CEP。
最后,是否有办法获得所有匹配事件的事件时间?
发布于 2019-01-21 18:20:03
您不需要Rich来使用Flink的延迟跟踪。您只需要通过将latencyTrackingInterval设置为Flink配置或ExecutionConfig中的正数来启用它,例如,
env.getConfig().setLatencyTrackingInterval(1000);然后,您可以在度量解决方案中或通过REST来观察结果(延迟度量没有在Flink web中报告)。
更新:
延迟统计信息是作业度量,并且在
http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics可以从
http://<job_manager_rest_endpoint>/jobs/<job_id>/metrics?get=<metric_name>这些指标的名称如下
latency.source_id.<ID>.operator_id.<ID>.operator_subtask_index.<SUBTASK>.<metric>其中,is标识作业图中的源节点和运算符节点,在这些节点之间测量延迟。
例如,通过这个请求,我可以在我现在运行的作业中确定源和其中一个接收器之间的第95百分位延迟:
http://localhost:8081/jobs/94b189a96b98b3aafaba6db6aa8b770b/metrics?get=latency.source_id.bc764cd8ddf7a0cff126f51c16239658.operator_id.fd0ee602f2fa8d310d9bd9f694e185f5.operator_subtask_index.0.latency_p95或者,您可以使用ProcessFunction在事件进入任务的CEP部分之前向它们添加处理时间戳,然后再使用另一个ProcessFunction来度量经过的时间。
https://stackoverflow.com/questions/54293808
复制相似问题