I am fighting with Flink CEP's greedy operator.
Given the following Java code:
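import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<String> strings = Arrays.asList("1,3,5,5,5,5,6,".split(","));
DataStream<String> input = env.fromCollection(strings);

// One or more "5" events (greedy), followed by a "6".
Pattern<String, ?> pattern = Pattern.<String>begin("start")
    .where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return value.equals("5");
        }
    })
    .oneOrMore().greedy()
    .followedBy("end")
    .where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return value.equals("6");
        }
    });

PatternStream<String> patternStream = CEP.pattern(input, pattern);
DataStream<String> result = patternStream.select(new PatternSelectFunction<String, String>() {
    @Override
    public String select(Map<String, List<String>> pattern) throws Exception {
        // Print every event of the match, framed by separator lines.
        System.err.println("=======");
        pattern.values().forEach(match -> match.forEach(event -> System.err.println(event)));
        System.err.println("=======");
        return "-";
    }
});
result.print();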
env.execute("Flink Streaming Java API Skeleton");

I would expect only "5 5 5 5 6" to be emitted.
However, it matches "5 5 5 5 6", "5 5 5 6", "5 5 6", and "5 6".
If I do this instead:

// A "3" first, then one or more "5" events (greedy), followed by a "6".
Pattern<String, ?> pattern = Pattern.<String>begin("start")
    .where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return value.equals("3");
        }
    })
    .followedBy("middle")
    .where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return value.equals("5");
        }
    })
    .oneOrMore().greedy()
    .followedBy("end")
    .where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return value.equals("6");
        }
    });

With this pattern, however (that is, with a different starting match provided), the greedy operator works as expected and emits "3 5 5 5 5 6".
Is there any way to make a greedy matcher grab all the matches without a different start pattern?
Or am I missing something?
Stephen
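Posted on 2018-05-04 02:03:45
Thanks to Chesnay Schepler for the comment above:
There is a known bug in greedy matching that would explain this behavior: issues.apache.org/jira/browse/FLINK-8914
I will record this as the answer for now.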
Posted on 2019-04-02 11:47:58
To control how many matches an event can be assigned to, you need to specify a skip strategy, called AfterMatchSkipStrategy.
Use Pattern.begin("start", AfterMatchSkipStrategy.skipPastLastEvent()):
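import java.util.Arrays;
import java.util.List;
import java.util.Map;

import org.apache.flink.cep.CEP;
import org.apache.flink.cep.PatternSelectFunction;
import org.apache.flink.cep.PatternStream;
import org.apache.flink.cep.nfa.aftermatch.AfterMatchSkipStrategy;
import org.apache.flink.cep.pattern.Pattern;
import org.apache.flink.cep.pattern.conditions.SimpleCondition;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
List<String> strings = Arrays.asList("1,3,5,5,5,5,6,".split(","));
DataStream<String> input = env.fromCollection(strings);

// Same pattern as in the question, but with a skip strategy passed to begin().
Pattern<String, ?> pattern = Pattern.<String>begin("start", AfterMatchSkipStrategy.skipPastLastEvent())
    .where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return value.equals("5");
        }
    })
    .oneOrMore().greedy()
    .followedBy("end")
    .where(new SimpleCondition<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return value.equals("6");
        }
    });

PatternStream<String> patternStream = CEP.pattern(input, pattern);
DataStream<String> result = patternStream.select(new PatternSelectFunction<String, String>() {
    @Override
    public String select(Map<String, List<String>> pattern) throws Exception {
        // Print every event of the match, framed by separator lines.
        System.err.println("=======");
        pattern.values().forEach(match -> match.forEach(event -> System.err.println(event)));
        System.err.println("=======");
        return "-";
    }
});
result.print();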
env.execute("Flink Streaming Java API Skeleton");

https://stackoverflow.com/questions/48028061
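
To make the effect of the skip strategy easier to see, here is a rough plain-Java sketch that does not use Flink at all. It assumes that, without a skip strategy, every "5" may start its own greedy match, and it models "skip past last event" as dropping any match that starts at or before the last event of a match already kept. The class SkipStrategySketch and its methods are hypothetical names made up for this illustration, the run of "5" events is treated as strictly consecutive for simplicity, and none of this reflects how Flink CEP is implemented internally.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration only; hypothetical names, not Flink's implementation. */
class SkipStrategySketch {

    /**
     * For every "5" in the input, build the greedy match that starts there:
     * the whole run of consecutive "5" events, closed by a "6" right after it.
     * Each match is returned as {startIndex, endIndex}, both inclusive.
     */
    static List<int[]> matchesWithoutSkipping(List<String> events) {
        List<int[]> matches = new ArrayList<>();
        for (int start = 0; start < events.size(); start++) {
            if (!events.get(start).equals("5")) {
                continue;
            }
            int i = start;
            while (i < events.size() && events.get(i).equals("5")) {
                i++;
            }
            if (i < events.size() && events.get(i).equals("6")) {
                matches.add(new int[] {start, i});
            }
        }
        return matches;
    }

    /**
     * Keep a match only if it starts after the last event of the previously
     * kept match. Expects the matches ordered by start index.
     */
    static List<int[]> skipPastLastEvent(List<int[]> matches) {
        List<int[]> kept = new ArrayList<>();
        int lastEnd = -1;
        for (int[] m : matches) {
            if (m[0] > lastEnd) {
                kept.add(m);
                lastEnd = m[1];
            }
        }
        return kept;
    }

    /** Render a match as a space-separated string of its events. */
    static String render(List<String> events, int[] match) {
        return String.join(" ", events.subList(match[0], match[1] + 1));
    }

    public static void main(String[] args) {
        List<String> events = Arrays.asList("1", "3", "5", "5", "5", "5", "6");
        List<int[]> all = matchesWithoutSkipping(events);
        for (int[] m : all) {
            System.out.println("no skip: " + render(events, m));
        }
        for (int[] m : skipPastLastEvent(all)) {
            System.out.println("skip past last event: " + render(events, m));
        }
    }
}
```

On the input from the question, the first loop prints one match per starting "5" (four in total), while the second prints only the longest one, "5 5 5 5 6", because the remaining candidates all start inside the match that was already kept.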