Can I do a lazy match with Flink CEP?
Stack Overflow user
Asked on 2016-07-06 13:30:27
Answers: 1 · Views: 263 · Followers: 0 · Votes: 1

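I want to use Flink CEP to do only "lazy" matching on patterns. How can I do that? For example, given the input stream ACABCABCB, I would like to match A followedBy C and get only 3 matches instead of 6.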

I created the following example to illustrate my problem.

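```scala
import org.apache.flink.cep.scala.{CEP, PatternStream}
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

import scala.collection.mutable

// Top-level case classes so Flink's type extraction can analyze them.
case class MyEvent(id: Int, kind: String, value: String)
case class MyAggregatedEvent(id: Int, concatenatedValue: String)

object Experiment {
  def main(args: Array[String]): Unit = {
    // Parallelism 1 keeps the printed output in a deterministic order.
    val env = StreamExecutionEnvironment.createLocalEnvironment(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    // Event kinds in arrival order: A C A B C A D C B
    val eventStream = env.fromElements(
      MyEvent(1, "A", "1"), MyEvent(1, "C", "1"),
      MyEvent(1, "A", "2"), MyEvent(1, "B", "1"), MyEvent(1, "C", "2"),
      MyEvent(1, "A", "3"), MyEvent(1, "D", "2"), MyEvent(1, "C", "3"),
      MyEvent(1, "B", "3")
    )

    // An A immediately followed by a C (strict contiguity), within 5 seconds.
    val pattern: Pattern[MyEvent, _] = Pattern
      .begin[MyEvent]("pA").where(e => e.kind == "A")
      .next("pC").where(e => e.kind == "C")
      .within(Time.seconds(5))

    val patternNextStream: PatternStream[MyEvent] = CEP.pattern(eventStream.keyBy(_.id), pattern)

    // Each match arrives as a map from pattern name to the matched event.
    val outNextStream: DataStream[MyAggregatedEvent] = patternNextStream.flatSelect {
      (pattern: mutable.Map[String, MyEvent], collector: Collector[MyAggregatedEvent]) =>
        val partA = pattern("pA")
        val partC = pattern("pC")
        collector.collect(MyAggregatedEvent(partA.id, partA.value + "=>" + partC.value))
    }
    outNextStream.print()

    env.execute("Experiment")
  }
}
```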

This gives me the following output:

MyAggregatedEvent(1,1=>1)
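The single result follows from `next`, which requires strict contiguity: the C must be the event immediately after the A. As a sketch only, the plain-Scala model below (not Flink code; `Ev`, `StrictContiguityModel` and `strictMatches` are hypothetical names, and the 5-second window is ignored because every event here arrives within it) replays the same kind/value sequence and checks adjacent pairs, which reproduces that output:

```scala
// Hypothetical event type for this model: only kind and value matter.
final case class Ev(kind: String, value: String)

// Plain-Scala model of strict contiguity (what `next` requires); not Flink API.
object StrictContiguityModel {
  // Same kind/value sequence as the question's eventStream.
  val events: Seq[Ev] = Seq(
    Ev("A", "1"), Ev("C", "1"), Ev("A", "2"), Ev("B", "1"), Ev("C", "2"),
    Ev("A", "3"), Ev("D", "2"), Ev("C", "3"), Ev("B", "3"))

  // An A matches only if the very next event is a C.
  def strictMatches(es: Seq[Ev]): List[String] =
    es.sliding(2).collect {
      case Seq(a, c) if a.kind == "A" && c.kind == "C" => s"${a.value}=>${c.value}"
    }.toList

  def main(args: Array[String]): Unit =
    strictMatches(events).foreach(println) // prints 1=>1
}
```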

When I change the pattern to:

```scala
val pattern: Pattern[MyEvent, _] = Pattern
  .begin[MyEvent]("pA").where(e => e.kind == "A")
  .followedBy("pC").where(e => e.kind == "C")
  .within(Time.seconds(5))
```

then it prints the following:

```
MyAggregatedEvent(1,1=>1)
MyAggregatedEvent(1,1=>2)
MyAggregatedEvent(1,2=>2)
MyAggregatedEvent(1,1=>3)
MyAggregatedEvent(1,2=>3)
MyAggregatedEvent(1,3=>3)
```
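The six results are every (A, C) pair in which the A arrives before the C, emitted as each C arrives. A minimal plain-Scala model of that behavior, offered only as an illustration (not Flink code; `Ev`, `AllMatchesModel` and `allMatches` are hypothetical names, and it assumes every event falls inside the 5-second window, as in this tiny local run), keeps every partial match open and never discards one:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical event type for this model: only kind and value matter.
final case class Ev(kind: String, value: String)

// Plain-Scala model (not Flink API): every A is paired with every later C.
object AllMatchesModel {
  // Same kind/value sequence as the question's eventStream.
  val events: Seq[Ev] = Seq(
    Ev("A", "1"), Ev("C", "1"), Ev("A", "2"), Ev("B", "1"), Ev("C", "2"),
    Ev("A", "3"), Ev("D", "2"), Ev("C", "3"), Ev("B", "3"))

  def allMatches(es: Seq[Ev]): List[String] = {
    val out = ArrayBuffer.empty[String]
    val openAs = ArrayBuffer.empty[Ev] // partial matches; never discarded
    for (e <- es) {
      e.kind match {
        case "A" => openAs += e
        case "C" => openAs.foreach(a => out += s"${a.value}=>${e.value}")
        case _   => () // B and D events are skipped
      }
    }
    out.toList
  }

  def main(args: Array[String]): Unit =
    allMatches(events).foreach(println)
}
```

Running `main` prints the six pairs in the same order as the Flink output above.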

How can I create a pattern that matches each event only once, so that my output is:

```
MyAggregatedEvent(1,1=>1)
MyAggregatedEvent(1,2=>2)
MyAggregatedEvent(1,3=>3)
```

1 Answer

Stack Overflow user

Posted on 2016-07-07 08:36:34

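Currently, Flink's CEP library does not support this: the matching semantics cannot be controlled yet. I think the best first step would be to add a MATCH_ALL and a MATCH_FIRST matching mode. Once a complete matching sequence has been seen, MATCH_FIRST would discard all intermediate state. This should cover your use case.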
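To make the proposed MATCH_FIRST semantics concrete, here is a plain-Scala model. MATCH_FIRST is only a proposal in this answer, not a Flink API, and `Ev`, `MatchFirstModel` and `matchFirst` are hypothetical names. The model assumes that when several partial matches are open, the earliest-started one is reported, and then all partial state is dropped:

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical event type for this model: only kind and value matter.
final case class Ev(kind: String, value: String)

// Plain-Scala model of the proposed MATCH_FIRST mode (not a Flink API).
object MatchFirstModel {
  // Same kind/value sequence as the question's eventStream.
  val events: Seq[Ev] = Seq(
    Ev("A", "1"), Ev("C", "1"), Ev("A", "2"), Ev("B", "1"), Ev("C", "2"),
    Ev("A", "3"), Ev("D", "2"), Ev("C", "3"), Ev("B", "3"))

  def matchFirst(es: Seq[Ev]): List[String] = {
    val out = ArrayBuffer.empty[String]
    val partial = ArrayBuffer.empty[Ev] // open partial matches (A's waiting for a C)
    for (e <- es) {
      e.kind match {
        case "A" => partial += e
        case "C" if partial.nonEmpty =>
          // Assumption: report the earliest-started partial match...
          out += s"${partial.head.value}=>${e.value}"
          // ...then discard all intermediate state, as MATCH_FIRST proposes.
          partial.clear()
        case _ => () // B, D, and C events with no open partial match are skipped
      }
    }
    out.toList
  }

  def main(args: Array[String]): Unit =
    matchFirst(events).foreach(println) // prints 1=>1, 2=>2, 3=>3 on separate lines
}
```

On the question's event sequence this yields exactly the three matches the asker wanted, because clearing the partial state after each complete match prevents an old A from pairing with a later C.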

Votes: 1
Page content provided by Stack Overflow. Translation support provided by Tencent Cloud Xiaowei's IT-domain engine.
Original link:

https://stackoverflow.com/questions/38225286
