我们在WSGI应用程序中使用RQ。我们所做的是在运行任务的不同后端服务器中有几个不同的进程,连接到(可能)几个不同的任务服务器。为了更好地配置此设置,我们在系统中使用了一个自定义管理层,负责运行工作人员、设置任务队列等。
当作业失败时,我们希望实现重试,该重试在不断增加的延迟后重试几次,并最终完成作业或使其失败并在日志记录系统中记录错误条目。不过,我不知道应如何落实。我已经创建了一个自定义工作脚本,它允许我们将错误记录到数据库中,我第一次尝试重试的时候就是这样的:
# This handler would ideally wait some time, then requeue the job.
def worker_retry_handler(job, exc_type, exc_value, tb):
print 'Doing retry handler.'
current_retry = job.meta[attr.retry] or 2
if current_retry >= 129600:
log_error_message('Job catastrophic failure.', ...)
else:
current_retry *= 2
log_retry_notification(current_retry)
job.meta[attr.retry] = current_retry
job.save()
time.sleep(current_retry)
job.perform()
return False正如我所提到的,我们在worker文件中也有一个函数,它正确地解析了它应该连接到的服务器,并且可以发布作业。问题不一定是如何发布作业,而是如何处理异常处理程序中的作业实例。
任何帮助都将不胜感激。如果有关于更好的方法的建议或建议,那也是很棒的。谢谢!
发布于 2013-01-18 16:11:02
我认为有两个可能的问题:
发布于 2020-09-23 16:06:34
我有一个更好的解决方案
from rq import Queue, Worker
from redis import Redis
redis_conn = Redis(host=REDIS_HOST, health_check_interval=30)
queues = [
Queue(queue_name, connection=redis_conn, result_ttl=0)
for queue_name in ["Low", "Fast"]
]
worker = Worker(queues, connection=redis, exception_handlers=[retry_handler])
def retry_handler(job, exc_type, exception, traceback):
if isinstance(exception, RetryException):
sleep(RetryException.sleep_time)
job.requeue()
return False处理程序本身负责决定异常处理是否已经完成,或者是否应该转移到堆栈上的下一个处理程序。处理程序可以通过返回布尔值来指示这一点。False意味着停止处理异常,True意味着继续并切换到堆栈上的下一个异常处理程序。
对于实现者来说,重要的是要知道,默认情况下,当处理程序没有显式返回值(因此 to )时,这将被解释为True (即继续使用下一个处理程序)。
若要防止处理程序链中的下一个异常处理程序执行,请使用未通过的自定义异常处理程序,例如:
https://stackoverflow.com/questions/14389609
复制相似问题