首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >不触发flink cep sql事件

不触发flink cep sql事件
EN

Stack Overflow用户
提问于 2021-10-25 16:30:10
回答 1查看 132关注 0票数 0

我在Flink SQL中使用CEP模式,它正在按预期工作,连接到Kafka broker。但是当我连接到基于集群的云kafka设置时,Flink CEP并没有触发。以下是我的sql:

代码语言:javascript
复制
create table agent_action_detail 
(
    agent_id String, 
    room_id String, 
    create_time Bigint, 
    call_type String, 
    application_id String, 
    connect_time Bigint, 
    row_time TIMESTAMP_LTZ(3), WATERMARK for row_time as row_time  - INTERVAL '1' MINUTE) 
with ('connector'='kafka', 'topic'='agent-action-detail', ...)

然后,我以json格式发送消息,如

代码语言:javascript
复制
{"agent_id":"agent_221","room_id":"room1","create_time":1635206828877,"call_type":"inbound","application_id":"app1","connect_time":1635206501735,"row_time":"2021-10-25 16:07:09.019Z"}

在flink web用户界面中,水印可以很好地工作flink网络用户界面

我运行cep sql:

代码语言:javascript
复制
select * from agent_action_detail
 match_recognize(
    partition by agent_id 
    order by row_time 
    measures 
        last(BF.create_time) as create_time, 
        first(AF.connect_time) as connect_time 
    one row per match AFTER MATCH SKIP PAST LAST ROW 
    pattern (BF+ AF) define BF as BF.connect_time > 0 ,AF as AF.connect_time > 0
 )

每一条卡夫卡消息,connect_time都是> 0,但flink没有触发。有人能帮忙解决这个问题吗?谢谢!

代码语言:javascript
复制
select * from agent_action_detail match_recognize( partition by agent_id order by row_time  measures AF.connect_time as connect_time one row per match pattern (BF AF) WITHIN INTERVAL '1' second define BF as (last(BF.connect_time, 1) < 1), AF as AF.connect_time >= 100)

下面是另一个cep sql仍然不能工作。agent_action_detail表由另一个flink插入,如

代码语言:javascript
复制
insert into agent_action_detail select data.agent_id, data.room_id, data.create_time, data.call_type, data.application_id, data.connect_time, now() from source_table where type = 'xxx'
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-10-25 19:15:35

有几件事情会导致模式匹配而不产生结果:

  • 输入实际上并不包含模式
  • 水印做得不对
  • 这种模式在某种程度上是病态的。

这种特殊的模式循环没有退出条件。这种模式不允许模式匹配引擎的内部状态被清除,这将导致问题。

如果您直接使用Flink,我会告诉您尝试添加until(condition)within(time)来限制可能匹配的数量。

使用MATCH_RECOGNIZE,查看是否可以向模式添加一个不同的终止元素。

更新:由于在修改模式后仍然没有得到任何结果,所以您应该确定水印是否是问题的根源。CEP依赖于按时间对输入流进行排序,这取决于水印--但前提是您使用的是事件时间。

测试这一点的最简单方法是切换到使用处理时间:

代码语言:javascript
复制
create table agent_action_detail 
(
    agent_id String, 
    ...
    row_time AS PROCTIME()
)
with (...)

如果这样做有效,那么要么是时间戳,要么是水印。例如,如果所有事件都延迟了,则不会得到任何结果。在您的例子中,我想知道row_time列中有任何数据。

如果这没有揭示问题,请分享一个最小的可重复的例子,包括观察问题所需的数据。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69711674

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档