如果我在没有xdist参与的情况下运行,像这样:
pytest --disable-warnings --verbose -s test_celery_chords.py
效果很好。我看到数据库已经创建,任务正在运行,并且按预期的方式退出。
如果涉及到xdist (-n 2),如下所示:
pytest --disable-warnings --verbose -n 2 -s test_celery_chords.py
最后会得到一个挂起的进程(有时还会出现这些消息):
Destroying old test database for alias 'default'...
Chord callback '4c7664ce-89e0-475e-81a7-4973929d2256' raised: ValueError('4c7664ce-89e0-475e-81a7-4973929d2256')
Traceback (most recent call last):
File "/Users/bob/.virtualenv/testme/lib/python3.10/site-packages/celery/backends/base.py", line 1019, in on_chord_part_return
raise ValueError(gid)
ValueError: 4c7664ce-89e0-475e-81a7-4973929d2256
Chord callback '4c7664ce-89e0-475e-81a7-4973929d2256' raised: ValueError('4c7664ce-89e0-475e-81a7-4973929d2256')
Traceback (most recent call last):
File "/Users/bob/.virtualenv/testme/lib/python3.10/site-packages/celery/backends/base.py", line 1019, in on_chord_part_return
raise ValueError(gid)
ValueError: 4c7664ce-89e0-475e-81a7-4973929d2256
Chord callback '4c7664ce-89e0-475e-81a7-4973929d2256' raised: ValueError('4c7664ce-89e0-475e-81a7-4973929d2256')
Traceback (most recent call last):
File "/Users/bob/.virtualenv/testme/lib/python3.10/site-packages/celery/backends/base.py", line 1019, in on_chord_part_return
raise ValueError(gid)
ValueError: 4c7664ce-89e0-475e-81a7-4973929d2256
Chord callback '4c7664ce-89e0-475e-81a7-4973929d2256' raised: ValueError('4c7664ce-89e0-475e-81a7-4973929d2256')
Traceback (most recent call last):
File "/Users/bob/.virtualenv/testme/lib/python3.10/site-packages/celery/backends/base.py", line 1019, in on_chord_part_return
raise ValueError(gid)
ValueError: 4c7664ce-89e0-475e-81a7-4973929d2256
Chord callback '4c7664ce-89e0-475e-81a7-4973929d2256' raised: ValueError('4c7664ce-89e0-475e-81a7-4973929d2256')
Traceback (most recent call last):
File "/Users/bob/.virtualenv/testme/lib/python3.10/site-packages/celery/backends/base.py", line 1019, in on_chord_part_return
raise ValueError(gid)
ValueError: 4c7664ce-89e0-475e-81a7-4973929d2256
[gw0] ERROR test_celery_chords.py::test_chords Destroying test database for alias 'default'...
结束它的唯一方法是按 ^C。
这是我的两个测试(本质上是同一个测试)。这些任务(简单的 add 和 average 示例)本身不需要数据库,但其他 Django 测试会用到数据库。
def test_chords(transactional_db, celery_app, celery_worker, celery_not_eager):
    """Run the ``do_average`` chord with eager mode disabled and check its result.

    The fixtures provide the test database, the Celery app and worker, and
    switch off eager task execution for this test.
    """
    celery_app.config_from_object("django.conf:settings", namespace="CELERY")
    task = do_average.delay()
    # Bound the wait so a stuck chord fails this test instead of hanging the
    # whole pytest run.
    results = task.get(timeout=30)
    assert task.state == "SUCCESS"
    assert len(results[0][1][1]) == 10
def test_chord_differently(transactional_db, celery_app, celery_worker, celery_not_eager):
    """Second run of the ``do_average`` chord scenario with eager mode disabled.

    The fixtures provide the test database, the Celery app and worker, and
    switch off eager task execution for this test.
    """
    celery_app.config_from_object("django.conf:settings", namespace="CELERY")
    task = do_average.delay()
    # Bound the wait so a stuck chord fails this test instead of hanging the
    # whole pytest run.
    results = task.get(timeout=30)
    assert task.state == "SUCCESS"
    assert len(results[0][1][1]) == 10
# And the tasks (which shouldn't matter):
@shared_task
def _add(x: int, y: int) -> int:
    """Print both operands with the current timestamp and return their sum."""
    print(f"{x} + {y} {time.time()}")
    total = x + y
    return total
@shared_task
def _average(numbers: List[int]) -> float:
    """Print the sum and count of ``numbers`` and return their arithmetic mean.

    Raises:
        ZeroDivisionError: if ``numbers`` is empty.
    """
    print(f"AVERAGING {sum(numbers)} / {len(numbers)}")
    total, count = sum(numbers), len(numbers)
    return total / count
@shared_task
def do_average():
    """Build a chord of ten ``_add`` signatures with ``_average`` as the callback.

    Returns whatever invoking the chord with the callback signature returns.
    """
    tasks = []
    for i in range(10):
        tasks.append(_add.s(i, i))
    print(f"Creating chord of {len(tasks)} tasks at {time.time()}")
    callback = _average.s()
    return chord(tasks)(callback)
# The conftest.py used with these tests:
@pytest.fixture
def celery_not_eager(settings):
    """Set both eager-mode Celery options to False on ``settings``."""
    for option in ("CELERY_TASK_ALWAYS_EAGER", "CELERY_TASK_EAGER_PROPAGATES"):
        setattr(settings, option, False)
# Relevant `pytest --fixtures` output:
celery_app -- .../python3.10/site-packages/celery/contrib/pytest.py:173
Fixture creating a Celery application instance.
celery_worker -- .../python3.10/site-packages/celery/contrib/pytest.py:195
Fixture: Start worker in a thread, stop it when the test returns.
使用的版本:
django==4.1.2
pytest-celery==0.0.0
pytest-cov==3.0.0
pytest-django==4.5.2
pytest-xdist==2.5.0
发布于 2022-11-05 10:26:54
虽然我还没有解决这个问题,但我已经找到了一种使用@pytest.mark.xdist_group(name="celery")来修饰测试类的解决方法,我可以这样做:
@pytest.mark.xdist_group(name="celery")
@override_settings(
    CELERY_TASK_ALWAYS_EAGER=False,
    CELERY_TASK_EAGER_PROPAGATES=False,
)
class SyncTaskTestCase2(TransactionTestCase):
    """Run a chord against a Celery worker that is started once for the class.

    Eager execution is disabled through ``override_settings`` and the class
    is placed in the "celery" xdist group.
    """

    @classmethod
    def setUpClass(cls):
        """Start the worker after the superclass's class-level setup."""
        super().setUpClass()
        cls.celery_worker = start_worker(app, perform_ping_check=False)
        cls.celery_worker.__enter__()
        print(f"Celery Worker started {time.time()}")

    @classmethod
    def tearDownClass(cls):
        """Tear down the superclass first, then always stop the worker."""
        print(f"Tearing down Superclass {time.time()}")
        try:
            super().tearDownClass()
            print(f"Tore down Superclass {time.time()}")
        finally:
            # Stop the worker even if the superclass teardown raised, so the
            # worker started in setUpClass is never left running.
            cls.celery_worker.__exit__(None, None, None)
            print(f"Celery Worker torn down {time.time()}")

    def test_success(self):
        """The chord launched by ``do_average_in_chord`` finishes successfully."""
        print(f"Starting test at {time.time()}")
        self.task = do_average_in_chord.delay()
        # Bound the wait so a stuck chord fails the test instead of hanging.
        self.task.get(timeout=30)
        print(f"Finished Averaging at {time.time()}")
        assert self.task.successful()
# Together with the command-line option `--dist loadgroup`, this forces every
# test in the "celery" group onto the same xdist worker process, which
# prevents the deadlock and lets `--numprocesses 10` run to completion.
这里最大的缺点是关闭 Celery worker 时要等待约 9 秒,这会让人倾向于把所有 Celery 测试都塞进同一个测试类里。
# This accomplishes the same thing as the unittest-based class above WITHOUT
# wrapping the test in a class; it also eliminates the 9 second teardown wait.
@pytest.mark.xdist_group(name="celery")
@pytest.mark.django_db  # Why do I need this and transactional_db???
def test_averaging_in_a_chord(
    transactional_db,
    celery_session_app,
    celery_session_worker,
    use_actual_celery_worker,
):
    """The chord launched by ``do_average_in_chord`` finishes successfully."""
    task = do_average_in_chord.delay()
    # Bound the wait so a stuck chord fails the test instead of hanging.
    task.get(timeout=30)
    assert task.successful()
# You do need the following in your conftest.py:
from typing import Type
import time
import pytest
from pytest_django.fixtures import SettingsWrapper
from celery import Celery
from celery.contrib.testing.worker import start_worker
@pytest.fixture(scope="function")
def use_actual_celery_worker(settings: SettingsWrapper) -> SettingsWrapper:
    """Turn off CELERY_TASK_ALWAYS_EAGER and CELERY_TASK_EAGER_PROPAGATES for a single test.

    Returns the same ``settings`` object that was passed in.
    """
    for option in ("CELERY_TASK_ALWAYS_EAGER", "CELERY_TASK_EAGER_PROPAGATES"):
        setattr(settings, option, False)
    return settings
@pytest.fixture(scope="session")
def celery_session_worker(celery_session_app: Celery):
    """Start a worker for ``celery_session_app`` and stop it at session teardown.

    Re-implemented so that the project's own Celery app gets used. This keeps
    the priority queue stuff the same as it is in production. If
    BROKER_BACKEND is set to "memory" then rabbit shouldn't be involved anyway.

    Yields:
        The object produced by entering ``start_worker(...)`` (the running
        worker), rather than the context-manager wrapper itself.
    """
    # The ``with`` block guarantees the worker context is exited even if the
    # shutdown broadcast below raises.
    with start_worker(
        celery_session_app, perform_ping_check=False, shutdown_timeout=0.5
    ) as worker:
        yield worker
        # This causes the worker to exit immediately so that we don't have a
        # 9 second wait for the timeout.
        celery_session_app.control.shutdown()
        print(f"Tearing down Celery Worker {time.time()}")
    print(f"Celery Worker torn down {time.time()}")
@pytest.fixture(scope="session")
def celery_session_app() -> Celery:
    """Return the app you would regularly use for Celery tasks.

    Using the real app ensures all of your default app options mirror what
    you use at runtime.
    """
    # Function-scope import: the app module is loaded only when this fixture runs.
    from workshop.celery import app

    # No teardown is needed, so a plain return matches the ``-> Celery`` annotation.
    return app
# Source: https://stackoverflow.com/questions/74016562
复制相似问题