首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Flink SQL不遵守“table.exec.source.空闲-超时”设置

Flink SQL不遵守“table.exec.source.空闲-超时”设置
EN

Stack Overflow用户
提问于 2021-10-26 20:07:28
回答 1查看 352关注 0票数 0

我有一个运行FlinkSQL的Flink作业,其设置如下:

代码语言:javascript
复制
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final EnvironmentSettings settings =
        EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

env.setMaxParallelism(env.getParallelism() * 8);
env.getConfig().setAutoWatermarkInterval(config.autowatermarkInterval());

final TableConfig tConfig = tEnv.getConfig();
tConfig.setIdleStateRetention(Duration.ofMinutes(60));

tConfig.getConfiguration().setString("table.exec.source.idle-timeout", "180000 ms");

为了在本地使用Kafka源代码测试这一点,我向Flink作业发射了几个事件。Flink UI显示它产生了一个水印。我等了3分钟,看水印是否提前了,而没有发送新事件(即空闲分区)。然而,没有水印的进展。

注意:我在本地使用了一个Kafka代理,它有三个分区。并且我的测试数据是键控的,因此被发送到同一个分区。但是,即使其他分区空闲,如果我等待3分钟,我也不会看到水印提前。

  1. 在工作UI中的任何位置,我可以看到我设置的3分钟的值是否真的被选中了?我是否使用了正确的单位(秒对毫秒)

  1. 还有什么我可以检查的来测试这个设置?

我们正在运行Flink 1.12.1。

更新:我在我的Flink SQL作业中看到异常:不知道版本是否不匹配。

代码语言:javascript
复制
2021-10-26 16:38:14
java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/OffsetsForLeaderEpochRequest$PartitionData
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$null$0(OffsetsForLeaderEpochClient.java:52)
    at java.base/java.util.Optional.ifPresent(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.lambda$prepareRequest$1(OffsetsForLeaderEpochClient.java:51)
    at java.base/java.util.HashMap.forEach(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:51)
    at org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient.prepareRequest(OffsetsForLeaderEpochClient.java:37)
    at org.apache.kafka.clients.consumer.internals.AsyncClient.sendAsyncRequest(AsyncClient.java:37)
    at org.apache.kafka.clients.consumer.internals.Fetcher.lambda$validateOffsetsAsync$5(Fetcher.java:798)
    at java.base/java.util.HashMap.forEach(Unknown Source)
    at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsAsync(Fetcher.java:774)
    at org.apache.kafka.clients.consumer.internals.Fetcher.validateOffsetsIfNeeded(Fetcher.java:498)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateFetchPositions(KafkaConsumer.java:2328)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1271)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1235)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2021-10-31 06:32:02

问题是,此设置在Flink 1.12.0或1.12.1中不起作用。我不得不升级到Flink 1.13.2,这个设置是按预期进行的。

唯一的例外是一只红鲱鱼,并不总是可以复制。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/69729366

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档