Testing with Celery

Tasks and unit tests

To test task behavior in unit tests the preferred method is mocking.

Eager mode

The eager mode enabled by the task_always_eager setting is by definition not suitable for unit tests.

When testing with eager mode you are only testing an emulation of what happens in a worker, and there are many discrepancies between the emulation and what happens in reality.

Note that eagerly executed tasks don’t write results to the backend by default. To enable this, see the task_store_eager_result setting.
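
For reference, a minimal sketch of setting both options on an app. The app name 'proj' is a placeholder, and a result backend is assumed to be configured separately:

```python
from celery import Celery

app = Celery('proj')  # placeholder app name
app.conf.update(
    task_always_eager=True,        # run tasks locally and synchronously
    task_store_eager_result=True,  # also store eager results in the backend
)
```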

A Celery task is much like a web view, in that it should only define how to perform the action in the context of being called as a task.

This means optimally tasks only handle things like serialization, message headers, retries, and so on, with the actual logic implemented elsewhere.
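
As a rough sketch of that separation, the business logic can live in a plain function that knows nothing about Celery, so it can be exercised with a mock and no broker. The place_order helper and the Mock-based product below are hypothetical names chosen only for illustration:

```python
from decimal import Decimal
from unittest.mock import Mock


def place_order(product, quantity, price):
    """Plain business logic: no Celery imports, no task context."""
    if quantity <= 0:
        raise ValueError('quantity must be positive')
    # The price may arrive as a string after serialization, so normalize it.
    product.order(quantity, Decimal(price))


# A task would only look up the product, call place_order(),
# and decide whether to retry.
product = Mock()  # stands in for a real model instance
place_order(product, 3, '30.3')
product.order.assert_called_once_with(3, Decimal('30.3'))
```

With this split, only the thin task wrapper needs Celery-specific mocking.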

Say we had a task like this:

from decimal import Decimal

from django.db import OperationalError  # assumed: Product is a Django model

from .models import Product


@app.task(bind=True)
def send_order(self, product_pk, quantity, price):
    price = Decimal(price)  # JSON serializes this to a string.

    # Models are passed by id, not serialized.
    product = Product.objects.get(pk=product_pk)

    try:
        product.order(quantity, price)
    except OperationalError as exc:
        raise self.retry(exc=exc)

Note: A bound task always receives the task instance (self) as its first argument, which gives you access to the Task class methods and attributes, such as self.retry().
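
The Decimal(price) conversion at the top of send_order is there because the price reaches the task as a string. The standard-library sketch below imitates that round trip with json directly (it does not use Celery's own serializer), and also shows why building a Decimal from a float literal tends to cause surprises in assertions:

```python
import json
from decimal import Decimal

price = Decimal('30.3')

# Imitate the message payload: the Decimal travels as a string.
payload = json.dumps({'price': str(price)})

# On the receiving side the value is a plain str until converted back.
received = json.loads(payload)['price']
assert isinstance(received, str)
assert Decimal(received) == price

# A float literal carries binary rounding error into the Decimal.
assert Decimal(30.3) != Decimal('30.3')
```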

You could write unit tests for this task, using mocking like in this example:

from decimal import Decimal
from unittest.mock import patch

from pytest import raises

from celery.exceptions import Retry
from django.db import OperationalError  # assumed: Product is a Django model

from proj.models import Product
from proj.tasks import send_order


class TestSendOrder:

    @patch('proj.tasks.Product.order')  # < patching Product in module above
    def test_success(self, product_order):
        product = Product.objects.create(
            name='Foo',
        )
        send_order(product.pk, 3, Decimal('30.3'))
        product_order.assert_called_with(3, Decimal('30.3'))

    @patch('proj.tasks.Product.order')
    @patch('proj.tasks.send_order.retry')
    def test_failure(self, send_order_retry, product_order):
        product = Product.objects.create(
            name='Foo',
        )

        # Set a side effect on the patched methods
        # so that they raise the errors we want.
        send_order_retry.side_effect = Retry()
        product_order.side_effect = OperationalError()

        with raises(Retry):
            send_order(product.pk, 3, Decimal('30.6'))
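
The argument order in test_failure follows from how stacked patch decorators work: they are applied bottom-up, so the decorator closest to the function supplies the first mock argument. A minimal standard-library sketch, patching os.getpid and os.getcwd purely for illustration:

```python
import os
from unittest.mock import patch


@patch('os.getcwd')  # outermost decorator -> second mock argument
@patch('os.getpid')  # closest to the function -> first mock argument
def check_order(getpid_mock, getcwd_mock):
    getpid_mock.return_value = 1234
    getcwd_mock.return_value = '/tmp/example'
    return os.getpid(), os.getcwd()


assert check_order() == (1234, '/tmp/example')
```

Swapping the two parameter names would silently configure the wrong mock, which is an easy mistake to make when a test stacks several patches.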

pytest

New in version 4.0.

Celery also makes a pytest plugin available that adds fixtures that you can use in your integration (or unit) test suites.

Enabling

Celery ships the plugin in a disabled state. To enable it, use any one of the following:

  • pip install celery[pytest]

  • pip install pytest-celery

  • or add an environment variable PYTEST_PLUGINS=celery.contrib.pytest

  • or add pytest_plugins = ("celery.contrib.pytest", ) to your root conftest.py

Marks

celery - Set test app configuration.

The celery mark enables you to override the configuration used for a single test case:

@pytest.mark.celery(result_backend='redis://')
def test_something():
    ...

or for all the test cases in a class:

@pytest.mark.celery(result_backend='redis://')
class TestSomething:

    def test_one(self):
        ...

    def test_two(self):
        ...

Fixtures

Function scope

celery_app - Celery app used for testing.

This fixture returns a Celery app you can use for testing.

Example:

def test_create_task(celery_app, celery_worker):
    @celery_app.task
    def mul(x, y):
        return x * y

    assert mul.delay(4, 4).get(timeout=10) == 16

celery_worker - Embed live worker.

This fixture starts a Celery worker instance that you can use for integration tests. The worker is started in a separate thread and is shut down as soon as the test returns.

By default the fixture will wait up to 10 seconds for the worker to complete outstanding tasks and will raise an exception if the time limit is exceeded. The timeout can be customized by setting the shutdown_timeout key in the dictionary returned by the celery_worker_parameters() fixture.

Example:

# Put this in your conftest.py
@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'amqp://',
        'result_backend': 'redis://'
    }

def test_add(celery_worker):
    mytask.delay()


# If you wish to override some setting in one test case
# only, you can use the ``celery`` mark:
@pytest.mark.celery(result_backend='rpc')
def test_other(celery_worker):
    ...

Heartbeats are disabled by default, which means that the test worker doesn’t send events for worker-online, worker-offline and worker-heartbeat. To enable heartbeats, modify the celery_worker_parameters() fixture:

# Put this in your conftest.py
@pytest.fixture(scope="session")
def celery_worker_parameters():
    return {"without_heartbeat": False}
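
Both worker options mentioned in this section can be returned from the same fixture. A sketch of such a conftest.py, where the 30-second value is an arbitrary illustration rather than a recommendation:

```python
# conftest.py
import pytest


@pytest.fixture(scope='session')
def celery_worker_parameters():
    return {
        # Wait up to 30 seconds for outstanding tasks at shutdown
        # (the text above gives 10 seconds as the default).
        'shutdown_timeout': 30,
        # Send worker-online, worker-offline and worker-heartbeat events.
        'without_heartbeat': False,
    }
```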

Session scope

celery_config - Override to setup Celery test app configuration.

You can redefine this fixture to configure the test Celery app.

The config returned by your fixture will then be used to configure the celery_app() and celery_session_app() fixtures.

Example:

@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'amqp://',
        'result_backend': 'rpc',
    }

celery_parameters - Override to setup Celery test app parameters.

You can redefine this fixture to change the __init__ parameters of the test Celery app. In contrast to celery_config(), these are passed directly to Celery when it is instantiated.

The parameters returned by your fixture will then be used when creating the apps for the celery_app() and celery_session_app() fixtures.

Example:

@pytest.fixture(scope='session')
def celery_parameters():
    return {
        'task_cls': my.package.MyCustomTaskClass,
        'strict_typing': False,
    }

celery_worker_parameters - Override to setup Celery worker parameters.

You can redefine this fixture to change the __init__ parameters of test Celery workers. These are directly passed to WorkController when it is instantiated.

The parameters returned by your fixture will then be used to configure the celery_worker() and celery_session_worker() fixtures.

Example:

@pytest.fixture(scope='session')
def celery_worker_parameters():
    return {
        'queues': ('high-prio', 'low-prio'),
        'exclude_queues': ('celery',),  # trailing comma makes this a tuple
    }

celery_enable_logging - Override to enable logging in embedded workers.

This is a fixture you can override to enable logging in embedded workers.

Example:

@pytest.fixture(scope='session')
def celery_enable_logging():
    return True

celery_includes - Add additional imports for embedded workers.

You can override this fixture to include modules when an embedded worker starts.

You can have this return a list of module names to import, which can be task modules, modules registering signals, and so on.

Example:

@pytest.fixture(scope='session')
def celery_includes():
    return [
        'proj.tests.tasks',
        'proj.tests.celery_signal_handlers',
    ]

celery_worker_pool - Override the pool used for embedded workers.

You can override this fixture to configure the execution pool used for embedded workers.

Example:

@pytest.fixture(scope='session')
def celery_worker_pool():
    return 'prefork'

Warning

You cannot use the gevent or eventlet pools unless your whole test suite runs with the monkey patches enabled.

celery_session_worker - Embedded worker that lives throughout the session.

This fixture starts a worker that lives throughout the testing session (it won’t be started/stopped for every test).

Example:

# Add this to your conftest.py
@pytest.fixture(scope='session')
def celery_config():
    return {
        'broker_url': 'amqp://',
        'result_backend': 'rpc',
    }

# Do this in your tests.
def test_add_task(celery_session_worker):
    # delay() returns an AsyncResult, so fetch the value before comparing.
    assert add.delay(2, 2).get(timeout=10) == 4

Warning

It’s probably a bad idea to mix session and ephemeral workers…

celery_session_app - Celery app used for testing (session scope).

This can be used by other session scoped fixtures when they need to refer to a Celery app instance.

use_celery_app_trap - Raise exception on falling back to default app.

This is a fixture you can override in your conftest.py, to enable the “app trap”: if something tries to access the default or current_app, an exception is raised.

Example:

@pytest.fixture(scope='session')
def use_celery_app_trap():
    return True

If a test wants to access the default app, you would have to mark it using the depends_on_current_app fixture:

@pytest.mark.usefixtures('depends_on_current_app')
def test_something():
    something()