在过去几天学习Flink CEP库时,我的印象是它没有为Flink的标准功能添加任何新的基本功能。似乎Flink CEP的唯一目的是使事件处理更容易,具有清晰的语义和直观的代码结构。例如,Flink只显示事件匹配跳过的5语义学。虽然这些语义对于很多情况来说都是足够的,但它可能不能解决具体的问题,这使我们回到了普通的Flink。
测试用例是以下模式:
Emmit a alert(represented by 'a') for each non-overlapping pair of numbers in a stream
以模式为代表:
Pattern.begin[EventType]("pair",skipStrategy).where(new AlwaysTrueFunction()).times(2)
因此,对于输入(在流中从左向右输入的数字) 1 1 1 1 1,预期的输出将是a a,但5种匹配跳过策略中没有一种会给出正确的结果:
No-skip: a a a a
Skip-to-next: a a a a
Skip-past-last-event: a a a a
Skip-to-first[1]: a a a a
Skip-to-last[1]: a a a a尽管这些策略无法生成所需的模式,但可以使用带有RichFunction计数器的ValueState来确定何时发出新的警报,从而将输入流转换为事件流。
因此,我希望就这些问题作出一些说明:
发布于 2020-03-05 17:57:45
谢谢你和Flink CEP一起玩。
Flink CEP是Flink之上的一个库。因此,它没有添加任何无法使用香草链接(ProcessFunctions等)实现的功能。实际上,它是作为一个特殊的操作符实现的,它正在检查与特定模式匹配的元素,它的许多功能可能甚至可以实现为一个ProcessFunction (有很多工具)。
尽管如此,Flink CEP可能不会添加无法用普通Flink实现的功能,但它增加了表现性,从而使一些使用程序更易于实现。其他API也是如此,例如Flink中的窗口API,您可以使用ProcessFunctions实现它(有很多工具)。
现在说到效率,答案是“这取决于”。手工制作一个针对您的usecase的特殊过程函数,并为您的工作负载进行所有优化,可以比FlinkCEP更有效,因为后者是一个通用库。如果您有专门知识和时间,那么最佳的解决方案总是使用两者(CEP和vanilla Flink)来实现PoCs,并为您的情况选择最有效的方法。
https://stackoverflow.com/questions/60537257
复制相似问题