为了公平起见,我已经在没有事务方案的普通kafka中测试了它,当我多次尝试重新运行ProducerTest时,它不会跳过偏移量。
object ProducerTest extends LazyLogging {
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put("bootstrap.servers", "kafka.local:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("acks", "all")
props.put("retries", "3")
val producer = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String]("zxc", "key", "value")
val record2 = new ProducerRecord[String, String]("zxc", "key2", "value2")
val record3 = new ProducerRecord[String, String]("zxc", "key3", "value3")
producer.send(record)
producer.send(record2)
producer.send(record3)
Thread.sleep(3000)
}
}但是当我在生产者上启用事务时,当ProducerTestWithTransaction应用程序重新运行时,它将跳过一个偏移量。就像我第一次开始的时候,它的偏移量是0,1,2,然后在重新运行后,它将是4,5,6,跳过3,依此类推。
object ProducerTestWithTransaction extends LazyLogging {
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put("bootstrap.servers", "kafka.local:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("enable.idempotence", "true")
props.put("transactional.id", "alona")
props.put("acks", "all")
props.put("retries", "3")
val producer = new KafkaProducer[String, String](props)
val record = new ProducerRecord[String, String]("wew", "key", "value")
val record2 = new ProducerRecord[String, String]("wew", "key2", "value2")
val record3 = new ProducerRecord[String, String]("wew", "key3", "value3")
producer.initTransactions()
try {
producer.beginTransaction()
producer.send(record)
producer.send(record2)
producer.send(record3)
producer.commitTransaction()
} catch {
case e: ProducerFencedException => producer.close()
case e: Exception => producer.abortTransaction();
}
}
}请告诉我为什么会发生这种情况?是否有解决此问题的方法来避免跳过偏移。谢谢!注意:我使用的是kafka-clients2.4.0和wurstmeister/kafka:2.12-2.3.0
发布于 2020-02-13 21:35:47
事务插入标记事件,AFAIK,这可能就是您所看到的
但是,在这两个代码中,都应该在运行时关闭钩子中调用producer.flush()
https://stackoverflow.com/questions/60205547
复制相似问题