We are trying to run a Flink SQL query that deduplicates a stream and then windows and aggregates the deduplicated result, but query planning fails with the following error:
org.apache.flink.table.api.TableException: StreamPhysicalWindowAggregate doesn't support consuming update and delete changes which is produced by node Deduplicate(keep=[FirstRow], key=[order_id], order=[ROWTIME])
We managed to reproduce the problem with a simple example query:
CREATE TABLE Orders (
  order_id STRING,
  user_id STRING,
  product STRING,
  num BIGINT,
  user_action_time TIMESTAMP(3),
  WATERMARK FOR user_action_time AS user_action_time - INTERVAL '5' SECOND
) WITH ('connector' = 'datagen');
WITH Deduplicated AS (
  SELECT
    order_id,
    user_id,
    product,
    num,
    user_action_time
  FROM
    (
      SELECT
        *,
        ROW_NUMBER() OVER (
          PARTITION BY order_id
          ORDER BY
            user_action_time ASC
        ) AS row_num
      FROM
        Orders
    )
  WHERE
    row_num = 1
)
SELECT
  user_id,
  SUM(num) AS num_sum
FROM
  TABLE(
    TUMBLE(
      TABLE Deduplicated,
      DESCRIPTOR(user_action_time),
      INTERVAL '5' MINUTES
    )
  )
GROUP BY
  user_id,
  window_start,
  window_end;
The same query runs successfully if PROCTIME is used instead of ROWTIME.
We are using Flink 1.15.0.
Is this expected behavior?
Posted on 2022-08-23 20:14:23
Can you try this? I am not sure what the problem is.
WITH Deduplicated AS (
  SELECT
    order_id,
    user_id,
    product,
    num,
    user_action_time
  FROM
    (
      SELECT
        *,
        ROW_NUMBER() OVER (
          PARTITION BY order_id
          ORDER BY
            user_action_time ASC
        ) AS row_num
      FROM
        Orders
    )
  WHERE
    row_num = 1
)
SELECT
  user_id,
  SUM(num) AS num_sum
FROM Deduplicated
GROUP BY user_id, TUMBLE(user_action_time, INTERVAL '5' MINUTES);
Posted on 2022-08-31 08:40:20
This is currently not possible. You can track https://issues.apache.org/jira/browse/FLINK-27539 (it is not yet clear whether this will be treated as a Flink bug or as a new feature).
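Until that ticket is resolved, one direction that may be worth trying is window deduplication, which the Flink documentation describes as composable with other windowing TVF operations such as window aggregation. The following is an untested sketch, not a confirmed fix: it reuses the Orders table from the question, and it is an assumption that the 1.15.0 planner accepts this exact combination. It also changes the semantics, because duplicates are only removed within each 5-minute window rather than across the whole stream.

```sql
-- Sketch only (assumption: the planner accepts window deduplication
-- followed by window aggregation on the same window).
-- Keeps the first row per order_id inside each 5-minute tumbling window.
-- Duplicates of an order_id that land in different windows are NOT removed.
SELECT
  user_id,
  SUM(num) AS num_sum
FROM (
  SELECT
    user_id,
    num,
    window_start,
    window_end,
    ROW_NUMBER() OVER (
      PARTITION BY window_start, window_end, order_id
      ORDER BY user_action_time ASC
    ) AS row_num
  FROM TABLE(
    TUMBLE(TABLE Orders, DESCRIPTOR(user_action_time), INTERVAL '5' MINUTES)
  )
)
WHERE row_num = 1
GROUP BY
  user_id,
  window_start,
  window_end;
```

If duplicates can arrive more than one window apart, this does not reproduce the original query's result, so it only fits cases where retries or replays are known to stay within a single window.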
https://stackoverflow.com/questions/73461147