首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >如何在Fink中实时上报数值?

如何在Fink中实时上报数值?
EN

Stack Overflow用户
提问于 2018-05-03 19:55:19
回答 3查看 118关注 0票数 0

我想要三个值,它们是aggValueInLastHouraggValueInLastDayaggValueInLastThreeDay

我已经试过了,如下所示。

但是我不想等待,这意味着我不喜欢使用滑动窗口来做聚合。(3天窗口必须等待3天的数据,这对我们的系统来说是无法承受的。)

当第一个事件到来时,如何获取最近3天的聚合值?

感谢您提前给我的建议!

EN

回答 3

Stack Overflow用户

发布于 2018-05-03 20:46:20

如果您想获得更频繁的更新,您可以使用QueryableState,以适合您的用例的速率轮询状态。

票数 0
EN

Stack Overflow用户

发布于 2018-05-03 23:48:33

您可以利用ContinuousEventTimeTrigger,它将使您的窗口触发的时间比整个窗口的触发时间更短,从而允许您查看中间状态。如果接收器的下游使用者希望每个输出都是部分聚合(而不是完整的当前状态)并对它们求和,则可以选择将其包装在PurgingTrigger中。

票数 0
EN

Stack Overflow用户

发布于 2018-05-04 16:45:59

我试过CEP了。

代码:

代码语言:javascript
复制
AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipShortOnes();
    Pattern<RiskEvent, ?> loginPattern = Pattern.<RiskEvent>begin("start", strategy)
            .where(eventTypeCondition)
            .timesOrMore(1)
            .greedy()
            .within(Time.hours(1));


    KeyedStream<RiskEvent, String> keyedStream = dataStream.keyBy(new KeySelector<RiskEvent, String>() {
        @Override
        public String getKey(RiskEvent riskEvent) throws Exception {
            // key by user for aggregation
            return riskEvent.getEventType() + riskEvent.getDeviceFp();
        }
    });
    PatternStream<RiskEvent> eventPatternStream = CEP.pattern(keyedStream, loginPattern);

    eventPatternStream.select(new PatternSelectFunction<RiskEvent, RiskResult>() {
        @Override
        public RiskResult select(Map<String, List<RiskEvent>> map) throws Exception {
            List<RiskEvent> list = map.get("start");

            ArrayList<Long> times = new ArrayList<>();
            for (RiskEvent riskEvent : list) {
                times.add(riskEvent.getEventTime());
            }
            Long min = Collections.min(times);
            Long max = Collections.max(times);

            Set<String> accountList = list.stream().map(RiskEvent::getUserName).collect(Collectors.toSet());
            logger.info("时间范围:" + new Date(min) + " --- " + new Date(max) + " 事件:" + list.get(0).getEventType() + ", 设备指纹:" + list.get(0).getDeviceFp() + ", 关联账户:" + accountList.toString());
            return null;
        }
    });

也许你注意到了,跳过策略skipShortOnes是一个定制的策略。

向您展示我在CEP lib中的修改。

  1. 在枚举中添加策略。

公共枚举SkipStrategy{ NO_SKIP,SKIP_PAST_LAST_EVENT,SKIP_TO_FIRST,SKIP_TO_LAST,SKIP_SHORT_ONES }

  • AfterMatchSkipStrategy.java中添加访问方法

public static AfterMatchSkipStrategy skipShortOnes() { return new AfterMatchSkipStrategy(SkipStrategy.SKIP_SHORT_ONES);}

  • NFA.javadiscardComputationStatesAccordingToStrategy方法中添加策略操作。

case SKIP_SHORT_ONES: int i= 0;List>> tempResult =新ArrayList<>(matchedResult);for (Map> resultMap : tempResult) { if (i++ == 0) { continue;} matchedResult.remove(resultMap);} break;

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/50154525

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档