我在Flink SQL中使用CEP模式,它正在按预期工作,连接到Kafka broker。但是当我连接到基于集群的云kafka设置时,Flink CEP并没有触发。以下是我的sql:
create table agent_action_detail
(
agent_id String,
room_id String,
create_time Bigint,
call_type String,
application_id String,
connect_time Bigint,
row_time TIMESTAMP_LTZ(3), WATERMARK for row_time as row_time - INTERVAL '1' MINUTE)
with ('connector'='kafka', 'topic'='agent-action-detail', ...)然后,我以json格式发送消息,如
{"agent_id":"agent_221","room_id":"room1","create_time":1635206828877,"call_type":"inbound","application_id":"app1","connect_time":1635206501735,"row_time":"2021-10-25 16:07:09.019Z"}在flink web用户界面中,水印可以很好地工作flink网络用户界面。
我运行cep sql:
select * from agent_action_detail
match_recognize(
partition by agent_id
order by row_time
measures
last(BF.create_time) as create_time,
first(AF.connect_time) as connect_time
one row per match AFTER MATCH SKIP PAST LAST ROW
pattern (BF+ AF) define BF as BF.connect_time > 0 ,AF as AF.connect_time > 0
)每一条卡夫卡消息,connect_time都是> 0,但flink没有触发。有人能帮忙解决这个问题吗?谢谢!
select * from agent_action_detail match_recognize( partition by agent_id order by row_time measures AF.connect_time as connect_time one row per match pattern (BF AF) WITHIN INTERVAL '1' second define BF as (last(BF.connect_time, 1) < 1), AF as AF.connect_time >= 100)下面是另一个cep sql仍然不能工作。agent_action_detail表由另一个flink插入,如
insert into agent_action_detail select data.agent_id, data.room_id, data.create_time, data.call_type, data.application_id, data.connect_time, now() from source_table where type = 'xxx'发布于 2021-10-25 19:15:35
有几件事情会导致模式匹配而不产生结果:
这种特殊的模式循环没有退出条件。这种模式不允许模式匹配引擎的内部状态被清除,这将导致问题。
如果您直接使用Flink,我会告诉您尝试添加until(condition)或within(time)来限制可能匹配的数量。
使用MATCH_RECOGNIZE,查看是否可以向模式添加一个不同的终止元素。
更新:由于在修改模式后仍然没有得到任何结果,所以您应该确定水印是否是问题的根源。CEP依赖于按时间对输入流进行排序,这取决于水印--但前提是您使用的是事件时间。
测试这一点的最简单方法是切换到使用处理时间:
create table agent_action_detail
(
agent_id String,
...
row_time AS PROCTIME()
)
with (...)如果这样做有效,那么要么是时间戳,要么是水印。例如,如果所有事件都延迟了,则不会得到任何结果。在您的例子中,我想知道row_time列中有任何数据。
如果这没有揭示问题,请分享一个最小的可重复的例子,包括观察问题所需的数据。
https://stackoverflow.com/questions/69711674
复制相似问题