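Here is my requirement: there are three Kafka topics, topic 1 (device), topic 2 (consumer), and topic 3 (order). We want to use Flink SQL to count the orders per device over the past 12 hours. This is what I have done: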
// java code
String creConsumer = "CREATE TABLE " + consumerTable + " (" +
" deviceId STRING" +
",deviceFingerprintHash STRING" +
",consumer ROW(consumerUuid STRING)" +
",eventInfo ROW<eventTime BIGINT>" +
",id BIGINT" +
",ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime/1000, 'yyyy-MM-dd HH:mm:ss'))" +
",WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
") WITH (...)";
String createToken = "CREATE TABLE " + orderTokenTable + " (" +
"sessionId BIGINT" +
",token STRING" +
",eventInfo ROW(eventTime BIGINT)" +
",ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime/1000, 'yyyy-MM-dd HH:mm:ss'))" +
",WATERMARK FOR ts AS ts - INTERVAL '5' SECOND" +
") WITH (...)";
String createTransaction = "CREATE TABLE " + orderTransactionTable + " (" +
"orderTransactionId BIGINT" +
",consumer ROW(`consumerUuid` STRING)" +
",token STRING" +
",countryCode STRING" +
",consumerTotalAmount ROW<amount STRING>" +
",status STRING" +
",eventInfo ROW<eventTime BIGINT>" +
",ts AS TO_TIMESTAMP(FROM_UNIXTIME(eventInfo.eventTime/1000, 'yyyy-MM-dd HH:mm:ss'))" +
",WATERMARK FOR ts AS withOffset(ts,1000)" +
") WITH (...)";
// java code
String createWideTable = "CREATE VIEW view_order_consumer AS " +
"SELECT " +
" otc.eventTime " +
",otc.orderTransactionId " +
",otc.token" +
",otc.consumerUuid " +
",otc.countryCode " +
",CAST(otc.amount AS DOUBLE) AS amount " +
",otc.status " +
",csc.deviceId " +
",csc.deviceFingerprintHash " +
",otc.ts " +
"FROM " +
" order_transaction_completed otc " +
"INNER JOIN order_token_added ota " +
" ON (otc.token=ota.token AND otc.ts BETWEEN ota.ts - INTERVAL '10' DAY AND ota.ts + INTERVAL '10' DAY)" +
"INNER JOIN consumer_session_created csc " +
" ON (ota.sessionId=csc.id AND csc.ts BETWEEN otc.ts - INTERVAL '10' DAY AND otc.ts) ";
// SQL
select deviceId
,HOP_START(voc.ts, INTERVAL '5' SECOND , INTERVAL '10' SECOND)
,count(1) as cnt
from consumer_session_created as voc
group by HOP(voc.ts, INTERVAL '5' SECOND , INTERVAL '10' SECOND)
,deviceId
// java
DataStream<Tuple2<Boolean, Row>> retractResultStream = tableEnvironment
.toRetractStream(table, Row.class);
retractResultStream.print();
However, I get no results at all (and no error message). After changing the SQL to:
// SQL
select * from view_order_consumer
Result:
6> (true,1628564685939,100100113280,002.qa2dtem5k6umlokop1boud4c8p77c9lhclb8i5ug0na383ed,94e44b95-223b-4479-82b9-b4f710f7f8c3,US,10.0,APPROVED,740baadd20e544e8bdcdc8d2a76cbdc9,c718225f5f1d4876ffc1ce2bb5ab3852,2021-08-10T11:04:45)
6> (true,1628564687358,100100113280,002.qa2dtem5k6umlokop1boud4c8p77c9lhclb8i5ug0na383ed,94e44b95-223b-4479-82b9-b4f710f7f8c3,US,11.0,APPROVED,740baadd20e544e8bdcdc8d2a76cbdc9,c718225f5f1d4876ffc1ce2bb5ab3852,2021-08-10T11:04:47)
6> (true,1628564688364,100100113280,002.qa2dtem5k6umlokop1boud4c8p77c9lhclb8i5ug0na383ed,94e44b95-223b-4479-82b9-b4f710f7f8c3,US,12.0,APPROVED,740baadd20e544e8bdcdc8d2a76cbdc9,c718225f5f1d4876ffc1ce2bb5ab3852,2021-08-10T11:04:48)
After some research, I found that in Flink the time attribute is dropped after a join operator.
Can anyone tell me how to get the correct data using Flink SQL?
Answered on 2021-08-10 19:05:45
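I don't know what's wrong, but here are some ideas:
FWIW, it is easier to iterate on and debug situations like this with an interactive tool, such as the SQL Client that ships with Flink or the one built into Ververica Platform.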
You can use "EXPLAIN ..." to see the plan that is generated for the query. This often provides some clues.
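As a rough illustration of the EXPLAIN idea, the helper below wraps a query string in an `EXPLAIN PLAN FOR` statement. The class and method names are hypothetical, and the Flink calls mentioned in the comments (`executeSql`, `explainSql`) are assumed to be available in the Flink version in use (they exist in recent releases, but check yours).

```java
// Hypothetical helper, not part of Flink: builds an EXPLAIN statement for a query.
final class ExplainSketch {

    /** Trims the query, drops a trailing semicolon, and prefixes EXPLAIN PLAN FOR. */
    static String explainStatement(String query) {
        String trimmed = query.trim();
        if (trimmed.endsWith(";")) {
            trimmed = trimmed.substring(0, trimmed.length() - 1).trim();
        }
        return "EXPLAIN PLAN FOR " + trimmed;
    }

    public static void main(String[] args) {
        // The windowed aggregation from the question, as a single string.
        String query =
            "SELECT deviceId, COUNT(1) AS cnt "
          + "FROM consumer_session_created "
          + "GROUP BY HOP(ts, INTERVAL '5' SECOND, INTERVAL '10' SECOND), deviceId";

        System.out.println(explainStatement(query));

        // Assumption: inside the Flink job, the plan could then be printed with
        //   tableEnvironment.executeSql(explainStatement(query)).print();
        // or obtained directly as a string with
        //   tableEnvironment.explainSql(query);
    }
}
```

In the printed plan, it is worth checking which kind of join was chosen and whether `ts` is still treated as a time attribute at the point where the window aggregation happens.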
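I don't know what WATERMARK FOR ts AS withOffset(ts,1000) does. Are you sure that works? In most cases where a Flink job or SQL query produces no results, the cause turns out to be a problem with watermarks.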
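To see why watermarks can silently suppress output, here is a small plain-Java model (not Flink code) of a bounded out-of-orderness watermark and a simplified window firing rule. The timestamps are illustrative values borrowed from the sample output, truncated to seconds the way the `ts` column is computed. The 5-second delay matches the `ts - INTERVAL '5' SECOND` definitions. The firing rule (watermark must reach the window end minus 1 ms) is an assumption about event-time window semantics, stated here in simplified form.

```java
// Simplified model of event-time watermarks; class and method names are hypothetical.
final class WatermarkSketch {

    /** Watermark = highest event time seen so far minus the allowed delay. */
    static long watermark(long[] eventTimesMillis, long delayMillis) {
        if (eventTimesMillis.length == 0) {
            return Long.MIN_VALUE; // nothing seen yet, so no progress in event time
        }
        long max = Long.MIN_VALUE;
        for (long t : eventTimesMillis) {
            max = Math.max(max, t);
        }
        return max - delayMillis;
    }

    /** Assumed rule: a window [start, end) emits once the watermark reaches end - 1 ms. */
    static boolean windowCanFire(long windowEndMillis, long watermarkMillis) {
        return watermarkMillis >= windowEndMillis - 1;
    }

    public static void main(String[] args) {
        long delay = 5_000L;
        long[] seen = {1628564685000L, 1628564687000L, 1628564688000L};

        // Earliest end of a 10 s window (sliding by 5 s) that contains the first event.
        long windowEnd = 1628564690000L;

        // Watermark is 1628564683000, still before the window end.
        System.out.println(windowCanFire(windowEnd, watermark(seen, delay))); // prints false

        // A later event pushes the watermark past the window end.
        long[] withLater = {1628564685000L, 1628564687000L, 1628564688000L, 1628564695000L};
        System.out.println(windowCanFire(windowEnd, watermark(withLater, delay))); // prints true
    }
}
```

Under this model, a handful of test records followed by silence never advances the watermark far enough to close any window, which looks exactly like "no results and no error". If the intent of `withOffset(ts,1000)` was a 1-second delay, writing it as `ts - INTERVAL '1' SECOND`, in the same form as the other two tables, would be one way to rule that clause out as the cause.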
I would try breaking up the 3-way join and doing a sequence of two joins instead. That may produce a different plan, possibly a better one.
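One possible way to split the join, sketched in the same string-building style as the question: a first view joins transactions to tokens, and a second view joins that result to sessions. The view names `view_order_token` and `view_order_device` are made up for illustration, the column list is trimmed to the essentials, and the join conditions are copied from the original view. Whether the planner actually produces a better plan for this form is something to verify with EXPLAIN.

```java
// Hypothetical sketch: the 3-way join rewritten as two chained views.
final class TwoStepJoinSketch {

    /** Step 1: transactions joined to tokens; keeps sessionId for the next step. */
    static String createOrderTokenView() {
        return String.join("\n",
            "CREATE VIEW view_order_token AS",
            "SELECT otc.orderTransactionId, otc.token, otc.status, ota.sessionId, otc.ts",
            "FROM order_transaction_completed otc",
            "INNER JOIN order_token_added ota",
            "  ON otc.token = ota.token",
            " AND otc.ts BETWEEN ota.ts - INTERVAL '10' DAY AND ota.ts + INTERVAL '10' DAY");
    }

    /** Step 2: the intermediate view joined to sessions to pick up the device columns. */
    static String createOrderDeviceView() {
        return String.join("\n",
            "CREATE VIEW view_order_device AS",
            "SELECT vot.orderTransactionId, vot.token, vot.status,",
            "       csc.deviceId, csc.deviceFingerprintHash, vot.ts",
            "FROM view_order_token vot",
            "INNER JOIN consumer_session_created csc",
            "  ON vot.sessionId = csc.id",
            " AND csc.ts BETWEEN vot.ts - INTERVAL '10' DAY AND vot.ts");
    }

    public static void main(String[] args) {
        // Assumption: each statement would be passed to tableEnvironment.executeSql(...)
        // in the order shown, after the three source tables have been created.
        System.out.println(createOrderTokenView());
        System.out.println();
        System.out.println(createOrderDeviceView());
    }
}
```

Both steps keep a time-bounded condition on `ts`, so each can be planned as a join over a limited time range rather than an unbounded one, and `ts` is carried through both views so that a window aggregation on the final view has a timestamp column to work with.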
https://stackoverflow.com/questions/68714726