总的来说,我对春天还有点陌生--卡夫卡/卡夫卡。我的问题相当简短。我有一个只使用使用者的应用程序,它不断地从Kafka读取消息,处理消息,并使用Ack侦听器手动确认消息。我有一个上游生产者专用应用程序的依赖关系,它们负责向Kafka主题发送消息,以便我消费。我们最近实现了跨生产者和消费者的事务,但我想了解更多关于故障点的信息,以及如何处理那些回滚的事务,这样它们就不会丢失?我已经读到,对于kafka容器工厂上的事务,最好使用AfterRollbackProcessor而不是SeekToCurrentErrorHandler,并将StatefulRetry设置为true。我使用事务的原因是为了在更新的版本中实现一次正确的Kafka语义,因为我们处理大量的数据库持久化,并且由于DB约束而无法承受重复事务。我想知道我的@KafkaListener是否必须使用@Transactional进行注释,因为我在声明不应该这样做之前读过一篇文章,但是其他的帖子可能是这样的,这就是我不确定的原因。我见过很多关于生产者和消费者应用程序的问题,但我还没有看到一个关于不同角色的单独应用程序(即使最终可能是相同的)。简而言之,我只想知道与Kafka合并事务时的最佳实践是什么,以及在这种情况下如何处理失败。
发布于 2020-01-10 18:59:46
Kafka事务对于只使用消费者的应用程序来说是不必要的开销。事务只有在生成记录时才有用。
我使用的
事务正是为了实现--在其更新的版本中使用卡夫卡语义,因为我们处理了大量的数据库持久化问题,并且由于DB约束而无法承受重复事务。
当涉及到其他技术时,没有对“精确一次”的保证。一次只适用于
read->process->write读写是Kafka的场景。这是一个常见的误解。
而且,即使使用kafka只读/进程/写,“精确一次”的语义也只适用于整个shebang。也就是说,只有当写入成功时才提交读的偏移量。
process步骤将至少获得一次语义,因此每当您在流程步骤的其他地方编写时,都需要去复制逻辑,而不管是否有一个写入步骤和(如果有编写),您使用的事务恰好是其中的一次。
对于从Kafka读取和写入DB而不写入Kafka的情况,侦听器上的@Transactional是正确的方法(使用dup逻辑以避免重复)。
对于只需要一次Kafka语义(读/处理/写)但在流程步骤中也要写入DB的情况,您可以在侦听器容器中使用ChainedKafkaTransactionManager,以便DB事务与Kafka事务同步(但是对于DB提交成功但Kafka事务失败的情况,仍然有一个小窗口)。所以你仍然需要去dup逻辑,即使那样。在这种情况下,您不需要想要侦听器上的@Transactional。
编辑
Producer-only有点不同;假设您想在一个事务中发布10条记录,您希望它们都在(提交)或退出(回滚)中。那么,您必须使用事务。
事务中生成的记录的使用者应该使用isolation.level=read_committed,这样他们就不会看到未提交的写操作(默认为read_uncommitted)。
如果您一次只发布单个记录,并且不涉及其他事务资源,那么如果只涉及Kafka,那么使用事务就没有意义了。
但是,如果您正在从DB或JMS等读取数据并写入Kafka,您可能希望同步DB和Kafka事务,但重复的概率仍然不是零;如何处理这一点取决于您提交事务的顺序。
通常,去复制依赖于应用程序;通常使用应用程序数据中的一些键,例如,SQL INSERT语句是以DB中不存在的键为条件的。
Kafka为每个记录提供了一个方便的唯一键,结合了主题/分区/偏移量。您可以将这些数据与数据一起存储在DB中,以防止重复。
EDIT2
当不使用事务时通常使用SeekToCurrentErrorHandler (STCEH);当侦听器抛出异常时,错误处理程序会重置偏移量,以便在下一次轮询中重新获取记录。经过多次尝试后,我们放弃并调用一个“恢复器”,例如DeadLetterPublishingRecoverer将失败的记录写入另一个主题。
但是,它仍然可以与事务一起使用。
错误处理程序在事务的范围内调用(在回滚之前的),因此,如果它抛出一个异常(除非恢复程序“消耗”失败),事务仍然会回滚。如果恢复成功,事务将提交。
AfterRollbackProcessor (ARP)是在增加恢复能力之前添加到STCEH中的。它在本质上与STCEH完全相同,但它在事务范围的之外运行(在之后运行回滚)。
如果STCEH已经执行了查找,那么配置这两种配置不会对任何事情产生影响,因为如果STCEH已经执行了查找,那么ARP就什么都不做了。
我仍然更喜欢在事务和STCEH中使用ARP,如果只是为日志消息获取适当的日志类别的话。也许还有其他我现在想不起来的原因。
注意,现在STCEH和ARP都支持重试和回退,根本不需要配置侦听器级有状态重试。如果您想要使用内存中的重试而不导致对代理的往返重取相同的记录,无状态重试可能仍然是有用的。
https://stackoverflow.com/questions/59687384
复制相似问题