我的django应用程序允许用户相互发送消息,我将最近的一些消息汇集在一起,并使用芹菜和redis将它们发送到电子邮件中。
每次用户发送消息时,我都会向数据库中添加一条消息,然后触发一个异步任务来汇集该用户最近60秒的消息,并将其作为电子邮件发送。
tasks.pushMessagePool.apply_async(args = (fromUser,), countdown = 60)如果用户在接下来的60秒内发送了5条消息,那么我的假设是应该创建5个任务,但只有第一个任务发送电子邮件,而其他4个任务什么也不做。我实现了一个简单的锁定机制,以确保消息只被视为一次,并确保数据库锁定。
@shared_task
def pushMessagePool(fromUser, ignore_result=True):
lockCode = randint(0,10**9)
data.models.Messages.objects.filter(fromUser = fromUser, locked=False).update(locked=True, lockCode = lockCode)
M = data.models.Messages.objects.filter(fromUser = fromUser, lockCode = lockCode)
sendEmail(M,lockCode)使用这种设置,我仍然会偶尔(~10%)得到重复。副本将在10毫秒内触发,并且它们具有不同的lockCodes。
为什么这个锁定机制不起作用?celery指的是旧的DB快照吗?这没有任何意义。
发布于 2016-03-11 03:06:14
Djangojack,这里有类似的问题吗?但是对于SQS。我不确定它是否也适用于Redis?
在创建SQS队列时,您需要将默认的可见性超时设置为大于您期望的任务运行的最大时间。这是SQS在将消息传递给一个消费者之后,使所有其他消费者看不到的时间。我相信默认值是30秒。因此,如果任务耗时超过30秒,SQS会将相同的消息传递给另一个消费者,因为它假设第一个消费者死了,并且没有完成任务。
来自@gustavo-ambrozio在this answer上的评论。
https://stackoverflow.com/questions/35920780
复制相似问题