首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在flink卡夫卡的生产者和消费者中只有一次

在flink卡夫卡的生产者和消费者中只有一次
EN

Stack Overflow用户
提问于 2022-01-07 14:00:33
回答 1查看 1.2K关注 0票数 1

我试图在Exactly-Once集成中实现Flink-Kafka语义。我的生产者模块如下所示:

代码语言:javascript
复制
val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.enableCheckpointing(1000)
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000) //Gap after which next checkpoint can be written.
    env.getCheckpointConfig.setCheckpointTimeout(4000) //Checkpoints have to complete within 4secs
    env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) //Only 1 checkpoints can be executed at a time
    env.getCheckpointConfig.enableExternalizedCheckpoints(
      ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //Checkpoints are retained if the job is cancelled explicitly
    //env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10)) //Number of restart attempts, Delay in each restart

    val myProducer = new FlinkKafkaProducer[String](
      "topic_name", // target topic
      new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // serialization schema
      getProperties(), // producer config
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE) //Producer Config 

消费模块

代码语言:javascript
复制
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    properties.setProperty("zookeeper.connect", "localhost:2181")

    val consumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), properties)

我制作了很少的唱片,并把它推给了这个制作人。这些记录的形式如下:

代码语言:javascript
复制
1  
2  
3  
4  
5  
6  
..  
..  

诸若此类。那么假设在推送这些数据的时候,生产者能够把数据推到第四记录,并且由于一些失败,所以当它重新启动并运行时,它会从第五次开始推高记录吗?我的财产够吗?

我将按照第一个用户提到的this link在使用者端添加一个属性。我是否也应该在生产者端添加Idempotent属性?

我的Flink版本是1.13.5Scala 2.11.12和我正在使用Flink Kafka connector 2.11

我认为我无法使用EXACTLY_ONCE提交事务,因为检查点不是在上述路径上编写的。附加Web的屏幕截图:

我需要为此设置任何财产吗?

EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-01-07 15:29:07

对于生产者一方,Flink Kafka使用者将在分布式检查点中记录当前偏移量,如果使用者任务失败,它将从最新的检查点重新启动,并从记录在检查点中的偏移量重新发出。例如,假设最新的检查点记录偏移了3,然后flink继续发出4,5,然后故障转移,那么Flink将继续从4发出记录。注意,这不会导致重复,因为所有操作符的状态在处理记录3之后也会回落到状态。

对于生产者一方,Flink使用两阶段提交1来实现精确一次.大致上,Flink生产者将依赖Kafka的交易来编写数据,并且只有在事务提交之后才正式提交数据。用户可以使用Semantics.EXACTLY_ONCE启用此功能。

1

2

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/70622321

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档