首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >基于apache的有状态复杂事件处理

基于apache的有状态复杂事件处理
EN

Stack Overflow用户
提问于 2017-09-12 13:12:57
回答 1查看 407关注 0票数 1

我希望根据两个具有相同标识符的事件来检测两个事件是否在定义的时间框架内发生。例如,DoorEvent如下所示:

代码语言:javascript
复制
<doorevent>
  <door>
    <id>1</id>
    <status>open</status>
  </door>
  <timestamp>12345679</timestamp>
</doorevent> 

<doorevent>
  <door>
    <id>1</id>
    <status>close</status>
  </door>
  <timestamp>23456790</timestamp>
</doorevent>

下面示例中的DoorEvent java类具有相同的结构。

我想在打开后5分钟内发现id 1关闭的那扇门。为此,我尝试使用Apache库。传入的流包含来自20个门的所有打开和关闭消息。

代码语言:javascript
复制
Pattern<String, ?> pattern = Pattern.<String>begin("door_open").where(
    new SimpleCondition<String>() {
        private static final long serialVersionUID = 1L;
        public boolean filter(String doorevent) {
            DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
            if (event.getDoor().getStatus().equals("open")){
                // save state of door as open
                return true;
            }
            return false;                           
        }
    }
)
.followedByAny("door_close").where(
    new SimpleCondition<String>() {
            private static final long serialVersionUID = 1L;
            public boolean filter(String doorevent) throws JsonParseException, JsonMappingException, IOException {
                DoorEvent event = new DoorEvent().parseInstance(doorevent, DataType.XML);
                if (event.getDoor().getStatus().equals("close")){
                    // check if close is of previously opened door
                    return true;
                }
                return false;
            }
        }
)
.within(Time.minutes(5));

如何将门1的状态保存为在door_open中打开,以便在door_close步骤中,我知道门1是关闭的,而不是其他的门?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2017-09-12 20:42:31

如果您有Flink 1.3.0及以上,那么您想要做的事情非常直接。

您的模式应该如下所示:

代码语言:javascript
复制
Pattern.<DoorEvent>begin("first")
        .where(new SimpleCondition<DoorEvent>() {
          private static final long serialVersionUID = 1390448281048961616L;

          @Override
          public boolean filter(DoorEvent event) throws Exception {
            return event.getDoor().getStatus().equals("open");
          }
        })
        .followedBy("second")
        .where(new IterativeCondition<DoorEvent>() {
          private static final long serialVersionUID = -9216505110246259082L;

          @Override
          public boolean filter(DoorEvent secondEvent, Context<DoorEvent> ctx) throws Exception {

            if (!secondEvent.getDoor().getStatus().equals("close")) {
              return false;
            }

            for (DoorEvent firstEvent : ctx.getEventsForPattern("first")) {
              if (secondEvent.getDoor().getEventID().equals(firstEvent.getDoor().getEventId())) {
                return true;
              }
            }
            return false;
          }
        })
        .within(Time.minutes(5));

因此,基本上您可以使用IterativeConditions并获得匹配的第一种模式的上下文,并在该列表上进行迭代,同时对所需的模式进行比较,并按您的需要继续进行。

IterativeConditions很昂贵,应该相应地处理。

有关条件的更多信息,请在Flink -条件这里查看

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/46177415

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档