首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >基于时间的siddhi数据融合

基于时间的siddhi数据融合
EN

Stack Overflow用户
提问于 2019-03-11 15:43:46
回答 1查看 154关注 0票数 0

我正在尝试根据时间窗口聚合传感器数据,并在达到30秒窗口(汇总)后将其写入Cassandra。

例如,名为"temp“的传感器在30秒内发送3个读数。我喜欢获取该传感器在过去30秒内的平均值,并在window完成时将平均值写入Cassandra。

这是我的代码

代码语言:javascript
复制
BasicConfigurator.configure();


        // Create Siddhi Application
        String siddhiApp = "define stream SensorEventStream (sensorid string, value double); " +
                " " +
                "@info(name = 'query1') " +
                "from SensorEventStream#window.time(30 sec)  " +
                "select sensorid, avg(value) as value " +
                "group by sensorid " +
                "insert into AggregateSensorEventStream ;";

        // Creating Siddhi Manager
        SiddhiManager siddhiManager = new SiddhiManager();

        //Generating runtime
        SiddhiAppRuntime siddhiAppRuntime = siddhiManager.createSiddhiAppRuntime(siddhiApp);

        //Adding callback to retrieve output events from query
        siddhiAppRuntime.addCallback("AggregateSensorEventStream", new StreamCallback() {


            @Override
            public void receive(org.wso2.siddhi.core.event.Event[] events) {
                 EventPrinter.print(events);
            }
        });

        //Retrieving input handler to push events into Siddhi
        InputHandler inputHandler = siddhiAppRuntime.getInputHandler("SensorEventStream");

        //Starting event processing
        siddhiAppRuntime.start();

        //Sending events to Siddhi
        inputHandler.send(new Object[]{"Temp", 26d});
        Thread.sleep(1000);
        inputHandler.send(new Object[]{"Temp", 25d});
        Thread.sleep(1000);
        inputHandler.send(new Object[]{"Temp", 24d});
        Thread.sleep(60000);
        inputHandler.send(new Object[]{"Temp", 23d});

        //Shutting down the runtime
        siddhiAppRuntime.shutdown();

        //Shutting down Siddhi
        siddhiManager.shutdown();

输出结果如下所示

代码语言:javascript
复制
0 [main] INFO org.wso2.siddhi.core.util.EventPrinter  - [Event{timestamp=1552281656960, data=[Temp, 26.0], isExpired=false}]
1002 [main] INFO org.wso2.siddhi.core.util.EventPrinter  - [Event{timestamp=1552281657971, data=[Temp, 25.5], isExpired=false}]
2003 [main] INFO org.wso2.siddhi.core.util.EventPrinter  - [Event{timestamp=1552281658972, data=[Temp, 25.0], isExpired=false}]
62004 [main] INFO org.wso2.siddhi.core.util.EventPrinter  - [Event{timestamp=1552281718972, data=[Temp, 23.0], isExpired=false}]

从这个演示代码中,我看到它立即发送3个事件的第一个temp的平均值,在30秒窗口之后,它没有做任何事情。然后打印23。

当窗口在30秒后滚动时,我如何获得通知?我想这就是接收函数的作用。

我不确定我是否误解了这里的功能。对于siddi,这是可能的吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2019-03-11 23:35:09

这是预期的行为,窗口是一个滑动窗口。在这里,当第一个事件到来时,第一秒,窗口只保存第一个事件,所以平均值是26。然后,当第二个事件到达时,窗口具有26d和25d,然后是25.5中的平均值。同样,第三秒平均25d。然后,在31秒、32秒和33秒时,这些事件将从窗口过期。因此,当您的第四个事件到来时(63秒),窗口中只有最新的事件,因此average将是值本身。此窗口在事件到达时立即计算平均值,具体取决于它之前30秒内接收的事件。

从你的问题看,你似乎想要timeBatch窗口。这里,平均值仅在批次结束时计算。例如,在这种情况下,第30秒、第60秒、第90秒等等。有关示例,请参阅timeBatch文档。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/55097216

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档