首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >什么时候使用Kafka事务性API?

什么时候使用Kafka事务性API?
EN

Stack Overflow用户
提问于 2019-10-04 15:29:27
回答 1查看 3.2K关注 0票数 4

我试图理解Kafka的事务性API。这个链接将原子读-进程-写入周期定义为:

首先,让我们考虑原子读写周期意味着什么。简而言之,这意味着如果应用程序在某个主题分区tp0的偏移量X处使用消息A,并在对消息A进行一些处理后将消息B写入主题分区tp1 (B= F(A) ),那么读写周期只有在消息A和B被认为成功地消费和一起发布时才是原子的,或者根本没有。

它还说:

使用为至少一次交付语义配置的香草Kafka生产者和消费者,流处理应用程序可能会以以下方式完全丢失一次处理语义:

  1. 由于内部重试,producer.send()可能导致消息B重复写入。这是由幂等的生产者,而不是重点关注的其余这篇文章。
  2. 我们可能会重新处理输入消息A,导致重复的B消息被写入输出,违反了一次处理语义。如果流处理应用程序在写入B之后但在标记A为消耗之前崩溃,则可能发生再处理。因此,当它恢复时,它将再次消耗A,然后再次写入B,从而造成重复。
  3. 最后,在分布式环境中,应用程序将崩溃,或者--更糟的是!-temporarily将失去与系统其他部分的连接。通常,新的实例会自动开始,以取代那些被认为丢失的实例。通过这个过程,我们可能会有多个实例处理相同的输入主题并写入相同的输出主题,从而导致重复的输出,并且违反了一次处理语义。我们称之为“僵尸实例”的问题。

我们在Kafka中设计了事务API来解决第二个和第三个问题。事务允许在读-进程-写周期中精确地进行一次处理,方法是使这些周期具有原子性,并方便僵尸围栏。

怀疑:

  1. 上面的第2点和第3点描述了何时可以发生消息复制,并使用事务性API进行处理。在任何情况下,事务性API也有助于避免消息丢失吗?
  2. 大多数在线(例如,这里这里)的Kafka事务性API示例涉及: while (true) { ConsumerRecords records = consumer.poll(Long.MAX_VALUE);producer.beginTransaction();for (ConsumerRecord record : records) producer.send(“outputTopic”,record);producerRecord组);producer.commitTransaction();} 这基本上是读-进程-写循环。那么,事务性API只在读-进程-写循环中有用吗?
  3. 文章给出了非读进程写入场景中事务性API的示例: producer.initTransactions();尝试{ producer.beginTransaction();producer.send(record1);producer.send(record2);producer.commitTransaction();} catch(ProducerFencedException e) { producer.close();} catch(KafkaException e) { producer.abortTransaction();} 上面写着: 这允许生产者向多个分区发送一批消息,这样,批处理中的所有消息最终对任何使用者都是可见的,或者对消费者是不可见的。

此示例是否正确,并展示了与读-进程-写循环不同的使用事务性API的另一种方法?(请注意,它也没有将偏移提交到事务。)

  1. 在我的应用程序中,我只需使用kafka的消息,进行处理并将它们记录到数据库中。那是我的全部管道。 所以,我想这不是读-过程-写周期.Kafka事务性API对我的场景有什么用处吗? 另外,我需要确保每条消息都被正确地处理一次。我想在生产者中设置idempotent=true就足够了,我不需要事务性API,对吗? 我可能运行多个管道实例,但我不会将处理输出写入Kafka。所以我想这永远不会涉及僵尸(复制的制片人写信给卡夫卡)。所以,我想事务性API不会帮助我避免重复的处理场景,对吗?(为了避免重复处理,我可能必须在同一个数据库事务中将这两种偏移量与对数据库的处理输出一起持久化,并在生产者重新启动期间读取偏移量。)
EN

回答 1

Stack Overflow用户

发布于 2020-05-14 17:16:05

所以,我想这不是读-过程-写周期.Kafka事务性API对我的场景有什么用处吗?

这是一个读-过程-写,除了你是写到一个数据库,而不是卡夫卡。Kafka有自己的事务管理器,因此,在具有幂等性的事务中写入将启用一次处理,前提是您可以正确地恢复您的使用者写处理器的状态。因为DB的事务管理器与kafka的事务管理器不同步,所以您不能这样做。相反,您可以做的是确保即使kafka事务对于您的数据库不是原子的,它们最终仍然是一致的。

让我们假设您的使用者读、写到DB,然后是ack。如果DB失败了,您将不进行修改,并且可以根据偏移量恢复正常运行。如果ack失败,您将处理两次并保存到数据库两次。如果你能使这个操作成为幂等的,那么你是安全的。这意味着您的处理器必须是纯的,而DB必须执行dedupe:两次处理相同的消息总是会在DB上产生相同的结果。

另外,我需要确保每条消息都被正确地处理一次。我想在生产者中设置idempotent=true就足够了,我不需要事务性API,对吗?

假设您尊重a点的要求,在不同的存储上使用持久性处理一次,还需要在初始写入和重复之间,保存的对象没有发生其他更改。假设有一个值写成X,然后其他一些参与者将其更改为Y,然后消息被重新处理并将其更改为X。这可以避免,例如,通过使数据库表成为一个日志,类似于kafka主题。

我可能运行多个管道实例,但我不会将处理输出写入Kafka。所以我想这永远不会涉及僵尸(复制的制片人写信给卡夫卡)。所以,我想事务性API不会帮助我避免重复的处理场景,对吗?(为了避免重复处理,我可能必须在同一个数据库事务中将这两种偏移量与对数据库的处理输出一起持久化,并在生产者重新启动期间读取偏移量。)

是生产者写到您所消费的主题,可能会创建僵尸消息。那个制片人需要和卡夫卡合作,这样僵尸就会被忽视。事务性API和您的使用者将确保此生产者以原子方式编写,并且您的使用者读取提交的消息,尽管不是原子性的。如果你只需要一次,那就足够了。如果消息应该是原子编写的,那么您也需要事务。无论哪种方式,您的读-写/消费-生产处理器需要是纯的,你必须降低。您的DB也是此处理器的一部分,因为DB实际上是持久化的。

我在网上找过一点,也许这个链接对你有帮助:处理保证

你发布的链接:精确一次语义卡夫卡交易都很棒。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/58239378

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档