我正在做一个项目,目标是运行一个守护进程,该守护进程将任务发送到芹菜队列,Redis用作代理。每个任务必须一次处理一次(不允许并发)。
为此,我在我的守护进程中实现了以下代码,该守护进程充当Redis的锁:
while True:
for foo in bar:
if not self.redis_client.exists(foo.name):
# Send the task to the Celery queue
task = celery_app.send_task('buzz', context={'name': foo.name})
redis_client.send(foo.name, task.id)
time.sleep(10)一旦任务完成或失败,锁就会被任务本身释放。
出于某些我不理解的原因,该任务有时由两个工作进程同时运行:
[2018-04-11 15:23:45,705: INFO/ForkPoolWorker-1] Task has been executed in 101.43s for foo
[2018-04-11 15:23:45,881: INFO/ForkPoolWorker-4] Task has been executed in 114.66s for foo这并不经常发生,但我完全不希望它发生。该如何解释这种行为呢?会不会和Redis写key/value对的开销时间有关?
作为附加信息,我还有一个在同一服务器上运行的Flower实例。
发布于 2018-04-12 03:44:03
这里有很多缺少的细节,但我将尝试帮助您:由于您的要求-没有并发-我猜您只有一个芹菜工作程序在运行。在运行此worker时,您可以通过-c标志(或--concurrency)指定并发级别-确保将其设置为1,这样一次只有一个worker实例处于活动状态。参考here
例如:celery -A proj worker --loglevel=INFO --concurrency=1 -n worker1@%h
您应该知道的另一件事是worker_prefetch_multiplier,它默认情况下一次预取4条消息。您可能也想将其更改为1(我猜您没有描述完整的场景)。参考here
最后一件事,关于你的redis锁,考虑使用SETNX (如果不存在就设置)- more info - here
祝好运!
https://stackoverflow.com/questions/49779372
复制相似问题