首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >在xdist pytest下运行芹菜测试时的死锁

在xdist pytest下运行芹菜测试时的死锁
EN

Stack Overflow用户
提问于 2022-10-10 14:33:49
回答 1查看 47关注 0票数 0

如果我在没有xdist参与的情况下运行,像这样:

代码语言:javascript
复制
pytest --disable-warnings --verbose -s test_celery_chords.py

效果很好。我看到数据库已经创建,任务正在运行,并且按预期的方式退出。

如果涉及到xdist (-n 2),如下所示:

代码语言:javascript
复制
pytest --disable-warnings --verbose -n 2 -s test_celery_chords.py

最后是挂起的进程(有时是这些消息):

代码语言:javascript
复制
Destroying old test database for alias 'default'...
Chord callback '4c7664ce-89e0-475e-81a7-4973929d2256' raised: ValueError('4c7664ce-89e0-475e-81a7-4973929d2256')
Traceback (most recent call last):
  File "/Users/bob/.virtualenv/testme/lib/python3.10/site-packages/celery/backends/base.py", line 1019, in on_chord_part_return
    raise ValueError(gid)
ValueError: 4c7664ce-89e0-475e-81a7-4973929d2256
Chord callback '4c7664ce-89e0-475e-81a7-4973929d2256' raised: ValueError('4c7664ce-89e0-475e-81a7-4973929d2256')
Traceback (most recent call last):
  File "/Users/bob/.virtualenv/testme/lib/python3.10/site-packages/celery/backends/base.py", line 1019, in on_chord_part_return
    raise ValueError(gid)
ValueError: 4c7664ce-89e0-475e-81a7-4973929d2256
Chord callback '4c7664ce-89e0-475e-81a7-4973929d2256' raised: ValueError('4c7664ce-89e0-475e-81a7-4973929d2256')
Traceback (most recent call last):
  File "/Users/bob/.virtualenv/testme/lib/python3.10/site-packages/celery/backends/base.py", line 1019, in on_chord_part_return
    raise ValueError(gid)
ValueError: 4c7664ce-89e0-475e-81a7-4973929d2256
Chord callback '4c7664ce-89e0-475e-81a7-4973929d2256' raised: ValueError('4c7664ce-89e0-475e-81a7-4973929d2256')
Traceback (most recent call last):
  File "/Users/bob/.virtualenv/testme/lib/python3.10/site-packages/celery/backends/base.py", line 1019, in on_chord_part_return
    raise ValueError(gid)
ValueError: 4c7664ce-89e0-475e-81a7-4973929d2256
Chord callback '4c7664ce-89e0-475e-81a7-4973929d2256' raised: ValueError('4c7664ce-89e0-475e-81a7-4973929d2256')
Traceback (most recent call last):
  File "/Users/bob/.virtualenv/testme/lib/python3.10/site-packages/celery/backends/base.py", line 1019, in on_chord_part_return
    raise ValueError(gid)
ValueError: 4c7664ce-89e0-475e-81a7-4973929d2256

[gw0] ERROR test_celery_chords.py::test_chords Destroying test database for alias 'default'...

结束它的唯一方法是用^C。

这是我的两个测试(本质上是相同的测试)。这些任务(简单的add和普通示例测试)不需要DB,但其他使用DB的Django测试也需要DB。

代码语言:javascript
复制
def test_chords(transactional_db, celery_app, celery_worker, celery_not_eager):

    celery_app.config_from_object("django.conf:settings", namespace="CELERY")
    task = do_average.delay()
    results = task.get()
    assert task.state == "SUCCESS"
    assert len(results[0][1][1]) == 10


def test_chord_differently(transactional_db, celery_app, celery_worker, celery_not_eager):

    celery_app.config_from_object("django.conf:settings", namespace="CELERY")
    task = do_average.delay()
    results = task.get()
    assert task.state == "SUCCESS"
    assert len(results[0][1][1]) == 10

和任务(应该不重要)

代码语言:javascript
复制
@shared_task
def _add(x: int, y: int) -> int:
    print(f"{x} + {y} {time.time()}")
    return x + y


@shared_task
def _average(numbers: List[int]) -> float:
    print(f"AVERAGING {sum(numbers)} / {len(numbers)}")
    return sum(numbers) / len(numbers)


@shared_task
def do_average():
    tasks = [_add.s(i, i) for i in range(10)]
    print(f"Creating chord of {len(tasks)} tasks at {time.time()}")
    return chord(tasks)(_average.s())

使用这个的conftest.py:

代码语言:javascript
复制
@pytest.fixture
def celery_not_eager(settings):
    settings.CELERY_TASK_ALWAYS_EAGER = False
    settings.CELERY_TASK_EAGER_PROPAGATES = False

热测试-固定装置

代码语言:javascript
复制
celery_app -- .../python3.10/site packages/celery/contrib/pytest.py:173
    Fixture creating a Celery application instance.

celery_worker -- .../python3.10/site-packages/celery/contrib/pytest.py:195
    Fixture: Start worker in a thread, stop it when the test returns.

使用

代码语言:javascript
复制
django=4.1.2
pytest-celery==0.0.0
pytest-cov==3.0.0
pytest-django==4.5.2
pytest-xdist==2.5.0
EN

回答 1

Stack Overflow用户

发布于 2022-11-05 10:26:54

虽然我还没有解决这个问题,但我已经找到了一种使用@pytest.mark.xdist_group(name="celery")来修饰测试类的解决方法,我可以这样做:

代码语言:javascript
复制
@pytest.mark.xdist_group(name="celery")
@override_settings(CELERY_TASK_ALWAYS_EAGER=False)
@override_settings(CELERY_TASK_EAGER_PROPAGATES=False)
class SyncTaskTestCase2(TransactionTestCase):
    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        cls.celery_worker = start_worker(app, perform_ping_check=False)
        cls.celery_worker.__enter__()
        print(f"Celery Worker started {time.time()}")

    @classmethod
    def tearDownClass(cls):
        print(f"Tearing down Superclass {time.time()}")
        super().tearDownClass()
        print(f"Tore down Superclass {time.time()}")
        cls.celery_worker.__exit__(None, None, None)
        print(f"Celery Worker torn down {time.time()}")

    def test_success(self):
        print(f"Starting test at {time.time()}")
        self.task = do_average_in_chord.delay()
        self.task.get()
        print(f"Finished Averaging at {time.time()}")
        assert self.task.successful()

这与命令行选项--dist loadgroup一起强制所有“芹菜”组在相同的运行进程上运行,从而防止死锁,并允许-numprocesses 10运行到完成。

这里最大的缺点是给芹菜工人开了9秒的罚单,这让你很容易把所有的芹菜测试都推到一个等级。

代码语言:javascript
复制
# This accomplishes the same things as the unitest above WITHOUT having a Class wrapped around the tests it also eliminates the 9 second teardown wait.
@pytest.mark.xdist_group(name="celery")
@pytest.mark.django_db # Why do I need this and transactional_db???
def test_averaging_in_a_chord(
    transactional_db,
    celery_session_app,
    celery_session_worker,
    use_actual_celery_worker,
):
    task = do_average_in_chord.delay()
    task.get()
    assert task.successful()

你在你的conftest.py中确实需要这个

代码语言:javascript
复制
from typing import Type
import time

import pytest
from pytest_django.fixtures import SettingsWrapper

from celery import Celery
from celery.contrib.testing.worker import start_worker


@pytest.fixture(scope="function")
def use_actual_celery_worker(settings: SettingsWrapper) -> SettingsWrapper:
    """Turns of CELERY_TASK_ALWAYS_EAGER and CELERY_TASK_EAGER_PROPAGATES for a single test. """
    settings.CELERY_TASK_ALWAYS_EAGER = False
    settings.CELERY_TASK_EAGER_PROPAGATES = False
    return settings


@pytest.fixture(scope="session")
def celery_session_worker(celery_session_app: Celery):
    """Re-implemented this so that my celery app gets used.  This keeps the priority queue stuff the same
    as it is in production.  If BROKER_BACKEND is set to "memory" then rabbit shouldn't be involved anyway."""
    celery_worker = start_worker(
        celery_session_app, perform_ping_check=False, shutdown_timeout=0.5
    )
    celery_worker.__enter__()
    yield celery_worker

    # This causes the worker to exit immediately so that we don't have a 9 second wait for the timeout.
    celery_session_app.control.shutdown()

    print(f"Tearing down Celery Worker {time.time()}")
    celery_worker.__exit__(None, None, None)
    print(f"Celery Worker torn down {time.time()}")


@pytest.fixture(scope="session")
def celery_session_app() -> Celery:

    from workshop.celery import app

    """ Get the app you would regularly use for celery tasks and return it.  This insures all of your default
    app options mirror what you use at runtime."""
    yield app
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/74016562

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档