我有一个运行FlinkSQL的Flink作业,其设置如下:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final EnvironmentSettings settings =
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
env.setMaxParallelism(env.getParallelism() * 8);
env.getConfig().setAutoWatermarkInterval(config.autowatermarkInterval());
final TableConfig tConfig = tEnv.getConfig();
tConfig.setIdleStateRetention(Duration.ofMinutes(60));
tConfig.getConfiguration().setString("table.exec.source.idle-timeout", "180000 ms");为了在本地使用Kafka源代码测试这一点,我向Flink作业发射了几个事件。Flink UI显示它产生了一个水印。我等了3分钟,看水印是否提前了,而没有发送新事件(即空闲分区)。然而,没有水印的进展。
注意:我在本地使用了一个Kafka代理,它有三个分区。并且我的测试数据是键控的,因此被发送到同一个分区。但是,即使其他分区空闲,如果我等待3分钟,我也不会看到水印提前。
我们正在运行Flink 1.12.1。
更新:我在我的Flink SQL作业中看到异常:不知道版本是否不匹配。
2021-10-26 16:38:14
java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest$PartitionData
at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$null$0(OffsetsForLeaderEpochClient.java:52)
at java.base/java.util.Optional.ifPresent(Unknown Source)
at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$prepareRequest$1(OffsetsForLeaderEpochClient.java:51)
at java.base/java.util.HashMap.forEach(Unknown Source)
at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:51)
at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:37)
at org.apache.kafka.clients.consumer.internals.AsyncClient.sendAsyncRequest(AsyncClient.java:37)
at org.apache.kafka.clients.consumer.internals.Fetcher.lambda$validateOffsetsAsync$5(Fetcher.java:798)
at java.base/java.util.HashMap.forEach(Unknown Source)
at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsAsync(Fetcher.java:774)
at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsIfNeeded(Fetcher.java:498)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2328)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)发布于 2021-10-31 06:32:02
问题是,此设置在Flink 1.12.0或1.12.1中不起作用。我不得不升级到Flink 1.13.2,这个设置是按预期进行的。
唯一的例外是一只红鲱鱼,并不总是可以复制。
https://stackoverflow.com/questions/69729366
复制相似问题