Тестирование с Celery

Задачи и модульные тесты

Для тестирования поведения задачи в модульных тестах предпочтительным методом является мокинг.

Режим нетерпения

Режим eager, включенный настройкой task_always_eager, по определению не подходит для модульных тестов.

При тестировании в режиме eager вы тестируете только эмуляцию того, что происходит в рабочем процессе, а между эмуляцией и тем, что происходит в реальности, существует множество расхождений.

Обратите внимание, что задания, выполняемые с нетерпением, по умолчанию не записывают результаты в бэкенд. Если вы хотите включить эту функциональность, посмотрите task_store_eager_result.

Задача Celery во многом похожа на веб-представление, поскольку она должна определять, как выполнить действие только в контексте вызова в качестве задачи.

Это означает, что оптимально задачи обрабатывают только такие вещи, как сериализация, заголовки сообщений, повторные попытки и так далее, а реальная логика реализуется в другом месте.

Допустим, у нас есть такая задача:

from .models import Product


@app.task(bind=True)
def send_order(self, product_pk, quantity, price):
    price = Decimal(price)  # json serializes this to string.

    # models are passed by id, not serialized.
    product = Product.objects.get(product_pk)

    try:
        product.order(quantity, price)
    except OperationalError as exc:
        raise self.retry(exc=exc)

Note: Задача, имеющая значение bound, означает, что первым аргументом задачи всегда будет экземпляр задачи (self). Это означает, что вы получаете аргумент self в качестве первого аргумента и можете использовать методы и атрибуты класса Task.

Вы можете написать модульные тесты для этой задачи, используя мокинг, как в этом примере:

from pytest import raises

from celery.exceptions import Retry

# for python 2: use mock.patch from `pip install mock`.
from unittest.mock import patch

from proj.models import Product
from proj.tasks import send_order

class test_send_order:

    @patch('proj.tasks.Product.order')  # < patching Product in module above
    def test_success(self, product_order):
        product = Product.objects.create(
            name='Foo',
        )
        send_order(product.pk, 3, Decimal(30.3))
        product_order.assert_called_with(3, Decimal(30.3))

    @patch('proj.tasks.Product.order')
    @patch('proj.tasks.send_order.retry')
    def test_failure(self, send_order_retry, product_order):
        product = Product.objects.create(
            name='Foo',
        )

        # Set a side effect on the patched methods
        # so that they raise the errors we want.
        send_order_retry.side_effect = Retry()
        product_order.side_effect = OperationalError()

        with raises(Retry):
            send_order(product.pk, 3, Decimal(30.6))

pytest

Добавлено в версии 4.0.

Celery также делает доступным плагин pytest, который добавляет фикстуры, которые вы можете использовать в своих интеграционных (или модульных) наборах тестов.

Включение

Celery изначально поставляет плагин в отключенном состоянии, чтобы включить его, вы можете либо:

  • pip install celery[pytest]

  • pip install pytest-celery

  • или добавить переменную окружения PYTEST_PLUGINS=celery.contrib.pytest

  • или добавьте pytest_plugins = ("celery.contrib.pytest", ) в ваш корневой файл conftest.py

Маркс

celery - Установить конфигурацию тестового приложения.

Отметка celery позволяет переопределить конфигурацию, используемую для одного тестового случая:

@pytest.mark.celery(result_backend='redis://')
def test_something():
    ...

или для всех тестовых случаев в классе:

@pytest.mark.celery(result_backend='redis://')
class test_something:

    def test_one(self):
        ...

    def test_two(self):
        ...

Приспособления

Объем функций

celery_app - Приложение Celery, используемое для тестирования.

Это приспособление возвращает приложение Celery, которое вы можете использовать для тестирования.

Пример:

def test_create_task(celery_app, celery_worker):
    @celery_app.task
    def mul(x, y):
        return x * y

    assert mul.delay(4, 4).get(timeout=10) == 16
celery_worker - Встраивание живого рабочего.

Это приспособление запускает рабочий экземпляр Celery, который вы можете использовать для интеграционных тестов. Рабочий будет запущен в отдельном потоке и будет выключен, как только тест вернется.

По умолчанию приспособление будет ждать до 10 секунд, пока рабочий завершит невыполненные задачи, и вызовет исключение, если лимит времени будет превышен. Таймаут можно настроить, установив ключ shutdown_timeout в словаре, возвращаемом приспособлением celery_worker_parameters().

Пример:

# Put this in your conftest.py
@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'amqp://',
        'result_backend': 'redis://'
    }

def test_add(celery_worker):
    mytask.delay()


# If you wish to override some setting in one test cases
# only - you can use the ``celery`` mark:
@pytest.mark.celery(result_backend='rpc')
def test_other(celery_worker):
    ...

По умолчанию сердцебиение отключено, что означает, что тестовый рабочий не посылает события для worker-online, worker-offline и worker-heartbeat. Чтобы включить сердцебиение, измените фикстуру celery_worker_parameters():

# Put this in your conftest.py
@pytest.fixture(scope="session")
def celery_worker_parameters():
    return {"without_heartbeat": False}
    ...

Объем сессии

celery_config - Переопределение для настройки конфигурации тестового приложения Celery.

Вы можете переопределить это приспособление для настройки тестового приложения Celery.

Конфиг, возвращаемый вашим приспособлением, будет затем использоваться для конфигурирования приспособлений celery_app() и << 1 >>>.

Пример:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'amqp://',
        'result_backend': 'rpc',
    }
celery_parameters - Переопределение для установки параметров тестового приложения Celery.

Вы можете переопределить это приспособление, чтобы изменить параметры __init__ тестового приложения Celery. В отличие от celery_config(), эти параметры передаются непосредственно при инстанцировании Celery.

Конфиг, возвращаемый вашим приспособлением, будет затем использоваться для конфигурирования приспособлений celery_app() и << 1 >>>.

Пример:

@pytest.fixture(scope='session')
def celery_parameters():
    return {
        'task_cls':  my.package.MyCustomTaskClass,
        'strict_typing': False,
    }
celery_worker_parameters - Переопределение для установки параметров рабочего Celery.

Вы можете переопределить это приспособление для изменения __init__ параметров тестовых Celery workers. Они непосредственно передаются в WorkController при его инстанцировании.

Конфиг, возвращаемый вашим приспособлением, будет затем использоваться для конфигурирования приспособлений celery_worker() и << 1 >>>.

Пример:

@pytest.fixture(scope='session')
def celery_worker_parameters():
    return {
        'queues':  ('high-prio', 'low-prio'),
        'exclude_queues': ('celery'),
    }
celery_enable_logging - Переопределение для включения протоколирования во встроенных рабочих.

Это свойство можно переопределить, чтобы включить ведение журнала во встроенных рабочих.

Пример:

@pytest.fixture(scope='session')
def celery_enable_logging():
    return True
celery_includes - Добавление дополнительных импортов для встроенных рабочих.

Вы можете переопределить приспособление для включения модулей при запуске встроенного рабочего.

Вы можете заставить его возвращать список имен модулей для импорта, которые могут быть модулями задач, модулями, регистрирующими сигналы, и так далее.

Пример:

@pytest.fixture(scope='session')
def celery_includes():
    return [
        'proj.tests.tasks',
        'proj.tests.celery_signal_handlers',
    ]
celery_worker_pool - переопределение пула, используемого для встроенных рабочих.

Вы можете переопределить fixture, чтобы настроить пул выполнения, используемый для встроенных рабочих.

Пример:

@pytest.fixture(scope='session')
def celery_worker_pool():
    return 'prefork'

Предупреждение

Вы не можете использовать пулы gevent/eventlet, то есть если только весь ваш набор тестов не работает с включенными monkeypatches.

celery_session_worker - Встроенный рабочий, который живет на протяжении всей сессии.

Этот фикстур запускает рабочий, который живет в течение всего сеанса тестирования (он не будет запускаться/останавливаться для каждого теста).

Пример:

# Add this to your conftest.py
@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'amqp://',
        'result_backend': 'rpc',
    }

# Do this in your tests.
def test_add_task(celery_session_worker):
    assert add.delay(2, 2) == 4

Предупреждение

Возможно, это плохая идея - смешивать сессионных и эфемерных работников…

celery_session_app - Приложение Celery, используемое для тестирования (область действия сессии).

Это может быть использовано другими привязками с привязкой к сессии, когда им нужно сослаться на экземпляр приложения Celery.

use_celery_app_trap - Вызывать исключение при возврате к приложению по умолчанию.

Это свойство можно переопределить в вашем conftest.py, чтобы включить «ловушку приложений»: если что-то пытается получить доступ к приложению по умолчанию или текущему_приложению, возникает исключение.

Пример:

@pytest.fixture(scope='session')
def use_celery_app_trap():
    return True

Если тест хочет получить доступ к приложению по умолчанию, вы должны пометить его с помощью фиксатора depends_on_current_app:

@pytest.mark.usefixtures('depends_on_current_app')
def test_something():
    something()
Вернуться на верх