我想知道是否有一种方法可以设置RabbitMQ或Redis来处理芹菜,这样当我发送任务到队列时,它不会进入任务列表,而是进入基于任务有效负载的一组任务中,以避免重复。
下面是我对更多上下文的设置: Python + Celery。我已经尝试了RabbitMQ作为后端,现在我使用Redis作为后端,因为我不需要100%的可靠性,更容易使用,小内存占用等。
我大约有1000个ids需要重复工作。我的数据管道的阶段1由调度器触发,它输出阶段2的任务。任务只包含需要完成工作的id,实际数据存储在数据库中。我可以运行阶段1和阶段2任务的任何组合或顺序,而不会造成损害。
如果阶段2没有足够的处理能力来处理阶段1输出的大量任务,我的任务队列就会越来越大。如果任务队列使用集合而不是列表作为底层数据结构,则不必如此。
是否有现成的解决方案可以将列表切换到集作为分布式任务队列?芹菜能做到这一点吗?我最近看到Redis刚刚发布了一个队列系统的alpha版本,所以现在还没有准备好投入生产使用。
我应该以不同的方式构建我的管道吗?
发布于 2015-05-21 18:32:58
您可以使用外部数据结构来存储和监控您的芹菜队列的当前状态。1.以redis key-value为例。每当你将一个任务推入到芹菜中时,你在redis中将一个带有'id‘字段的键标记为true。
这种方法确保了在芹菜队列中每个任务id只有一个实例在运行。你也可以通过在redis键上使用INCR和DECR命令来增强它,当任务被推送和after_return时,每个id只允许N个任务。
发布于 2015-05-17 09:37:01
您在阶段2中的任务是否可以检查工作是否已经完成,如果已经完成,则不再进行该工作?这样,即使你的任务列表会增加,你需要做的工作量也不会增加。
我还没有遇到集合/列表的解决方案,我认为有很多其他方法可以绕过这个问题。
发布于 2015-05-19 20:20:43
在Redis中为你的作业队列使用一个SortedSet。它确实是一个集合,所以如果你将完全相同的数据放入其中,它不会添加新的值(它绝对需要是完全相同的数据,你不能覆盖Redis的SortedSet中使用的散列函数)。
你需要一个分数来使用SortedSet,你可以使用时间戳(值为双精度,例如使用unixtime ),这将允许你获得最新的项目/最旧的项目,如果你想要的话。ZRANGEBYSCORE可能是您要查找的命令。http://redis.io/commands/zrangebyscore
此外,如果您需要额外的行为,您可以将所有内容包装在Lua脚本中,以实现原子行为和自定义驱逐策略(如果需要)。例如,调用获取任务的"get“脚本,并自动将其从队列中移除,或者在背压过大的情况下清除数据等。
https://stackoverflow.com/questions/30278054
复制相似问题