首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >可持续部署的芹菜

可持续部署的芹菜
EN

Stack Overflow用户
提问于 2018-10-03 14:40:43
回答 1查看 556关注 0票数 0

我有一个服务,它公开一个API,然后提供任务,它是用Falcon (API)和Celery (任务管理)实现的。

具体地说,我的工作人员需要很长时间才能加载,他们的代码如下所示

代码语言:javascript
复制
class HeavyOp(celery.Task):
    def __init__(self):
        self._asset = get_heavy_asset()  # <-- takes long time

    @property
    def asset(self):
        return self._asset

@app.task(base=HeavyOp)
def my_task(data):

    return my_task.asset.do_something(data)

实际发生的情况是,在__init__函数中,一些对象被从磁盘读取并在内存中保留,直到工作进程存在。

有时,我想更新那个对象。

有没有办法在不停机的情况下重新加载工人?因为这一切都是在API之后进行的,所以我不希望有那几分钟的时间作为停机时间来加载重对象。

我们可以假设主机具有多个核心,但解决方案必须是单个主机解决方案。

EN

回答 1

Stack Overflow用户

发布于 2018-10-15 19:33:45

我不认为你需要一个自定义的任务基类。您要实现的是单个实例资产类,它在worker初始化后加载,您可以从任务重新加载。

这种方法是可行的:

代码语言:javascript
复制
# worker.py
import os
import sys
import time

from celery import Celery
from celery.signals import worker_ready


app = Celery(include=('tasks',))

class Asset:

    def __init__(self):
        self.time = time.time()



class AssetLoader:
    __shared_state = {}

    def __init__(self):
        self.__dict__ = self.__shared_state
        if '_value' not in self.__dict__:
            self.get_heavy_asset()

    def get_heavy_asset(self):
        self._value = Asset()

    @property
    def value(self):
        return self._value


@worker_ready.connect
def after_worker_ready(sender, **kwargs):
    AssetLoader()

在这里,我将AssetLoader设置为Borg类,但是您可以选择任何其他模式/策略来共享单个Asset实例。为了便于说明,我只是在执行get_heavy_asset时捕获时间戳。

代码语言:javascript
复制
# tasks.py
from worker import app, AssetLoader


@app.task(bind=True)
def load(self):
    AssetLoader().get_heavy_asset()
    return AssetLoader().value.time

@app.task(bind=True)
def my_task(self):
    return AssetLoader().value.time

请记住,Asset是按工作进程共享的,而不是跨工作进程共享的。如果您使用concurrency=1运行,这不会有什么不同,但对于其他任何情况,它都会有所不同。但从我在您的用例中收集的信息来看,无论哪种方式都应该没问题。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/52621100

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档