我使用的是Spring boot.Java 8中的KafkaTemplate
我的主要目标是消费者不应该两次消费消息。
1)调用表获取100行并发送给kafka
2)假设我处理了70行(我得到了成功确认),然后Kafka宕机了(Kafka在重试机制计时内没有恢复)
那么,当我重新启动spring boot应用程序时,如何确保这70条消息不会再次发送。
一种选择是我可以在DB表message is_sent = Y or N中设置标志。
有没有其他有效的方法?
发布于 2020-04-28 00:33:06
我会使用带有Kafka Connect的JDBC source connector (取决于您当前使用的数据库),它可以正确地处理这种情况。
如果你还想写自己的生产者,Kafka常见问题解答的this section应该是有用的:
如何获取来自Kafka的一次消息?
只有一次语义有两个部分:在数据生产过程中避免重复和在数据消费过程中避免重复。
有两种方法可以在数据生产期间获得只有一次的语义:
如果你做这些事情中的一个,Kafka托管的日志将是无重复的。然而,没有重复的阅读也依赖于消费者的一些合作。如果使用者定期对其位置设置检查点,那么如果它失败并重新启动,它将从检查点位置重新启动。因此,如果数据输出和检查点不是原子写入的,那么在这里也有可能获得副本。这是您的存储系统特有的问题。例如,如果您使用的是数据库,则可以在一个事务中一起提交这些内容。LinkedIn编写的HDFS加载器Camus对Hadoop加载执行类似的操作。另一种不需要事务的替代方案是将偏移量与加载的数据一起存储,并使用主题/分区/偏移量组合进行重复数据删除。
我认为有两个改进会让这件事变得更容易:
通过可选地在服务器上集成对此的支持,
发布于 2020-04-28 00:51:24
对于Kafka,我已经看到了存储指向id的指针以跟踪您在主题中所处位置的实现,并使用某种类型的分布式存储在集群级别跟踪这一点。我在那里没有做太多的工作,所以我将尝试提供一个我们在SQS中用于dup检测的解决方案。很可能Kafka有一个比这个更好的解决方案来解决重复,只是想在那里添加,以便您也可以查看替代解决方案。
我在使用AWS SQS处理点对点消息传递用例时遇到了同样的问题,因为它提供了至少一次交付保证,而不是一次且只有一次。
我们最终使用Redis和它的分布式锁定策略来解决这个问题。我在这里写了一个https://angularthinking.blogspot.com/。
高级方法是创建一个分布式锁,以便使用适合您的用例的TTL在缓存中放置一个条目。我们使用LUA脚本来执行putIfNotExists()方法,如上面的博客所示。规模是我们关心的问题之一,通过上面的实现,我们能够每秒处理数以万计的消息,而SQS和redis的伸缩性非常好。我们必须根据吞吐量和缓存增长将TTL调整为最佳值。我们确实有24小时或更短的复制窗口的好处,所以这个决定依赖于redis是可以的。如果您有较长的窗口,其中重复可能发生在几天或几个月,redis选项可能不适合。
我们还考虑了使用DynamoDB来实现putIfNotExists(),但是redis在这个用例中表现得更好,特别是在使用LUA脚本实现原生putIfNotExists的情况下。
祝你搜索成功。
https://stackoverflow.com/questions/61463259
复制相似问题