首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink SQL连续窗口Top-N查询的结果不一致

Flink SQL连续窗口Top-N查询的结果不一致
EN

Stack Overflow用户
提问于 2021-11-04 11:38:09
回答 2查看 45关注 0票数 0

对于Flik Kafka SQL源定义:

代码语言:javascript
复制
CREATE TABLE PlayEvents (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `the_kafka_key` STRING, 
  `song_id` BIGINT NOT NULL, 
  `duration` BIGINT, 
  WATERMARK FOR `event_time` AS `event_time` - INTERVAL '1' SECONDS 
) WITH (
  'connector' = 'kafka', 
  'topic' = 'play-events',
  'properties.bootstrap.servers' = 'localhost:29092',
  'key.format' = 'raw',
  'key.fields' = 'the_kafka_key',
  'value.format' = 'avro-confluent',
  'value.avro-confluent.url' = 'http://localhost:8081',
  'value.fields-include' = 'EXCEPT_KEY',
  'scan.startup.mode' = 'earliest-offset'
)

使用Avro格式每个100ms将测试PlayEvent消息发送到play-events

代码语言:javascript
复制
protocol `protocol` {
    record PlayEvent {
        long song_id;
        long duration;
    }
}

我正在运行一个连续的Window Top-N查询:

代码语言:javascript
复制
Configuration strConf = new Configuration();
strConf.setInteger(RestOptions.PORT, 8089);
strConf.setString(RestOptions.BIND_PORT, "8088-8090");

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(strConf);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Configuration tableConfiguration = tableEnv.getConfig().getConfiguration();
tableConfiguration.setString("table.exec.source.idle-timeout", "2 min");

tableEnv.sqlQuery("" +
    "SELECT window_end, song_id, play_count FROM ( " +
    " SELECT *, " +
    "    ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY play_count DESC) AS row_num " +
    " FROM (" +
    "    SELECT window_start, window_end, song_id, COUNT(*) AS play_count " +
    "    FROM TABLE( " +
    "      TUMBLE(TABLE PlayEvents, DESCRIPTOR(event_time), INTERVAL '60' SECONDS)) " +
    "    GROUP BY window_start, window_end, song_id " +
    " ) " +
    ") WHERE row_num <= 4 "
).execute().print();

启动后的第一个打印输出显示的结果一致,每个窗口周期恰好有4个结果:

代码语言:javascript
复制
...
| +I | 2021-11-04 12:23:00.000 |                    3 |                   16 |
| +I | 2021-11-04 12:23:00.000 |                    6 |                   14 |
| +I | 2021-11-04 12:23:00.000 |                    1 |                   12 |
| +I | 2021-11-04 12:23:00.000 |                   11 |                   10 |

| +I | 2021-11-04 12:24:00.000 |                    9 |                   16 |
| +I | 2021-11-04 12:24:00.000 |                    6 |                   13 |
| +I | 2021-11-04 12:24:00.000 |                    7 |                   12 |
| +I | 2021-11-04 12:24:00.000 |                    5 |                   12 |

不过,后续输出显示了一些部分窗口内容:

代码语言:javascript
复制
...
| +I | 2021-11-04 12:25:00.000 |                    9 |                   12 |
| +I | 2021-11-04 12:25:00.000 |                    6 |                    6 |

| +I | 2021-11-04 12:27:00.000 |                   11 |                   18 |
...

例如,该窗口每个间隔仅包含1、2、3个元素,并且有时整个间隔被丢弃。

如果我重新启动查询,所有结果都将正确显示(例如,每个窗口恰好有4个元素。)在第一次打印时,它将继续部分响应。

我尝试了不同的配置,尝试将连续查询写到另一个Kafka主题中,然后从那里查询它。但是不一致的部分结果仍然存在。

不确定我是否遗漏了一些重要的配置,或者这是Flink的已知限制或问题?

另外,我正在使用Flink 1.14进行测试,在本地机器上运行测试。

EN

回答 2

Stack Overflow用户

发布于 2021-11-04 13:43:05

当执行事件时间处理的Flink作业表现出不确定行为时,这是因为处理时间在某种程度上会影响结果。在这种情况下,我的猜测是实际的无序性超过了水印中配置的1秒间隔,导致延迟事件然后被丢弃。

idle-timeout也可以在这方面发挥作用。当被标记为空闲的源重新开始传送事件时,很容易出现这样的情况:水印已经超过了来自该先前空闲源的初始事件中的时间戳。即使一秒钟足以处理通常预期的无序,在这种特殊情况下可能也不够长。

票数 0
EN

Stack Overflow用户

发布于 2021-11-11 13:26:47

这个问题似乎与等于机器核心数量的defaultLocalParallelism (在我的例子中是16)有关。在结果中,16个子任务派生but only one is used来处理输入数据( input Kafka主题仅使用一个分区)。

如果我显式地将parallelism设置为1,则会解决空闲超时和查询结果不一致的问题:

代码语言:javascript
复制
Configuration strConf = ...
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, strConf);

或者你可以这样设置它:

代码语言:javascript
复制
env.getConfig().setParallelism(1);

附注:虽然这解决了我的演示问题,但我想知道为本地环境配置并行/最大并行的正确方法是什么。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69838645

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档