Автоматический повтор невыполненных задач Celery

В этом руководстве мы рассмотрим, как автоматически повторять неудачные задания Celery.

Серия Django + Celery:

  1. Асинхронные задачи с Django и Celery
  2. Обработка периодических задач в Django с помощью Celery и Docker
  3. Автоматический повтор невыполненных задач Celery (эта статья)
  4. Работа с транзакциями базы данных Celery и Django

Цели

После прочтения вы должны уметь:

  1. Повторное выполнять неудачную задачу Celery с использованием как метода retry, так и аргумента декоратора
  2. Использовать экспоненциальный откат при повторном выполнении неудачной задачи
  3. Использовать задачу на основе класса для повторного использования аргументов повторной попытки

Задача Celery

Исходный код этого руководства вы можете найти на GitHub.

Предположим, что у нас есть задача Celery следующего вида:

@shared_task
def task_process_notification():
    if not random.choice([0, 1]):
        # имитируем случайную ошибку
        raise Exception()

    requests.post('https://httpbin.org/delay/5')

В реальном мире это может быть внутренний или внешний сторонний сервис. Независимо от сервиса, предположим, что он очень ненадежен, особенно в пиковые периоды. Как мы можем справиться с отказами?

Следует отметить, что многие начинающие Celery путаются, почему в одних руководствах используется app.task, а в других - shared_task. Так вот, shared_task позволяет вам определять задачи Celery без необходимости импортировать экземпляр Celery, поэтому он может сделать ваш код задачи более многоразовым.

Решение 1: использование блока try/except

Мы можем использовать блок try/except, чтобы поймать исключение и поднять вопрос retry:

@shared_task(bind=True)
def task_process_notification(self):
    try:
        if not random.choice([0, 1]):
            # имитируем случайную ошибку
            raise Exception()

        requests.post('https://httpbin.org/delay/5')
    except Exception as e:
        logger.error('исключительный случай, повторите попытку через 5 секунд')
        raise self.retry(exc=e, countdown=5) 

Примечания:

  1. Поскольку мы установили bind в True, это связанная задача, поэтому первым аргументом задачи всегда будет текущий экземпляр задачи (self). Поэтому мы можем вызвать self.retry для повторного выполнения неудачной задачи.
  2. Пожалуйста, не забудьте вызвать исключение, возвращаемое методом self.retry, чтобы это сработало.
  3. Если установить аргумент обратного countdown отсчета равным 5, задача повторит попытку после 5-секундной задержки.

Выполним приведенный ниже код в оболочке Python:

>>> from polls.tasks import task_process_notification
>>> task_process_notification.delay()

Вы должны увидеть в выводе терминала Celery worker примерно следующее:

Task polls.tasks.task_process_notification[06e1f985-90d4-4453-9870-fab57c5885c4] retry: Повторная попытка через 5 с: Exception()
Task polls.tasks.task_process_notification[06e1f985-90d4-4453-9870-fab57c5885c4] retry: Повторная попытка через 5 с: Exception()
Task polls.tasks.task_process_notification[06e1f985-90d4-4453-9870-fab57c5885c4] succeeded in 3.3638455480104312s: None

Как вы можете видеть, задача Celery дважды потерпела неудачу, а в третий раз завершилась успешно.

Решение 2: Декоратор повторного выполнения задачи

В Celery 4.0 добавлена встроенная поддержка повторных попыток, так что вы можете позволить исключению всплыть и указать в декораторе, как его обработать:

@shared_task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 7, 'countdown': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # имитируем случайную ошибку
        raise Exception()

    requests.post('https://httpbin.org/delay/5') 

Примечания:

  1. autoretry_for принимает список/кортеж типов исключений, для которых вы хотите выполнять повторные попытки.
  2. retry_kwargs принимает словарь дополнительных опций для указания того, как будут выполняться автоповторы. В приведенном выше примере задача будет повторять попытку после 5-секундной задержки (через countdown) и допускает максимум 7 попыток (через max_retries). Celery will stop retrying after 7 failed attempts and raise an exception.

Exponential Backoff

Если вашей задаче Celery необходимо отправить запрос в сторонний сервис, то хорошей идеей будет использовать экспоненциальный откат, чтобы не перегружать сервис.

Celery поддерживает это по умолчанию:

@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=True, retry_kwargs={'max_retries': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # имитируем случайную ошибку
        raise Exception()

    requests.post('https://httpbin.org/delay/5') 

В данном примере первая повторная попытка должна выполняться через 1 с, следующая - через 2 с, третья - через 4 с, четвертая - через 8 с и так далее:

[02:09:59,014: INFO/ForkPoolWorker-8] Task polls.tasks.task_process_notification[fbe041b6-e6c1-453d-9cc9-cb99236df6ff] retry: Retry in 1s: Exception()
[02:10:00,210: INFO/ForkPoolWorker-2] Task polls.tasks.task_process_notification[fbe041b6-e6c1-453d-9cc9-cb99236df6ff] retry: Повторная попытка через 2 с: Exception()
[02:10:02,291: INFO/ForkPoolWorker-4] Task polls.tasks.task_process_notification[fbe041b6-e6c1-453d-9cc9-cb99236df6ff] retry: Повторная попытка через 4 с: Exception() 

Вы также можете установить retry_backoff в число для использования в качестве коэффициента задержки:

@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_kwargs={'max_retries': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # имитируем случайную ошибку
        raise Exception()

    requests.post('https://httpbin.org/delay/5') 

Пример:

[02:21:45,887: INFO/ForkPoolWorker-8] Task polls.tasks.task_process_notification[6a0b2682-74f5-410b-af1e-352069238f3d] retry: Повторная попытка через 5 с: Exception()
[02:21:55,170: INFO/ForkPoolWorker-2] Task polls.tasks.task_process_notification[6a0b2682-74f5-410b-af1e-352069238f3d] retry: Повторная попытка через 10 секунд: Exception()
[02:22:15,706: INFO/ForkPoolWorker-4] Task polls.tasks.task_process_notification[6a0b2682-74f5-410b-af1e-352069238f3d] retry: Повторная попытка через 20 секунд: Exception()
[02:22:55,450: INFO/ForkPoolWorker-6] Task polls.tasks.task_process_notification[6a0b2682-74f5-410b-af1e-352069238f3d] retry: Повторная попытка через 40 секунд: Exception() 

По умолчанию экспоненциальный откат также вносит случайный джиттер, чтобы избежать одновременного выполнения всех задач.

Случайность

Когда вы создаете пользовательскую стратегию повторных попыток для задачи Celery (которая должна отправить запрос другому сервису), вам следует добавить некоторую случайность в расчет задержки, чтобы предотвратить одновременное выполнение всех задач, что приведет к грохочущему стаду.

Celery has you covered here as well with retry_jitter:

@shared_task(bind=True, autoretry_for=(Exception,), retry_backoff=5, retry_jitter=True, retry_kwargs={'max_retries': 5})
def task_process_notification(self):
    if not random.choice([0, 1]):
        # имитируем случайную ошибку
        raise Exception()

    requests.post('https://httpbin.org/delay/5') 

По умолчанию эта опция установлена в True, что помогает предотвратить проблему громогласного стада при использовании встроенной в Celery функции retry_backoff.

Базовый класс задачи

Если вы обнаружите, что в декораторах задач Celery вы пишете одни и те же аргументы повтора, вы можете (начиная с Celery 4.4) определить аргументы повтора в базовом классе, который затем можно использовать в качестве базового класса в задачах Celery:

class BaseTaskWithRetry(celery.Task):
    autoretry_for = (Exception, KeyError)
    retry_kwargs = {'max_retries': 5}
    retry_backoff = True


@shared_task(bind=True, base=BaseTaskWithRetry)
def task_process_notification(self):
    raise Exception() 

Так что если вы запустите задачу в оболочке Python, вы увидите следующее:

[03:12:29,002: INFO/ForkPoolWorker-8] Task polls.tasks.task_process_notification[3231ef9b-00c7-4ab1-bf0b-2fdea6fa8348] retry: Retry in 1s: Exception()
[03:12:30,445: INFO/ForkPoolWorker-8] Task polls.tasks.task_process_notification[3231ef9b-00c7-4ab1-bf0b-2fdea6fa8348] retry: Повторная попытка через 2 с: Exception()
[03:12:33,080: INFO/ForkPoolWorker-8] Task polls.tasks.task_process_notification[3231ef9b-00c7-4ab1-bf0b-2fdea6fa8348] retry: Повторная попытка через 3 с: Exception()
 

Заключение

В этом руководстве по Celery мы рассмотрели, как автоматически повторять неудачные задачи celery.

Кроме того, исходный код этого руководства можно найти на GitHub.

Серия Django + Celery:

  1. Асинхронные задачи с Django и Celery
  2. Обработка периодических задач в Django с помощью Celery и Docker
  3. Автоматический повтор невыполненных задач Celery (эта статья)
  4. Работа с транзакциями базы данных Celery и Django
Вернуться на верх