Как заставить Celery + Django + Pytest работать вместе

Пытаюсь протестировать конечную точку, которая имеет задачу celery. Похоже, что задачи сельдерея не запускаются в тесте.

  • django==4.1.5
  • celery==5.2.7
  • pytest==7.2.1
  • pytest-django==4.5.2

Конечная точка:


def do_some_stuff(blah: Blah) -> Blah:
          res = cool_task.apply_async(kwargs={
            'cool_id': int(pk),
            'config': config,
            'name': RESUBMIT.value,
        },
            link=update_status.si(
                cool_id=int(pk),
                new_status="why is this so hard",
            )
        )
  [...]

Тест:


@pytest.mark.django_db
def test_my_thing(django_client: Client) -> None:
  [...]
  response = django_client.post(f"/api/myendpoint/{mything.id}/do_some_stuff/")

Попадает на конечную точку. Получает обратно 202, как и ожидалось. Но celery, похоже, не берет задачу в тесте. Метод update_status обновляет базу данных, а я не вижу, чтобы это произошло.

Я пробовал создать приложение celery в тесте, создать worker в тесте, изменить тест, чтобы он использовал основную БД вместо тестовой, установить override_settings в "BROKER_BANDEND='memory'".

Мне бы хотелось получить полный рабочий пример. Это вроде бы элементарно, но это ускользает от меня. Я не понимаю, какая комбинация приспособлений и переопределений мне нужна, чтобы это работало.

Кажется, что все работает, когда я действительно вызываю приложение.

Вернуться на верх