
Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints

Stack Overflow user
Asked 2020-03-21 06:39:15
Answers 2 · Views 534 · Followers 0 · Votes 1

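I am working on a Flink application that sinks to Kafka. I created a Kafka producer with the default pool size of 5, and I enabled checkpointing with the following configuration: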

    env.enableCheckpointing(1800000); // checkpoint every 30 minutes

    // set mode to exactly-once (this is the default)
    env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

    // make sure at least 5000 ms (5 seconds) pass between checkpoints
    env.getCheckpointConfig().setMinPauseBetweenCheckpoints(5000);

    // checkpoints have to complete within one minute, or are discarded
    env.getCheckpointConfig().setCheckpointTimeout(60000);

    // allow only one checkpoint to be in progress at the same time
    env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
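
The millisecond literals in this configuration are easy to misread. A minimal sketch, using only `java.time.Duration` from the JDK, of deriving the same three values from named durations; the class and constant names are hypothetical and not part of the question's code:

```java
import java.time.Duration;

// Hypothetical helper: names the three timings from the question so the
// millisecond values passed to the checkpoint config are self-documenting.
final class CheckpointTimings {
    static final Duration INTERVAL = Duration.ofMinutes(30);
    static final Duration MIN_PAUSE = Duration.ofSeconds(5);
    static final Duration TIMEOUT = Duration.ofMinutes(1);

    public static void main(String[] args) {
        // The same long values the question passes to enableCheckpointing,
        // setMinPauseBetweenCheckpoints and setCheckpointTimeout.
        System.out.println(INTERVAL.toMillis());  // 1800000
        System.out.println(MIN_PAUSE.toMillis()); // 5000
        System.out.println(TIMEOUT.toMillis());   // 60000
    }
}
```

With constants like these, a call such as `env.enableCheckpointing(CheckpointTimings.INTERVAL.toMillis())` states its unit at the call site.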

The application sometimes crashes with the following exception. Is this a problem with the Kafka producer pool size, or with checkpointing?

2020-03-20 22:31:23,859 INFO  org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction  - FlinkKafkaProducer011 0/1 aborted recovered transaction TransactionHolder{handle=KafkaTransactionState [transactionalId=FileSplitReader -> metrics-map -> Sink: components-topic-sink-4ab008489d4c8ed0fe577883438cc1ff-1, producerId=21, epoch=3], transactionStartTime=1584742933826}
2020-03-20 22:31:23,860 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
java.lang.NullPointerException
    at org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.dispose(ContinuousFileReaderOperator.java:164)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:668)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.cleanUpInvoke(StreamTask.java:579)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:481)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
2020-03-20 22:31:23,861 INFO  org.apache.flink.runtime.taskmanager.Task                     - FileSplitReader -> metrics-map -> Sink: components-topic-sink (1/1) (92b7f3ed8f6362fe0087efd40eb94016) switched from RUNNING to FAILED.
org.apache.flink.streaming.connectors.kafka.FlinkKafka011Exception: Too many ongoing snapshots. Increase kafka producers pool size or decrease number of concurrent checkpoints.
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.createTransactionalProducer(FlinkKafkaProducer011.java:934)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:701)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.beginTransaction(FlinkKafkaProducer011.java:97)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:394)
    at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:385)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011.initializeState(FlinkKafkaProducer011.java:862)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
    at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:284)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.initializeStateAndOpen(StreamTask.java:1006)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:454)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:94)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:449)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:461)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
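
The exception message refers to a fixed-size pool of transactional producers: in exactly-once mode one producer from the pool is used per checkpoint, and starting a new transaction when none is free fails. A simplified, self-contained model of that bookkeeping is sketched here; `ProducerPoolModel` and its methods are hypothetical names for illustration, and this is not the connector's actual code:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of a fixed-size pool of transactional ids (not Flink code).
final class ProducerPoolModel {
    private final Deque<String> available = new ArrayDeque<>();

    ProducerPoolModel(int poolSize) {
        for (int i = 0; i < poolSize; i++) {
            available.add("txn-id-" + i);
        }
    }

    // Starting a transaction takes one id from the pool, or fails if empty.
    String beginTransaction() {
        String id = available.poll();
        if (id == null) {
            throw new IllegalStateException(
                "Too many ongoing snapshots. Increase kafka producers pool size "
                    + "or decrease number of concurrent checkpoints.");
        }
        return id;
    }

    // Committing or aborting a transaction returns its id to the pool.
    void finishTransaction(String id) {
        available.add(id);
    }

    int free() {
        return available.size();
    }

    public static void main(String[] args) {
        ProducerPoolModel pool = new ProducerPoolModel(5);
        for (int i = 0; i < 5; i++) {
            pool.beginTransaction(); // these transactions are never finished
        }
        try {
            pool.beginTransaction(); // sixth request finds the pool empty
        } catch (IllegalStateException e) {
            System.out.println("pool exhausted: " + e.getMessage());
        }
    }
}
```

In this model, five transactions that never finish exhaust a pool of five and the sixth request fails. Whether unfinished transactions are what exhausted the pool in the question cannot be determined from the log alone.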

2 Answers

Stack Overflow user

Answered 2020-03-21 11:43:07

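It is hard to tell without access to the environment.

It may be related to the specific code you are running. You are basically hitting this exception.

A few things:

  1. Here is a similar issue that turned out to be related to an array in the code: Interrupted while joining ioThread / Error during disposal of stream operator in flink application

  2. It sounds like you are running in Kubernetes. If you look at this, you can see that the problem may be related to a failed teardown or to a lack of connectivity between the job manager and the task managers, so you may want to check networking in your Kubernetes cluster and make sure all Flink pods can talk to each other.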
Votes: 0

Stack Overflow user

Answered 2020-03-22 00:22:47

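I suggest upgrading to the latest Flink/Kafka connector; it looks like you are running FlinkKafkaProducer011, which is designed for Kafka 0.11.

You should use FlinkKafkaProducer from the universal Kafka connector, flink-connector-kafka. As of Flink 1.9, it uses the Kafka 2.2.0 client.

For Maven, you would specify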

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.10.0</version>
</dependency>

Or, if you are using Scala 2.12, replace 2.11 with 2.12 in the artifactId.

Votes: 0
Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/60782483
