我想要三个值,它们是aggValueInLastHour,aggValueInLastDay,aggValueInLastThreeDay。
我已经试过了,如下所示。

但是我不想等待,这意味着我不喜欢使用滑动窗口来做聚合。(3天窗口必须等待3天的数据,这对我们的系统来说是无法承受的。)
当第一个事件到来时,如何获取最近3天的聚合值?
感谢您提前给我的建议!
发布于 2018-05-03 20:46:20
如果您想获得更频繁的更新,您可以使用QueryableState,以适合您的用例的速率轮询状态。
发布于 2018-05-03 23:48:33
您可以利用ContinuousEventTimeTrigger,它将使您的窗口触发的时间比整个窗口的触发时间更短,从而允许您查看中间状态。如果接收器的下游使用者希望每个输出都是部分聚合(而不是完整的当前状态)并对它们求和,则可以选择将其包装在PurgingTrigger中。
发布于 2018-05-04 16:45:59
我试过CEP了。

代码:
AfterMatchSkipStrategy strategy = AfterMatchSkipStrategy.skipShortOnes();
Pattern<RiskEvent, ?> loginPattern = Pattern.<RiskEvent>begin("start", strategy)
.where(eventTypeCondition)
.timesOrMore(1)
.greedy()
.within(Time.hours(1));
KeyedStream<RiskEvent, String> keyedStream = dataStream.keyBy(new KeySelector<RiskEvent, String>() {
@Override
public String getKey(RiskEvent riskEvent) throws Exception {
// key by user for aggregation
return riskEvent.getEventType() + riskEvent.getDeviceFp();
}
});
PatternStream<RiskEvent> eventPatternStream = CEP.pattern(keyedStream, loginPattern);
eventPatternStream.select(new PatternSelectFunction<RiskEvent, RiskResult>() {
@Override
public RiskResult select(Map<String, List<RiskEvent>> map) throws Exception {
List<RiskEvent> list = map.get("start");
ArrayList<Long> times = new ArrayList<>();
for (RiskEvent riskEvent : list) {
times.add(riskEvent.getEventTime());
}
Long min = Collections.min(times);
Long max = Collections.max(times);
Set<String> accountList = list.stream().map(RiskEvent::getUserName).collect(Collectors.toSet());
logger.info("时间范围:" + new Date(min) + " --- " + new Date(max) + " 事件:" + list.get(0).getEventType() + ", 设备指纹:" + list.get(0).getDeviceFp() + ", 关联账户:" + accountList.toString());
return null;
}
});也许你注意到了,跳过策略skipShortOnes是一个定制的策略。
向您展示我在CEP lib中的修改。
公共枚举SkipStrategy{ NO_SKIP,SKIP_PAST_LAST_EVENT,SKIP_TO_FIRST,SKIP_TO_LAST,SKIP_SHORT_ONES }
AfterMatchSkipStrategy.java中添加访问方法public static AfterMatchSkipStrategy skipShortOnes() { return new AfterMatchSkipStrategy(SkipStrategy.SKIP_SHORT_ONES);}
NFA.java的discardComputationStatesAccordingToStrategy方法中添加策略操作。case SKIP_SHORT_ONES: int i= 0;List>> tempResult =新ArrayList<>(matchedResult);for (Map> resultMap : tempResult) { if (i++ == 0) { continue;} matchedResult.remove(resultMap);} break;
https://stackoverflow.com/questions/50154525
复制相似问题