我有一个服务,它公开一个API,然后提供任务,它是用Falcon (API)和Celery (任务管理)实现的。
具体地说,我的工作人员需要很长时间才能加载,他们的代码如下所示
class HeavyOp(celery.Task):
def __init__(self):
self._asset = get_heavy_asset() # <-- takes long time
@property
def asset(self):
return self._asset
@app.task(base=HeavyOp)
def my_task(data):
return my_task.asset.do_something(data)实际发生的情况是,在__init__函数中,一些对象被从磁盘读取并在内存中保留,直到工作进程存在。
有时,我想更新那个对象。
有没有办法在不停机的情况下重新加载工人?因为这一切都是在API之后进行的,所以我不希望有那几分钟的时间作为停机时间来加载重对象。
我们可以假设主机具有多个核心,但解决方案必须是单个主机解决方案。
发布于 2018-10-15 19:33:45
我不认为你需要一个自定义的任务基类。您要实现的是单个实例资产类,它在worker初始化后加载,您可以从任务重新加载。
这种方法是可行的:
# worker.py
import os
import sys
import time
from celery import Celery
from celery.signals import worker_ready
app = Celery(include=('tasks',))
class Asset:
def __init__(self):
self.time = time.time()
class AssetLoader:
__shared_state = {}
def __init__(self):
self.__dict__ = self.__shared_state
if '_value' not in self.__dict__:
self.get_heavy_asset()
def get_heavy_asset(self):
self._value = Asset()
@property
def value(self):
return self._value
@worker_ready.connect
def after_worker_ready(sender, **kwargs):
AssetLoader()在这里,我将AssetLoader设置为Borg类,但是您可以选择任何其他模式/策略来共享单个Asset实例。为了便于说明,我只是在执行get_heavy_asset时捕获时间戳。
# tasks.py
from worker import app, AssetLoader
@app.task(bind=True)
def load(self):
AssetLoader().get_heavy_asset()
return AssetLoader().value.time
@app.task(bind=True)
def my_task(self):
return AssetLoader().value.time请记住,Asset是按工作进程共享的,而不是跨工作进程共享的。如果您使用concurrency=1运行,这不会有什么不同,但对于其他任何情况,它都会有所不同。但从我在您的用例中收集的信息来看,无论哪种方式都应该没问题。
https://stackoverflow.com/questions/52621100
复制相似问题