Celery, Redis: умножение задач и их невыполнение

Сельдерей съедает оперативную память.

Мы используем Celery в Django REST с Redis в качестве брокера. Celery используется для отправки и повторной отправки обратных вызовов в случае неудачи (политика повторной отправки заключалась в попытке отправки обратных вызовов с экспоненциально растущим таймаутом между попытками, уже удалена).

Однако примерно каждые 1м 40с использование оперативной памяти увеличивается на 48мб, а в логах на несколько секунд появляется спам об этом:

celery_worker-1  | [2024-06-06 19:17:32,442: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[6135c471-7f7e-471c-9f05-ba126418c002] received
celery_worker-1  | [2024-06-06 19:17:32,444: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[6135c471-7f7e-471c-9f05-ba126418c002] received
celery_worker-1  | [2024-06-06 19:17:32,445: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[6135c471-7f7e-471c-9f05-ba126418c002] received
... more 14 times
celery_worker-1  | [2024-06-06 19:17:32,468: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[f4aaa14f-8bda-442a-8856-af40f1d68e6d] received
celery_worker-1  | [2024-06-06 19:17:32,469: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[f4aaa14f-8bda-442a-8856-af40f1d68e6d] received
celery_worker-1  | [2024-06-06 19:17:32,471: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[f4aaa14f-8bda-442a-8856-af40f1d68e6d] received
celery_worker-1  | [2024-06-06 19:17:32,472: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[f4aaa14f-8bda-442a-8856-af40f1d68e6d] received
... more 55 times and many more with different IDs

Через некоторое время мы даже получаем это в логах:

celery_worker-1  | [2024-06-07 17:32:56,200: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[bb4b48c5-92d1-4226-aadf-bfdc387d4baf] received
celery_worker-1  | [2024-06-07 17:32:56,200: WARNING/MainProcess] QoS: Disabled: prefetch_count exceeds 65535

Новые задания выполняются мгновенно с такими логами (без предупреждений о prefetch_count):

celery_worker-1  | [2024-06-07 16:31:28,595: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[7cdda9a2-ad10-434e-82c2-5232e64dc3b1] received
celery_worker-1  | [2024-06-07 16:31:28,736: INFO/ForkPoolWorker-72] core.requests.celery_tasks.send_callback_task[7cdda9a2-ad10-434e-82c2-5232e64dc3b1]: {'msg': 'Callback sent!'}

Так что я предполагаю, что эти задачи не выполняются, потому что в журналах с этими идентификаторами задач нет сообщений "отправлено" и "не выполнено".

После перезагрузки сервера в логах (22 - это на тестовом сервере, может быть гораздо больше):

[2024-06-07 14:36:56,163: WARNING/MainProcess] Restoring 22 unacknowledged message(s)

[2024-06-07 14:41:37,392: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[6b56c3b0-0828-46d6-86aa-34d0747ac30b] received
[2024-06-07 14:41:37,392: DEBUG/MainProcess] basic.qos: prefetch_count->9
[2024-06-07 14:41:37,394: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[6b56c3b0-0828-46d6-86aa-34d0747ac30b] received
[2024-06-07 14:41:37,394: DEBUG/MainProcess] basic.qos: prefetch_count->10
...

В попытке решить эту проблему мы попробовали

  • удаление повторных попыток для send_callback_task
  • добавление таймаута для POST-запроса

Код:

@shared_task(bind=True)
def send_callback_task(self, url: 'str', data):
    response = requests.post(url, json=data, timeout=5)
    log_msg = {
        "msg": "Callback sent!",
    }
    logger.info(msg=log_msg)

    if response.status_code not in (200, 201, 202):
        log_msg = {
            "msg": "Callback failed!"
        }
        logger.info(msg=log_msg)
        raise RequestException

Новые задания запускаются с помощью apply_async.

Запускаем celery worker следующим образом:

celery -A project worker -l info

16-ядерная машина.

CELERY_WORKER_MAX_TASKS_PER_CHILD = 100

Я готов предоставить больше информации, если это необходимо.

Полагаю, префорк не очень хорошо работает с операциями ввода-вывода (в задаче Celery это был POST-запрос).

Переключился с prefork на eventlet, кажется, это решило проблему:

pip install eventlet
celery -A project worker --loglevel=info -P eventlet
Вернуться на верх