
How to use window aggregation on a temporary view created by joining 3 Kafka topics with Flink SQL?

Stack Overflow user
Asked on 2021-08-09 15:30:18
1 answer · 256 views · 0 followers · 0 votes

Here is my requirement. There are three Kafka topics: topic 1 (device), topic 2 (consumer), and topic 3 (order). I want to use Flink SQL to count the orders per device over the last 12 hours. I did the following:

1. Register 3 tables corresponding to the 3 Kafka topics:

```java
// java code 
String creConsumer = "CREATE TABLE " + consumerTable + " (" +
                    " deviceId STRING" +
                    ",deviceFingerprintHash STRING" +
                    ",consumer ROW(consumerUuid STRING)" +
                    ",eventInfo ROW<eventTime BIGINT>" +
                    ",id BIGINT" +
                    ",ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime/1000, 'yyyy-MM-dd HH:mm:ss'))" +
                    ",WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH (...)";

String createToken = "CREATE TABLE " + orderTokenTable + " (" +
                    "sessionId BIGINT" +
                    ",token STRING" +
                    ",eventInfo ROW(eventTime BIGINT)" +
                    ",ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime/1000, 'yyyy-MM-dd HH:mm:ss'))" +
                    ",WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
                ") WITH (...)";

String createTransaction = "CREATE TABLE " + orderTransactionTable + " (" +
                    "orderTransactionId BIGINT" +
                    ",consumer ROW(`consumerUuid` STRING)" +
                    ",token STRING" +
                    ",countryCode STRING" +
                    ",consumerTotalAmount ROW<amount STRING>" +
                    ",status STRING" +
                    ",eventInfo ROW<eventTime BIGINT>" +
                    ",ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime/1000, 'yyyy-MM-dd HH:mm:ss'))" +
                    ",WATERMARK FOR ts AS withOffset(ts,1000)" +
                ") WITH (...)";

2. Join the three tables and create a view:

```java
 // java code
 String createWideTable = "CREATE VIEW view_order_consumer AS " +
                "SELECT " +
                    " otc.eventTime " +
                    ",otc.orderTransactionId " +
                    ",otc.token" +
                    ",otc.consumerUuid " +
                    ",otc.countryCode " +
                    ",CAST(otc.amount AS DOUBLE) AS amount " +
                    ",otc.status " +
                    ",csc.deviceId " +
                    ",csc.deviceFingerprintHash " +
                    ",otc.ts " +
                "FROM " +
                    "   order_transaction_completed otc " +
                "INNER JOIN order_token_added ota " +
                    "   ON (otc.token=ota.token AND otc.ts BETWEEN ota.ts - INTERVAL '10' DAY AND ota.ts + INTERVAL '10' DAY)" +
                "INNER JOIN consumer_session_created csc " +
                    "   ON (ota.sessionId=csc.id AND csc.ts BETWEEN otc.ts - INTERVAL '10' DAY AND otc.ts) ";

3. Aggregate with a window (a HOP window):

```sql
-- SQL
select deviceId
     ,HOP_START(voc.ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND)
     ,count(1) as cnt
from view_order_consumer as voc
group by HOP(voc.ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND)
       ,deviceId
```

4. Output the result table data:

```java
// java
DataStream<Tuple2<Boolean, Row>> retractResultStream =
        tableEnvironment.toRetractStream(table, Row.class);
retractResultStream.print();
```

However, I get no results at all (and no error message). Changing the SQL to:

```sql
-- SQL
select * from view_order_consumer
```

gives results:

```
6> (true,1628564685939,100100113280,002.qa2dtem5k6umlokop1boud4c8p77c9lhclb8i5ug0na383ed,94e44b95-223b-4479-82b9-b4f710f7f8c3,US,10.0,APPROVED,740baadd20e544e8bdcdc8d2a76cbdc9,c718225f5f1d4876ffc1ce2bb5ab3852,2021-08-10T11:04:45)
6> (true,1628564687358,100100113280,002.qa2dtem5k6umlokop1boud4c8p77c9lhclb8i5ug0na383ed,94e44b95-223b-4479-82b9-b4f710f7f8c3,US,11.0,APPROVED,740baadd20e544e8bdcdc8d2a76cbdc9,c718225f5f1d4876ffc1ce2bb5ab3852,2021-08-10T11:04:47)
6> (true,1628564688364,100100113280,002.qa2dtem5k6umlokop1boud4c8p77c9lhclb8i5ug0na383ed,94e44b95-223b-4479-82b9-b4f710f7f8c3,US,12.0,APPROVED,740baadd20e544e8bdcdc8d2a76cbdc9,c718225f5f1d4876ffc1ce2bb5ab3852,2021-08-10T11:04:48)
```

After some research, I found that in Flink the time attribute is removed after a (regular) join operator.
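One way to verify this is to print the view's schema (a minimal sketch, reusing the tableEnvironment from the snippets above): a rowtime column prints as TIMESTAMP(3) *ROWTIME*, while a plain TIMESTAMP(3) means the time attribute was dropped and event-time windows on it will not fire.

```java
// Sketch: check whether ts is still an event-time attribute after the join.
// "ts TIMESTAMP(3) *ROWTIME*" -> still a time attribute;
// "ts TIMESTAMP(3)"           -> dropped by the (regular) join.
tableEnvironment.from("view_order_consumer").printSchema();
```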

Can anyone tell me how to get the correct result with Flink SQL?


1 Answer

Stack Overflow user

Answered on 2021-08-10 19:05:45

I don't know what's wrong, but here are some ideas:

FWIW, it's easier to iterate on and debug situations like this with an interactive tool, such as the SQL Client that ships with Flink, or the one built into the Ververica Platform.

You can use EXPLAIN ... to see the plan generated for a query. That often provides some clues.
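For example, a sketch reusing the tableEnvironment and the aggregation query from step 3:

```java
// Sketch: print the optimized plan for the windowed aggregation.
// In the plan, look at how the joins were translated: an interval join
// keeps the time attribute, while a regular join drops it.
String plan = tableEnvironment.explainSql(
        "SELECT deviceId" +
        "      ,HOP_START(voc.ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND)" +
        "      ,COUNT(1) AS cnt " +
        "FROM view_order_consumer AS voc " +
        "GROUP BY HOP(voc.ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND), deviceId");
System.out.println(plan);
```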

I don't know what WATERMARK FOR ts AS withOffset(ts, 1000) does; are you sure that works? In most cases where a Flink job or SQL query produces no results, the root cause turns out to be a watermarking problem.
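For reference, the documented DDL form is WATERMARK FOR rowtime_column AS watermark_strategy_expression, typically the rowtime column minus a fixed interval, as the other two tables already do. A sketch of the transaction table with that strategy (other columns elided):

```java
// Sketch: standard bounded-out-of-orderness watermark (here 1 second),
// replacing the withOffset(ts, 1000) expression with documented syntax.
String createTransaction = "CREATE TABLE " + orderTransactionTable + " (" +
        // ... same columns as in the question ...
        " ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime/1000, 'yyyy-MM-dd HH:mm:ss'))" +
        ",WATERMARK FOR ts AS ts - INTERVAL '1' SECOND" +
        ") WITH (...)";
```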

I would try pulling the 3-way join apart into a sequence of two joins instead. That may yield a different plan, possibly a better one.
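A sketch of that decomposition, using the table names from the question (view_order_token is a made-up intermediate name):

```java
// Sketch: two 2-way joins instead of one 3-way join.
// First join order transactions to tokens ...
String createOrderToken = "CREATE VIEW view_order_token AS " +
        "SELECT otc.*, ota.sessionId " +
        "FROM order_transaction_completed otc " +
        "JOIN order_token_added ota " +
        "  ON otc.token = ota.token " +
        " AND otc.ts BETWEEN ota.ts - INTERVAL '10' DAY AND ota.ts + INTERVAL '10' DAY";

// ... then join the result to consumer sessions.
String createOrderConsumer = "CREATE VIEW view_order_consumer AS " +
        "SELECT vot.*, csc.deviceId, csc.deviceFingerprintHash " +
        "FROM view_order_token vot " +
        "JOIN consumer_session_created csc " +
        "  ON vot.sessionId = csc.id " +
        " AND csc.ts BETWEEN vot.ts - INTERVAL '10' DAY AND vot.ts";
```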

0 votes
The original page content is from Stack Overflow.
Original link:

https://stackoverflow.com/questions/68714726
