首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >检查我是否使用所有密钥正确接收流

检查我是否使用所有密钥正确接收流
EN

Stack Overflow用户
提问于 2020-06-06 01:11:25
回答 3查看 29关注 0票数 0

我有以下场景:假设有20个传感器正在向我发送流传输。我对流应用keyBy (sensorID)并执行一些操作,如average等。这已经实现,并且运行良好(使用Flink Java API)。

最初一切都很顺利,所有的传感器都在向我发送数据。经过一段时间后,可能会有几个传感器开始运行不正常,我开始从它们那里获得不规律的馈送,例如,我从18个传感器收到馈送,但其中两个在很长一段时间内没有向我发送馈送。

我们可以假设我已经知道了sensorId的固定列表(可能是硬编码的/或者在数据库中)。我如何识别哪两个没有发送摘要?在哪里可以获得keyId的列表,以便与数据库中的列表进行比较?

如果我得不到提要(例如,2分钟、5分钟、10分钟等等,优先级越来越高),我想发出警报。

有没有人使用flink-streaming / patterns实现过这样的场景?有什么建议请提出来。

EN

回答 3

Stack Overflow用户

发布于 2020-06-06 03:21:43

从技术上讲,您可以使用ProcessFunction和计时器。

您可以简单地为每个记录注册计时器,并在收到数据时重置计时器。如果你计划计时器在5分钟的处理时间后运行,这基本上意味着如果你还没有收到数据,它将调用函数onTimer,你可以简单地从这个函数发出一些警报。可以重新注册已触发警报的计时器,以允许发出严重程度更高的警报。

请注意,只有在最初所有传感器都正常工作的情况下,这才能起作用。具体地说,它只会对至少出现过一次的键发出警报。但从你的描述来看,它似乎能解决你的问题。

票数 0
EN

Stack Overflow用户

发布于 2020-06-06 16:43:07

我只是碰巧有一个这种模式的例子。它需要一些调整来适应你的用例,但应该能让你上手。

代码语言:javascript
复制
public class TimeoutFunction extends KeyedProcessFunction<String, Event, String> {

    private ValueState<Long> lastModifiedState;
    static final int TIMEOUT = 2 * 60 * 1000; // 2 minutes

    @Override
    public void open(Configuration parameters) throws Exception {

        // register our state with the state backend
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", Long.class));
    }

    @Override
    public void processElement(Event event, Context ctx, Collector<String> out) throws Exception {

        // update our state and timer
        Long current = lastModifiedState.value();
        if (current != null) {
            ctx.timerService().deleteEventTimeTimer(current + TIMEOUT);
        }
        current = max(current, event.timestamp());
        lastModifiedState.update(current);
        ctx.timerService().registerEventTimeTimer(current + TIMEOUT);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {

        // emit alert
        String deviceId = ctx.getCurrentKey();
        out.collect(deviceId);
    }
}

这里假设有一个主程序,它执行如下操作:

代码语言:javascript
复制
DataStream<String> result = stream
    .assignTimestampsAndWatermarks(new MyBoundedOutOfOrdernessAssigner(...))
    .keyBy(e -> e.deviceId)
    .process(new TimeoutFunction());

正如@Dominik所说,这只会对至少出现过一次的键发出警报。您可以通过引入次要事件源来修复它,该次要事件源为每个应该存在的源创建一个人工事件,并将该流与主要源合并。

票数 0
EN

Stack Overflow用户

发布于 2020-06-09 22:07:02

模式现在对我来说非常清楚了。我已经实现了这个解决方案,它的效果很不错。

如果有人需要这些代码,我很乐意与大家分享

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/62220729

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档