Как установить повторные задания в случае неудачи в Django-Celery

Я пытаюсь запустить задачу с помощью celery. Мне нужно отправлять пост-запросы на удаленный сервер, пока пользователь нажимает кнопку отправки, Поэтому я попробовал использовать celery с Redis с такой конфигурацией в файле настроек:

BROKER_URL = os.environ.get("REDIS_URL")
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Dubai'

Согласно документации для apply_async я могу определить опции повторной попытки, как показано ниже:

__task_expiration = 60
__interval_start = 1 * 60

api_generator.apply_async(args=(*args),
                                group=user_key,
                                expires=__task_expiration,
                                retry=True,
                                retry_policy={
                                  "max_retries": 3,
                                  "interval_start": __interval_start
                                })

В документации я нашел следующее определение для apply_async:

apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)

и, следуя документации, я могу установить это с помощью retry и retry_policy

enter image description here

и пример кода для определения опций повторной попытки

add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})

Я хочу, чтобы моя задача запускалась 3 раза в случае любого сбоя, а интервал между каждой повторной попыткой составлял 60 секунд. определение моей задачи выглядит следующим образом:

@shared_task
def api_generator(*args):
    import requests
    import json
    url = os.environ.get("API_URL_CALL")
    api_access_key = os.environ.get("API_ACCESS_KEY")

    headers = {
        "Authorization": api_access_key,
        "Content-Type": "application/json"
    }

    json_schema = generate_json(*args)

    response = requests.request("POST", url, headers=headers, data=json.dumps(json_schema), timeout=30)

    if response.status_code != 200:
        raise NameError("API Response error")

    return response.status_code

но когда мой код терпит неудачу, я не вижу никакого механизма повторного выполнения в логах celery, в чем здесь проблема? как я могу определить повторное выполнение при вызове моих задач с помощью метода apply_async? Я поднимаю NameError("Exception") для сообщения работнику, что произошла ошибка.

[EDIT 1: Добавлено acks_late]

Есть две вещи, которые могут пойти не так, когда вы посылаете задание работнику Celery:

  1. Проблемы с соединением с брокером и очередью сообщений.
  2. Исключения, возникающие на рабочем устройстве.

Первую проблему можно решить, определив retry и retry_policy так, как это сделали вы.

Второй вид (который вы хотите решить), может быть решен вызовом self.retry() при отказе задачи.

В зависимости от типа вашей проблемы, может быть полезно установить CELERY_ACKS_LATE = True.

Посмотрите эти ссылки для получения дополнительной информации:

Восстановление потерянных или неудачных задач (Celery, Django и RabbitMQ)

https://coderbook.com/@marcus/how-to-automatically-retry-failed-tasks-with-celery/

Вернуться на верх