首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >使用Celery worker与SQLAlchemy DB进行交互,包括根据请求了解用户

使用Celery worker与SQLAlchemy DB进行交互,包括根据请求了解用户
EN

Stack Overflow用户
提问于 2019-02-23 07:36:57
回答 3查看 2.8K关注 0票数 3

我在这方面做了大量的研究,包括尝试像this这样的答案。芹菜似乎无法访问我的Flask应用程序的上下文。

我很清楚我的芹菜对象,什么将装饰我的任务,必须有权访问我的Flask应用程序的上下文。我相信它是应该的,因为我遵循this指南创建了我的芹菜对象。我不确定是不是因为我使用的是Flask-HTTPAuth,所以造成了混淆。

以下是我所拥有的一些内容。

代码语言:javascript
复制
def make_celery(app):
    celery = Celery(app.import_name, backend=app.config["CELERY_RESULT_BACKEND"], broker=app.config["CELERY_BROKER_URL"])
    celery.conf.update(app.config)
    TaskBase = celery.Task
    class ContextTask(TaskBase):
        abstract = True
        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)
    celery.Task = ContextTask
    return celery

app = Flask(__name__)
auth = HTTPBasicAuth()
app.config["SQLALCHEMY_DATABASE_URI"] = "sqlite:///flask_app.db"
app.config["CELERY_BROKER_URL"] = "redis://localhost:6379"
app.config["CELERY_RESULT_BACKEND"] = "redis://localhost:6379"
celery = make_celery(app)
db = SQLAlchemy(app)

@celery.task(bind=True, name="flask_app.item_loop")
def loop(self):
    items = g.user.items
    for item in items:
        print(item)

不过,使用Flask运行此任务是行不通的。我尝试通过点击服务器来启动这个功能(在授权的时候!)。

代码语言:javascript
复制
@app.route("/item_loop")
@auth.login_required
def item_loop():
    result = loop.delay()
    return "It's running."

但是芹菜工人告诉我任务raised unexpected: AttributeError("'_AppCtxGlobals' object has no attribute 'user'",),我相信这意味着,如上所述,我的芹菜对象没有应用程序上下文,即使我使用了推荐的工厂模式。

EN

回答 3

Stack Overflow用户

回答已采纳

发布于 2019-02-24 00:41:15

虽然Dave和Greg的答案中的建议是有效的,但他们没有强调的是您对在芹菜任务中使用应用程序上下文的误解。

您有一个使用Flask-HTTPAuth的Flask应用程序。您可能有一个将g.user设置为经过身份验证的用户的verify_password处理程序。这意味着当您处理请求时,您可以作为g.user访问该用户。这一切都很好。

您还拥有一个或多个Celery工作进程,它们是与Flask服务器没有直接连接的独立进程。Flask服务器和Celery工作进程之间的唯一通信是通过您正在使用的消息代理(通常是Redis或RabbitMQ)进行的。

根据您的需要,芹菜工人可能需要访问Flask应用程序。当使用将其配置存储在app.config字典中的Flask扩展时,这是非常常见的。需要这样做的两个常见扩展是Flask-SQLAlchemy和Flask-Mail。如果不访问app.config,Celery任务将无法打开到数据库的连接或发送电子邮件,因为它不知道数据库和/或电子邮件服务器的详细信息。

为了让Celery worker访问配置,公认的做法是在每个worker中创建重复的Flask应用程序。这些是辅助应用程序,它们与主Flask服务器使用的实际应用程序对象没有任何连接。它们的唯一用途是保存原始app.config字典的副本,您的任务或您的任务正在使用的任何Flask扩展都可以访问该字典。

因此,期望Flask服务器中的g.user集也可以作为Celery任务中的g.user进行访问是无效的,因为这些对象是来自不同应用程序实例的不同g对象。

如果需要在Celery任务中使用经过身份验证的用户,您应该做的是将user_id (通常是g.user.id)作为参数传递给您的任务。然后,在您的任务中,您可以使用此id从数据库加载用户。希望这能有所帮助!

票数 5
EN

Stack Overflow用户

发布于 2019-02-23 10:38:23

要从任务执行中检索用户,您可以尝试传递user对象(如果芹菜可以对其进行筛选),或者传递足够的信息以使任务可以检索User对象(例如,用户的id)。在后一种情况下,您的任务将如下所示

代码语言:javascript
复制
@celery.task(bind=True, name="flask_app.item_loop")
def loop(self, user_id):
    user = User.query.get(user_id)
    items = user.items
    for item in items:
        print(item)

您将通过以下方式启动它(假设您正在使用flask_login)

代码语言:javascript
复制
result = loop.delay(current_user.id)
票数 2
EN

Stack Overflow用户

发布于 2019-02-23 10:50:13

正如@Dave W.Smith所指出的,与依赖g检索用户相比,将用户信息作为参数传递给Celery任务可能是一种更好的方法。根据Flask documentation on app context的说法,g的生命周期就是一个请求。由于Celery任务是异步执行的,因此它将在不同的应用程序上下文中执行,而不是在请求中定义用户的上下文中执行。

票数 2
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/54836672

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档