我试图在Exactly-Once集成中实现Flink-Kafka语义。我的生产者模块如下所示:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
env.enableCheckpointing(1000)
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(1000) //Gap after which next checkpoint can be written.
env.getCheckpointConfig.setCheckpointTimeout(4000) //Checkpoints have to complete within 4secs
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) //Only 1 checkpoints can be executed at a time
env.getCheckpointConfig.enableExternalizedCheckpoints(
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) //Checkpoints are retained if the job is cancelled explicitly
//env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 10)) //Number of restart attempts, Delay in each restart
val myProducer = new FlinkKafkaProducer[String](
"topic_name", // target topic
new KeyedSerializationSchemaWrapper[String](new SimpleStringSchema()), // serialization schema
getProperties(), // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE) //Producer Config 消费模块
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
properties.setProperty("zookeeper.connect", "localhost:2181")
val consumer = new FlinkKafkaConsumer[String]("topic_name", new SimpleStringSchema(), properties)我制作了很少的唱片,并把它推给了这个制作人。这些记录的形式如下:
1
2
3
4
5
6
..
.. 诸若此类。那么假设在推送这些数据的时候,生产者能够把数据推到第四记录,并且由于一些失败,所以当它重新启动并运行时,它会从第五次开始推高记录吗?我的财产够吗?
我将按照第一个用户提到的this link在使用者端添加一个属性。我是否也应该在生产者端添加Idempotent属性?
我的Flink版本是1.13.5,Scala 2.11.12和我正在使用Flink Kafka connector 2.11。
我认为我无法使用EXACTLY_ONCE提交事务,因为检查点不是在上述路径上编写的。附加Web的屏幕截图:


我需要为此设置任何财产吗?
发布于 2022-01-07 15:29:07
https://stackoverflow.com/questions/70622321
复制相似问题