Pytest с django celery
У меня есть задача celery, которая создает записи моделей в базе данных. Celery использует Redis в качестве брокера.
Celery config in settings.py:
CELERY_BROKER_URL = 'redis://localhost:6379/0'
CELERY_RESULT_BACKEND = 'django-db'
from .models import Message
from itertools import islice
from celery import shared_task
@shared_task(ignore_result=True)
def create_entries(data: list):
batch_size = 100
obj_iterator = (Message(**obj) for obj in data)
while True:
batch = list(islice(obj_iterator, batch_size))
if not batch:
break
Message.objects.bulk_create(batch, batch_size)
В приложении все работает нормально, но тесты не работают. Код метода тестирования:
import tasks
import pytest
from .models import Message
from celery.result import AsyncResult
@pytest.mark.django_db
@pytest.mark.celery
def test_create_entries():
message_data = [
{
text: 'hello bro',
client: 'Nick'
},
]
assert Message.objects.count() == 0
task = tasks.create_entries.delay(message_data)
result = AsyncResult(task.task_id)
assert result.status == 'SUCCESS' # here is a fail, status = PENDING
assert Message.objects.count() == 1 # here is a fail, count = 0
Но если запустить сервер: python manage.py runserver
и запустить задания, то записи создаются успешно.
Какова может быть причина того, что задания не работают в тестах?