我希望根据两个具有相同标识符的事件来检测两个事件是否在定义的时间框架内发生。例如,DoorEvent如下所示:
<doorevent>
<door>
<id>1</id>
<status>open</status>
</door>
<timestamp>12345679</timestamp>
</doorevent>
<doorevent>
<door>
<id>1</id>
<status>close</status>
</door>
<timestamp>23456790</timestamp>
</doorevent>下面示例中的DoorEvent java类具有相同的结构。
我想在打开后5分钟内发现id 1关闭的那扇门。为此,我尝试使用Apache库。传入的流包含来自20个门的所有打开和关闭消息。
Pattern<String, ?> pattern = Pattern.<String>begin("door_open").where(
new SimpleCondition<String>() {
private static final long serialVersionUID = 1L;
public boolean filter(String doorevent) {
DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
if (event.getDoor().getStatus().equals("open")){
// save state of door as open
return true;
}
return false;
}
}
)
.followedByAny("door_close").where(
new SimpleCondition<String>() {
private static final long serialVersionUID = 1L;
public boolean filter(String doorevent) throws JsonParseException, JsonMappingException, IOException {
DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
if (event.getDoor().getStatus().equals("close")){
// check if close is of previously opened door
return true;
}
return false;
}
}
)
.within(Time.minutes(5));如何将门1的状态保存为在door_open中打开,以便在door_close步骤中,我知道门1是关闭的,而不是其他的门?
发布于 2017-09-12 20:42:31
如果您有Flink 1.3.0及以上,那么您想要做的事情非常直接。
您的模式应该如下所示:
Pattern.<DoorEvent>begin("first")
.where(new SimpleCondition<DoorEvent>() {
private static final long serialVersionUID = 1390448281048961616L;
@Override
public boolean filter(DoorEvent event) throws Exception {
return event.getDoor().getStatus().equals("open");
}
})
.followedBy("second")
.where(new IterativeCondition<DoorEvent>() {
private static final long serialVersionUID = -9216505110246259082L;
@Override
public boolean filter(DoorEvent secondEvent, Context<DoorEvent> ctx) throws Exception {
if (!secondEvent.getDoor().getStatus().equals("close")) {
return false;
}
for (DoorEvent firstEvent : ctx.getEventsForPattern("first")) {
if (secondEvent.getDoor().getEventID().equals(firstEvent.getDoor().getEventId())) {
return true;
}
}
return false;
}
})
.within(Time.minutes(5));因此,基本上您可以使用IterativeConditions并获得匹配的第一种模式的上下文,并在该列表上进行迭代,同时对所需的模式进行比较,并按您的需要继续进行。
IterativeConditions很昂贵,应该相应地处理。
有关条件的更多信息,请在Flink -条件这里查看
https://stackoverflow.com/questions/46177415
复制相似问题