我有以下场景:假设有20个传感器正在向我发送流传输。我对流应用keyBy (sensorID)并执行一些操作,如average等。这已经实现,并且运行良好(使用Flink Java API)。
最初一切都很顺利,所有的传感器都在向我发送数据。经过一段时间后,可能会有几个传感器开始运行不正常,我开始从它们那里获得不规律的馈送,例如,我从18个传感器收到馈送,但其中两个在很长一段时间内没有向我发送馈送。
我们可以假设我已经知道了sensorId的固定列表(可能是硬编码的/或者在数据库中)。我如何识别哪两个没有发送摘要?在哪里可以获得keyId的列表,以便与数据库中的列表进行比较?
如果我得不到提要(例如,2分钟、5分钟、10分钟等等,优先级越来越高),我想发出警报。
有没有人使用flink-streaming / patterns实现过这样的场景?有什么建议请提出来。
发布于 2020-06-06 03:21:43
从技术上讲,您可以使用ProcessFunction和计时器。
您可以简单地为每个记录注册计时器,并在收到数据时重置计时器。如果你计划计时器在5分钟的处理时间后运行,这基本上意味着如果你还没有收到数据,它将调用函数onTimer,你可以简单地从这个函数发出一些警报。可以重新注册已触发警报的计时器,以允许发出严重程度更高的警报。
请注意,只有在最初所有传感器都正常工作的情况下,这才能起作用。具体地说,它只会对至少出现过一次的键发出警报。但从你的描述来看,它似乎能解决你的问题。
发布于 2020-06-06 16:43:07
我只是碰巧有一个这种模式的例子。它需要一些调整来适应你的用例,但应该能让你上手。
public class TimeoutFunction extends KeyedProcessFunction<String, Event, String> {
private ValueState<Long> lastModifiedState;
static final int TIMEOUT = 2 * 60 * 1000; // 2 minutes
@Override
public void open(Configuration parameters) throws Exception {
// register our state with the state backend
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));
}
@Override
public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {
// update our state and timer
Long current = lastModifiedState.value();
if (current != null) {
ctx.timerService().deleteEventTimeTimer(current + TIMEOUT);
}
current = max(current, event.timestamp());
lastModifiedState.update(current);
ctx.timerService().registerEventTimeTimer(current + TIMEOUT);
}
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
// emit alert
String deviceId = ctx.getCurrentKey();
out.collect(deviceId);
}
}这里假设有一个主程序,它执行如下操作:
DataStream<String> result = stream
.assignTimestampsAndWatermarks(new MyBoundedOutOfOrdernessAssigner(...))
.keyBy(e -> e.deviceId)
.process(new TimeoutFunction());正如@Dominik所说,这只会对至少出现过一次的键发出警报。您可以通过引入次要事件源来修复它,该次要事件源为每个应该存在的源创建一个人工事件,并将该流与主要源合并。
发布于 2020-06-09 22:07:02
模式现在对我来说非常清楚了。我已经实现了这个解决方案,它的效果很不错。
如果有人需要这些代码,我很乐意与大家分享
https://stackoverflow.com/questions/62220729
复制相似问题