我该如何增加卡夫卡的延迟工作呢?据我所知,它不涉及每条消息,而是每个主题。我的工作有不同的时间表,我希望它们被消耗掉。假设一个在接下来的4个小时内,另一个将是12月1日,等等。
卡夫卡是否有本土的支持这个或其他第三方的方式来实现同样的?
我正在考虑使用Redis来代替延迟队列,并在计划完成后将作业推送给kafka,但如果可能的话,我只想使用一个依赖项。
发布于 2014-11-15 17:07:01
卡夫卡没有就业的概念。它只是一个愚蠢的高性能消息排队服务。根据您的需求,您可以考虑将作业存储在支持按作业执行时间进行索引的存储中,比如某些RDBMS。然后在某些过程中,周期性地提取执行时间在一些小范围内的作业( last_check_time、current_time+lookahead_interval ),并将它们放入一个卡夫卡主题中进行最终处理。
发布于 2017-04-20 06:51:44
这里的回答有点延迟。现在,在最新的Kafka版本0.10+中,可以使用每条消息使用新的时间戳从延迟流中消费。我现在使用它是为了实现一个连续的聚合数据集,而不需要求助于外部依赖项。
这些记录是通过的,并且在第一个事件发生后60分钟内可能会有更新/删除,所以在我看到所有更新之前,我不能声明一个是“最终”。
因此,为了处理这种情况,我使用了两次创建/更新/删除的主题,第一次是实时的(或尽可能快),第二次是延迟90分钟,以确保我不会错过任何东西。在实时用户上,我在本地存储创建所需的所有更新。然后,在延迟的使用者上,当我收到一个特定的“创建”时,我将查找本地存储中的任何更新/删除,更新记录,以便它知道它的最终状态,并将它再次生成到最终的主题中。
为了确保我没有耗尽磁盘空间,我还不断地截断本地存储,以便它最多保存两个小时的更新/删除。
发布于 2019-11-06 11:19:06
不幸的是,Kafka没有能力像某些消息队列那样延迟消息的可见性。一旦消息发布,它将立即提供给所有消费者。这方面唯一的小例外是当发布发生在事务范围内,并且使用者启用了读提交隔离模式。即使如此,延迟也将是最小的。
卡夫卡将所有加工语义留给消费者自由裁量。如果需要延迟处理,则可能需要在使用者端使用持久数据存储(例如RDBMS或Redis)或另一个队列。您肯定不想用Thread.sleep()阻止生产者的唱片消费,因为这会影响您的记录投票能力,而且Kafka最终会认为您的消费者失败了。
https://stackoverflow.com/questions/26890222
复制相似问题