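I'm trying to do a windowed aggregation on a stream in Confluent Cloud, but I can't get the result I expect. The table output looks like a stream of changes rather than one final row.
I have a topic named "sessions", and I created a stream based on the "sessions" topic.
Stream script: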
CREATE OR REPLACE STREAM sessions_stream (
requestId VARCHAR,
type VARCHAR,
custId VARCHAR,
channelCode VARCHAR
) WITH (
KAFKA_TOPIC = 'sessions',
VALUE_FORMAT = 'JSON'
);
Then I created a table with a tumbling window.
Table script:
CREATE OR REPLACE TABLE agg_sessions
AS SELECT
REQUESTID REQUESTID,
LATEST_BY_OFFSET(CUSTID) CUSTID,
LATEST_BY_OFFSET(CHANNELCODE) CHANNELCODE,
COLLECT_LIST(TYPE, ',') TYPES
FROM sessions_stream
WINDOW TUMBLING (SIZE 1 MINUTES)
GROUP BY agg_sessions.REQUESTID
EMIT CHANGES;
These are the records I produced to the sessions topic:
{"requestId": "232", "type": "trial1", "custId": "1234", "channelCode": "branch1"}
{"requestId": "232", "type": "trial2", "custId": "1234", "channelCode": "branch1"}
{"requestId": "232", "type": "trial3", "custId": "1234", "channelCode": "branch1"}
{"requestId": "232", "type": "trial4", "custId": "1234", "channelCode": "branch1"}
I got the following results from agg_sessions:
{ "CUSTID": "1234", "CHANNELCODE": "branch1", "TYPES": [ "trial1," ]}
{ "CUSTID": "1234", "CHANNELCODE": "branch1", "TYPES": [ "trial1,","trial2," ]}
{ "CUSTID": "1234", "CHANNELCODE": "branch1", "TYPES": [ "trial1,","trial2,","trial3," ]}
{ "CUSTID": "1234", "CHANNELCODE": "branch1", "TYPES": [ "trial1,","trial2,","trial3,","trial4," ]}
However, I want only one record per REQUESTID in the output table/topic "agg_sessions", namely this one:
{ "CUSTID": "1234", "CHANNELCODE": "branch1", "TYPES": [ "trial1,","trial2,","trial3,","trial4," ]}
How can I do this? Can anyone help? Could you also show different approaches to the solution? Thanks for your answers.
Posted on 2022-08-12 14:52:55
CREATE OR REPLACE TABLE agg_sessions
AS SELECT
requestId,
LATEST_BY_OFFSET(custid) CUSTID,
LATEST_BY_OFFSET(channelCode) CHANNELCODE,
COLLECT_LIST(type) TYPES
FROM sessions_stream
WINDOW TUMBLING (SIZE 1 MINUTES)
GROUP BY requestId
EMIT CHANGES;
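The query above still ends in EMIT CHANGES. ksqlDB therefore writes an updated row to the output topic for every input record that lands in the window, which is the changelog shown in the question. The sketch below is a hedged alternative that emits a single row per REQUESTID per window once that window closes. It assumes your ksqlDB version (including the one on your Confluent Cloud cluster) supports EMIT FINAL on windowed aggregations. The table name agg_sessions_final and the 10-second grace period are hypothetical choices, not part of the original answer. The second statement shows another approach: a pull query against the EMIT CHANGES table, which returns the current aggregate for one key and window instead of the full stream of updates.

```sql
-- Sketch: one final row per REQUESTID per 1-minute tumbling window.
-- Assumes EMIT FINAL is supported by your ksqlDB version.
-- agg_sessions_final and the 10-second grace period are hypothetical.
CREATE TABLE agg_sessions_final
AS SELECT
  requestId,
  LATEST_BY_OFFSET(custId) CUSTID,
  LATEST_BY_OFFSET(channelCode) CHANNELCODE,
  COLLECT_LIST(type) TYPES
FROM sessions_stream
-- An explicit grace period bounds how long the window stays open for
-- late-arriving records before it is considered closed.
WINDOW TUMBLING (SIZE 1 MINUTES, GRACE PERIOD 10 SECONDS)
GROUP BY requestId
-- EMIT FINAL suppresses intermediate updates and writes only the
-- closed window's result to the output topic.
EMIT FINAL;

-- Alternative: pull query against the EMIT CHANGES table (agg_sessions)
-- to read only the current aggregate for one key, per window.
SELECT requestId, WINDOWSTART, WINDOWEND, CUSTID, CHANNELCODE, TYPES
FROM agg_sessions
WHERE requestId = '232';
```

One caveat applies to the EMIT FINAL variant. The final row is written only after stream time passes the window end plus the grace period. Stream time advances only as new records arrive. With just the four test records, the final row may therefore not appear until a later record is produced to the topic.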

https://stackoverflow.com/questions/72128703