对于Flik Kafka SQL源定义:
CREATE TABLE PlayEvents (
`event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
`the_kafka_key` STRING,
`song_id` BIGINT NOT NULL,
`duration` BIGINT,
WATERMARK FOR `event_time` AS `event_time` - INTERVAL '1' SECONDS
) WITH (
'connector' = 'kafka',
'topic' = 'play-events',
'properties.bootstrap.servers' = 'localhost:29092',
'key.format' = 'raw',
'key.fields' = 'the_kafka_key',
'value.format' = 'avro-confluent',
'value.avro-confluent.url' = 'http://localhost:8081',
'value.fields-include' = 'EXCEPT_KEY',
'scan.startup.mode' = 'earliest-offset'
)使用Avro格式每个100ms将测试PlayEvent消息发送到play-events:
protocol `protocol` {
record PlayEvent {
long song_id;
long duration;
}
}我正在运行一个连续的Window Top-N查询:
Configuration strConf = new Configuration();
strConf.setInteger(RestOptions.PORT, 8089);
strConf.setString(RestOptions.BIND_PORT, "8088-8090");
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(strConf);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Configuration tableConfiguration = tableEnv.getConfig().getConfiguration();
tableConfiguration.setString("table.exec.source.idle-timeout", "2 min");
tableEnv.sqlQuery("" +
"SELECT window_end, song_id, play_count FROM ( " +
" SELECT *, " +
" ROW_NUMBER() OVER (PARTITION BY window_start, window_end ORDER BY play_count DESC) AS row_num " +
" FROM (" +
" SELECT window_start, window_end, song_id, COUNT(*) AS play_count " +
" FROM TABLE( " +
" TUMBLE(TABLE PlayEvents, DESCRIPTOR(event_time), INTERVAL '60' SECONDS)) " +
" GROUP BY window_start, window_end, song_id " +
" ) " +
") WHERE row_num <= 4 "
).execute().print();启动后的第一个打印输出显示的结果一致,每个窗口周期恰好有4个结果:
...
| +I | 2021-11-04 12:23:00.000 | 3 | 16 |
| +I | 2021-11-04 12:23:00.000 | 6 | 14 |
| +I | 2021-11-04 12:23:00.000 | 1 | 12 |
| +I | 2021-11-04 12:23:00.000 | 11 | 10 |
| +I | 2021-11-04 12:24:00.000 | 9 | 16 |
| +I | 2021-11-04 12:24:00.000 | 6 | 13 |
| +I | 2021-11-04 12:24:00.000 | 7 | 12 |
| +I | 2021-11-04 12:24:00.000 | 5 | 12 |不过,后续输出显示了一些部分窗口内容:
...
| +I | 2021-11-04 12:25:00.000 | 9 | 12 |
| +I | 2021-11-04 12:25:00.000 | 6 | 6 |
| +I | 2021-11-04 12:27:00.000 | 11 | 18 |
...例如,该窗口每个间隔仅包含1、2、3个元素,并且有时整个间隔被丢弃。
如果我重新启动查询,所有结果都将正确显示(例如,每个窗口恰好有4个元素。)在第一次打印时,它将继续部分响应。
我尝试了不同的配置,尝试将连续查询写到另一个Kafka主题中,然后从那里查询它。但是不一致的部分结果仍然存在。
不确定我是否遗漏了一些重要的配置,或者这是Flink的已知限制或问题?
另外,我正在使用Flink 1.14进行测试,在本地机器上运行测试。
发布于 2021-11-04 13:43:05
当执行事件时间处理的Flink作业表现出不确定行为时,这是因为处理时间在某种程度上会影响结果。在这种情况下,我的猜测是实际的无序性超过了水印中配置的1秒间隔,导致延迟事件然后被丢弃。
idle-timeout也可以在这方面发挥作用。当被标记为空闲的源重新开始传送事件时,很容易出现这样的情况:水印已经超过了来自该先前空闲源的初始事件中的时间戳。即使一秒钟足以处理通常预期的无序,在这种特殊情况下可能也不够长。
发布于 2021-11-11 13:26:47
这个问题似乎与等于机器核心数量的defaultLocalParallelism (在我的例子中是16)有关。在结果中,16个子任务派生but only one is used来处理输入数据( input Kafka主题仅使用一个分区)。
如果我显式地将parallelism设置为1,则会解决空闲超时和查询结果不一致的问题:
Configuration strConf = ...
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1, strConf);或者你可以这样设置它:
env.getConfig().setParallelism(1);附注:虽然这解决了我的演示问题,但我想知道为本地环境配置并行/最大并行的正确方法是什么。
https://stackoverflow.com/questions/69838645
复制相似问题