Pytest с django celery

У меня есть задача celery, которая создает записи моделей в базе данных. Celery использует Redis в качестве брокера.

Celery config in settings.py:

CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
from .models import Message
from itertools import islice
from celery import shared_task

@shared_task(ignore_result=True)
def create_entries(data: list):
    batch_size = 100
    obj_iterator = (Message(**obj) for obj in data)
    while True:
        batch = list(islice(obj_iterator, batch_size))
        if not batch:
            break
        Message.objects.bulk_create(batch, batch_size)

В приложении все работает нормально, но тесты не работают. Код метода тестирования:

import tasks
import pytest
from .models import Message
from celery.result import AsyncResult

@pytest.mark.django_db
@pytest.mark.celery
def test_create_entries():
    message_data = [
        {
            text: 'hello bro',
            client: 'Nick'
        },
    ]
    assert Message.objects.count() == 0
    task = tasks.create_entries.delay(message_data)
    result = AsyncResult(task.task_id)
    assert result.status == 'SUCCESS' # here is a fail, status = PENDING
    assert Message.objects.count() == 1 # here is a fail, count = 0

Но если запустить сервер: python manage.py runserver и запустить задания, то записи создаются успешно.

Какова может быть причина того, что задания не работают в тестах?

Вернуться на верх