Как установить повторные задания в случае неудачи в Django-Celery
Я пытаюсь запустить задачу с помощью celery. Мне нужно отправлять пост-запросы на удаленный сервер, пока пользователь нажимает кнопку отправки, Поэтому я попробовал использовать celery с Redis с такой конфигурацией в файле настроек:
BROKER_URL = os.environ.get("REDIS_URL")
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")
CELERY_ACCEPT_CONTENT = ["application/json"]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Dubai'
Согласно документации для apply_async я могу определить опции повторной попытки, как показано ниже:
__task_expiration = 60
__interval_start = 1 * 60
api_generator.apply_async(args=(*args),
group=user_key,
expires=__task_expiration,
retry=True,
retry_policy={
"max_retries": 3,
"interval_start": __interval_start
})
В документации я нашел следующее определение для apply_async:
apply_async(args=None, kwargs=None, task_id=None, producer=None, link=None, link_error=None, shadow=None, **options)
и, следуя документации, я могу установить это с помощью retry и retry_policy
и пример кода для определения опций повторной попытки
add.apply_async((2, 2), retry=True, retry_policy={
'max_retries': 3,
'interval_start': 0,
'interval_step': 0.2,
'interval_max': 0.2,
})
Я хочу, чтобы моя задача запускалась 3 раза в случае любого сбоя, а интервал между каждой повторной попыткой составлял 60 секунд. определение моей задачи выглядит следующим образом:
@shared_task
def api_generator(*args):
import requests
import json
url = os.environ.get("API_URL_CALL")
api_access_key = os.environ.get("API_ACCESS_KEY")
headers = {
"Authorization": api_access_key,
"Content-Type": "application/json"
}
json_schema = generate_json(*args)
response = requests.request("POST", url, headers=headers, data=json.dumps(json_schema), timeout=30)
if response.status_code != 200:
raise NameError("API Response error")
return response.status_code
но когда мой код терпит неудачу, я не вижу никакого механизма повторного выполнения в логах celery, в чем здесь проблема? как я могу определить повторное выполнение при вызове моих задач с помощью метода apply_async? Я поднимаю NameError("Exception")
для сообщения работнику, что произошла ошибка.
[EDIT 1: Добавлено acks_late
]
Есть две вещи, которые могут пойти не так, когда вы посылаете задание работнику Celery:
- Проблемы с соединением с брокером и очередью сообщений.
- Исключения, возникающие на рабочем устройстве.
Первую проблему можно решить, определив retry
и retry_policy
так, как это сделали вы.
Второй вид (который вы хотите решить), может быть решен вызовом self.retry()
при отказе задачи.
В зависимости от типа вашей проблемы, может быть полезно установить CELERY_ACKS_LATE = True
.
Посмотрите эти ссылки для получения дополнительной информации:
Восстановление потерянных или неудачных задач (Celery, Django и RabbitMQ)
https://coderbook.com/@marcus/how-to-automatically-retry-failed-tasks-with-celery/