我需要计算一天中A发生的次数,以及15分钟内B发生的次数。流可以是A1、A2、B1、B2、A3、B3、B4、B5、A4、A5、A6、A7、B6。在我的例子中,事件结果是A2,B1 A3,B3 A7,B6。当匹配发生时,我需要接收实时结果。我有点累了。我认为只有使用flink cep才能做到这一点。但是flink-sql-cep不支持聚合。它只计算已发生事件。在这种情况下,如何使用单个SQL完成此任务。
我试了两个步骤,我先用flink sql cep来匹配,然后再沉到kafka。在步骤中,我使用前kafka和使用窗口聚合。
第一步:选择引脚作为引脚,' first -step‘作为result_id,cast(order_amount as varchar)作为result_value,event_time作为result_time from stra_dtpipeline MATCH_RECOGNIZE (按引脚分区
按event_time度量排序
t1.pin作为引脚,'1‘作为order_amount,LOCALTIMESTAMP作为event_time在匹配后每匹配一行跳到间隔'30’秒内的下一行模式(t1 t2
定义
t1 as t1.act_type='100001‘,t2 as t2.act_type='100002’)第二步: select pin,'job5‘as result_id,cast(sum(1) over (PARTITION by PARTITION,'%Y%m%d') as VARCHAR)前一天的间隔'1’与当前行之间的order BY event_time行) as VARCHAR) as VARCHAR,CURRENT_TIMESTAMP as result_time from stra_dtpipeline_mid where stra_dtpipeline_mid_id=‘first-step’和DAYOFMONTH(CURRENT_DATE)=DAYOFMONTH(event_time)
我希望用一个SQL来完成这项任务。
发布于 2019-10-14 16:05:33
您可以使用子查询或视图将这两个查询组合为一个查询。
那就像这样
SELECT a, b OVER (...) ORDER BY event_time FROM (SELECT x, y MATCH_RECOGNIZE ...) WHERE ...或
CREATE VIEW pattern AS SELECT x, y MATCH_RECOGNIZE ...
SELECT ... FROM pattern WHERE ...https://stackoverflow.com/questions/58350758
复制相似问题