首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >避免重复Kafka生产者消息

避免重复Kafka生产者消息
EN

Stack Overflow用户
提问于 2020-04-28 00:10:28
回答 2查看 477关注 0票数 1

我使用的是Spring boot.Java 8中的KafkaTemplate

我的主要目标是消费者不应该两次消费消息。

1)调用表获取100行并发送给kafka

2)假设我处理了70行(我得到了成功确认),然后Kafka宕机了(Kafka在重试机制计时内没有恢复)

那么,当我重新启动spring boot应用程序时,如何确保这70条消息不会再次发送。

一种选择是我可以在DB表message is_sent = Y or N中设置标志。

有没有其他有效的方法?

EN

回答 2

Stack Overflow用户

发布于 2020-04-28 00:33:06

我会使用带有Kafka ConnectJDBC source connector (取决于您当前使用的数据库),它可以正确地处理这种情况。

如果你还想写自己的生产者,Kafka常见问题解答的this section应该是有用的:

如何获取来自Kafka的一次消息?

只有一次语义有两个部分:在数据生产过程中避免重复和在数据消费过程中避免重复。

有两种方法可以在数据生产期间获得只有一次的语义:

  1. 对每个分区使用单个编写器,每次遇到网络错误时,都会检查该分区中的最后一条消息,以查看上次写入是否成功
  2. 在消息中包含主键(UUID或其他信息)并对使用者执行重复数据删除。

如果你做这些事情中的一个,Kafka托管的日志将是无重复的。然而,没有重复的阅读也依赖于消费者的一些合作。如果使用者定期对其位置设置检查点,那么如果它失败并重新启动,它将从检查点位置重新启动。因此,如果数据输出和检查点不是原子写入的,那么在这里也有可能获得副本。这是您的存储系统特有的问题。例如,如果您使用的是数据库,则可以在一个事务中一起提交这些内容。LinkedIn编写的HDFS加载器Camus对Hadoop加载执行类似的操作。另一种不需要事务的替代方案是将偏移量与加载的数据一起存储,并使用主题/分区/偏移量组合进行重复数据删除。

我认为有两个改进会让这件事变得更容易:

通过可选地在服务器上集成对此的支持,

  1. 生产者幂等性可以自动完成,而且成本更低。现有的高级消费者
  2. 不会公开很多更细粒度的偏移量控制(例如,重置位置)。我们很快就会解决这个问题,
票数 0
EN

Stack Overflow用户

发布于 2020-04-28 00:51:24

对于Kafka,我已经看到了存储指向id的指针以跟踪您在主题中所处位置的实现,并使用某种类型的分布式存储在集群级别跟踪这一点。我在那里没有做太多的工作,所以我将尝试提供一个我们在SQS中用于dup检测的解决方案。很可能Kafka有一个比这个更好的解决方案来解决重复,只是想在那里添加,以便您也可以查看替代解决方案。

我在使用AWS SQS处理点对点消息传递用例时遇到了同样的问题,因为它提供了至少一次交付保证,而不是一次且只有一次。

我们最终使用Redis和它的分布式锁定策略来解决这个问题。我在这里写了一个https://angularthinking.blogspot.com/

高级方法是创建一个分布式锁,以便使用适合您的用例的TTL在缓存中放置一个条目。我们使用LUA脚本来执行putIfNotExists()方法,如上面的博客所示。规模是我们关心的问题之一,通过上面的实现,我们能够每秒处理数以万计的消息,而SQS和redis的伸缩性非常好。我们必须根据吞吐量和缓存增长将TTL调整为最佳值。我们确实有24小时或更短的复制窗口的好处,所以这个决定依赖于redis是可以的。如果您有较长的窗口,其中重复可能发生在几天或几个月,redis选项可能不适合。

我们还考虑了使用DynamoDB来实现putIfNotExists(),但是redis在这个用例中表现得更好,特别是在使用LUA脚本实现原生putIfNotExists的情况下。

祝你搜索成功。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/61463259

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档