在测试烧瓶应用程序时,celery_worker夹具不起作用,因为芹菜附带的pytest夹具不能在烧瓶应用程序上下文中运行。
# tasks.py
@current_app.task(bind=True)
def some_task(name, sha):
return Release.query.filter_by(name=name, sha=sha).all()
# test_celery.py
def test_some_celery_task(celery_worker):
async_result = some_task.delay(default_appname, default_sha)
assert len(async_result.get()) == 0上面的测试只会抛出RuntimeError: No application found.并拒绝运行。
通常,在烧瓶项目中使用芹菜时,我们必须继承celery.Celery并修补__call__方法,以便实际的芹菜任务在烧瓶应用程序上下文中运行,如下所示:
def make_celery(app):
celery = Celery(app.import_name)
celery.config_from_object('citadel.config')
class EruGRPCTask(Task):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return super(EruGRPCTask, self).__call__(*args, **kwargs)
celery.Task = EruGRPCTask
celery.autodiscover_tasks(['citadel'])
return celery但是看看celery.contrib.pytest,我看不出用同样的方法处理这些固定装置,也就是修改基本的芹菜应用程序,这样任务就可以在烧瓶应用程序上下文中运行。
发布于 2018-11-09 06:52:45
run.py
from flask import Flask
from celery import Celery
celery = Celery()
def make_celery(app):
celery.conf.update(app.config)
class ContextTask(celery.Task):
def __call__(self, *args, **kwargs):
with app.app_context():
return self.run(*args, **kwargs)
celery.Task = ContextTask
return celery
@celery.task
def add(x, y):
return x + y
def create_app():
app = Flask(__name__)
# CELERY_BROKER_URL
app.config['BROKER_URL'] = 'sqla+sqlite:///celerydb.sqlite'
app.config['CELERY_RESULT_BACKEND'] = 'db+sqlite:///results.sqlite'
make_celery(app)
return app
app = create_app()test_celery.py
import pytest
from run import app as app_, add
@pytest.fixture(scope='session')
def app(request):
ctx = app_.app_context()
ctx.push()
def teardown():
ctx.pop()
request.addfinalizer(teardown)
return app_
@pytest.fixture(scope='session')
def celery_app(app):
from run import celery
# for use celery_worker fixture
from celery.contrib.testing import tasks # NOQA
return celery
def test_add(celery_app, celery_worker):
assert add.delay(1, 2).get() == 3我希望这能帮到你!
关于RESTful API的更多例子,项目构建.:https://github.com/TTWShell/hobbit-core
为什么不使用夹具celery_config,celery_app,celery_worker在celery.contrib.pytest中?见芹菜医生:芹菜试验。
因为这个实例芹菜两次,一个在run.py中,另一个在celery.contrib.pytest.celery_app中。当我们delay一个任务时,会发生错误。
我们重写了celery_app ,用于在烧瓶应用程序上下文中运行芹菜。
发布于 2017-11-30 18:57:16
我没有使用celery.contrib.pytest,但我想提出一个不错的解决方案。
首先,您需要的是将芹菜任务划分为sync和async部分。这里是sync_tasks.py的一个例子
def filtering_something(my_arg1):
# do something here
def processing_something(my_arg2):
# do something hereasync_tasks.py(或您的芹菜任务)示例:
@current_app.task(bind=True)
def async_filtering_something(my_arg1):
# just call sync code from celery task...
return filtering_something(my_arg1)
@current_app.task(bind=True)
def async_processing_something(my_arg2):
processing_something(my_arg2)
# or one more call...
# or one more call...在这种情况下,您可以为所有功能编写测试,而不依赖于Celery application。
from unittest import TestCase
class SyncTasks(TestCase):
def test_filtering_something(self):
# ....
def test_processing_something(self):
# ....有什么福利待遇?
celery app和flask app分离。worker_pool、brokers、连接池或其他方面产生问题。celery.contrib.pytest,但是您可以用100%的测试覆盖代码。mocks。希望这能有所帮助。
https://stackoverflow.com/questions/47570623
复制相似问题