Повторное выполнение модульного теста Celery

В настоящее время я пишу модульные тесты для моих задач celery и хотел бы проверить, что моя задача повторяется.

Примечание: ALWAYS_EAGER установлен на True в тестовых настройках

@app.shared_task(bind=True, soft_time_limit=600, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3},
                 retry_backoff=3)
def my_task(self, obj_pk):
    try:
        # ...
        function_call_that_can_raise_exception()
    except Exception as exc:
        last_try = self.request.retries >= self.retry_kwargs["max_retries"]

        if last_try:
            # ....
        else:
            # ...
        
        raise_with_context(exc)

Я могу проверить свой последний запуск, подражая celery.app.task.Task.request;

@mock.patch("celery.app.task.Task.request")
def test_my_task(self):
    mock_task_request.retries = 3
    my_task(12)
    # some asserts

Как я могу проверить, что моя задача действительно повторно выполняется автоматически?

Хитрость в том, чтобы использовать apply, а не delay или apply_async:

def test_my_task_is_retried(self):
    my_task.apply(kwargs={"obj_pk": 12})
    # assert what should only happen in the last run
Вернуться на верх