首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Kafka事务-应用程序停止并重新启动时跳过一个偏移量

Kafka事务-应用程序停止并重新启动时跳过一个偏移量
EN

Stack Overflow用户
提问于 2020-02-13 18:23:37
回答 1查看 112关注 0票数 0

为了公平起见,我已经在没有事务方案的普通kafka中测试了它,当我多次尝试重新运行ProducerTest时,它不会跳过偏移量。

代码语言:javascript
复制
object ProducerTest extends LazyLogging {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka.local:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("acks", "all")
    props.put("retries", "3")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("zxc", "key", "value")
    val record2 = new ProducerRecord[String, String]("zxc", "key2", "value2")
    val record3 = new ProducerRecord[String, String]("zxc", "key3", "value3")
    producer.send(record)
    producer.send(record2)
    producer.send(record3)
    Thread.sleep(3000)
  }
}

但是当我在生产者上启用事务时,当ProducerTestWithTransaction应用程序重新运行时,它将跳过一个偏移量。就像我第一次开始的时候,它的偏移量是0,1,2,然后在重新运行后,它将是4,5,6,跳过3,依此类推。

代码语言:javascript
复制
object ProducerTestWithTransaction extends LazyLogging {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "kafka.local:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("enable.idempotence", "true")
    props.put("transactional.id", "alona")
    props.put("acks", "all")
    props.put("retries", "3")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String]("wew", "key", "value")
    val record2 = new ProducerRecord[String, String]("wew", "key2", "value2")
    val record3 = new ProducerRecord[String, String]("wew", "key3", "value3")
    producer.initTransactions()
    try {
      producer.beginTransaction()
      producer.send(record)
      producer.send(record2)
      producer.send(record3)
      producer.commitTransaction()
    } catch {
      case e: ProducerFencedException => producer.close()
      case e: Exception => producer.abortTransaction();
    }
  }
}

请告诉我为什么会发生这种情况?是否有解决此问题的方法来避免跳过偏移。谢谢!注意:我使用的是kafka-clients2.4.0和wurstmeister/kafka:2.12-2.3.0

EN

回答 1

Stack Overflow用户

发布于 2020-02-13 21:35:47

事务插入标记事件,AFAIK,这可能就是您所看到的

但是,在这两个代码中,都应该在运行时关闭钩子中调用producer.flush()

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/60205547

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档