我们最近被迫用RQ代替芹菜,因为它更简单,芹菜给我们带来了太多的问题。现在,我们无法找到动态创建多个队列的方法,因为我们需要同时完成多个作业。因此,基本上,对我们其中一个路由的每个请求都应该启动一个作业,让多个用户等待一个用户的作业完成,然后我们才能继续下一个作业是没有意义的。我们定期向服务器发送请求,以获取作业的状态和一些元数据。这样我们就可以用进度条更新用户(这可能是一个很长的过程,所以为了UX,必须这样做)
我们使用的是Django和Python的rq库。我们没有使用django-rq (请让我知道使用它有什么好处)
到目前为止,我们在一个控制器中启动了一个任务,如下所示:
redis_conn = Redis()
q = Queue(connection=redis_conn)
job = django_rq.enqueue(render_task, new_render.pk, domain=domain, data=csv_data, timeout=1200)然后在我们的render_task方法中,我们根据长任务的状态将元数据添加到作业中:
current_job = get_current_job()
current_job.meta['state'] = 'PROGRESS'
current_job.meta['process_percent'] = process_percent
current_job.meta['message'] = 'YOUTUBE'
current_job.save()现在我们有了另一个端点,它获取当前任务及其元数据,并将其传递回客户端(这是通过oeriodic请求实现的)
我们如何在不阻塞其他作业的情况下并发运行作业?我们应该动态地创建队列吗?有没有办法利用Worker来实现这一点?
发布于 2016-05-12 02:05:26
据我所知,RQ没有任何设施来管理多个工人。您必须启动一个新的工作进程来定义它将使用哪个队列。其中一种方法是使用Supervisor,这对我来说效果很好。在supervisor中,您可以为给定队列和多个进程配置工作进程,使其具有并发性。例如,您可以让具有5个工作进程的“高优先级”队列和具有1个工作进程的“低优先级”队列。
发布于 2019-05-09 23:29:03
运行多个worker不仅是可能的,而且是理想的。我为start命令使用了一个bash文件来进入虚拟环境,并使用一个自定义的Worker类启动。
这里有一个主管配置,它对我来说非常适合RQ工作人员,在生产工作负载下也是如此。请注意,startretries很高,因为它在AWS上运行,并且在部署期间需要重试。
[program:rq-workers]
process_name=%(program_name)s_%(process_num)02d
command=/usr/local/bin/start_rq_worker.sh
autostart=true
autorestart=true
user=root
numprocs=5
startretries=50
stopsignal=INT
killasgroup=true
stopasgroup=true
stdout_logfile=/opt/elasticbeanstalk/tasks/taillogs.d/super_logs.conf
redirect_stderr=truestart_rq_worker.sh的内容
#!/bin/bash
date > /tmp/date
source /opt/python/run/venv/bin/activate
source /opt/python/current/env
/opt/python/run/venv/bin/python /opt/python/current/app/manage.py
rqworker --worker-class rq.SimpleWorker default发布于 2016-11-30 16:47:04
我想建议一个使用django-rq的非常简单的解决方案:
示例settings.py
...
RQ_QUEUES = {
'default': {
'HOST': os.getenv('REDIS_HOST', 'localhost'),
'PORT': 6379,
'DB': 0,
'DEFAULT_TIMEOUT': 360,
},
'low': {
'HOST': os.getenv('REDIS_HOST', 'localhost'),
'PORT': 6379,
'DB': 0,
'DEFAULT_TIMEOUT': 360,
}
}
...运行配置
运行python manage.py rqworker default low的次数(例如,每次在它自己的外壳中,或者作为它自己的Docker容器)与所需的工作进程数一样多。命令中队列的顺序决定了它们的优先级。此时,所有工作进程都在监听这两个队列。
代码中的
调用要运行的作业时,传入所需的队列:
对于高/普通优先级作业,您可以不带任何参数进行调用,作业将进入默认队列。对于低优先级,您必须在作业级别指定:
@job('low')
def my_low_priority_job():
# some code然后给my_low_priority_job.delay()打电话。
或者,在调用时确定优先级:
queue = django_rq.get_queue('low')
queue.enqueue(my_variable_priority_job)https://stackoverflow.com/questions/32598945
复制相似问题