Как заставить Celery + Django + Pytest работать вместе
Пытаюсь протестировать конечную точку, которая имеет задачу celery. Похоже, что задачи сельдерея не запускаются в тесте.
- django==4.1.5
- celery==5.2.7
- pytest==7.2.1
- pytest-django==4.5.2
Конечная точка:
def do_some_stuff(blah: Blah) -> Blah:
res = cool_task.apply_async(kwargs={
'cool_id': int(pk),
'config': config,
'name': RESUBMIT.value,
},
link=update_status.si(
cool_id=int(pk),
new_status="why is this so hard",
)
)
[...]
Тест:
@pytest.mark.django_db
def test_my_thing(django_client: Client) -> None:
[...]
response = django_client.post(f"/api/myendpoint/{mything.id}/do_some_stuff/")
Попадает на конечную точку. Получает обратно 202, как и ожидалось. Но celery, похоже, не берет задачу в тесте. Метод update_status
обновляет базу данных, а я не вижу, чтобы это произошло.
Я пробовал создать приложение celery в тесте, создать worker в тесте, изменить тест, чтобы он использовал основную БД вместо тестовой, установить override_settings в "BROKER_BANDEND='memory'".
Мне бы хотелось получить полный рабочий пример. Это вроде бы элементарно, но это ускользает от меня. Я не понимаю, какая комбинация приспособлений и переопределений мне нужна, чтобы это работало.
Кажется, что все работает, когда я действительно вызываю приложение.