Celery, Redis: умножение задач и их невыполнение
Сельдерей съедает оперативную память.
Мы используем Celery в Django REST с Redis в качестве брокера. Celery используется для отправки и повторной отправки обратных вызовов в случае неудачи (политика повторной отправки заключалась в попытке отправки обратных вызовов с экспоненциально растущим таймаутом между попытками, уже удалена).
Однако примерно каждые 1м 40с использование оперативной памяти увеличивается на 48мб, а в логах на несколько секунд появляется спам об этом:
celery_worker-1 | [2024-06-06 19:17:32,442: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[6135c471-7f7e-471c-9f05-ba126418c002] received
celery_worker-1 | [2024-06-06 19:17:32,444: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[6135c471-7f7e-471c-9f05-ba126418c002] received
celery_worker-1 | [2024-06-06 19:17:32,445: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[6135c471-7f7e-471c-9f05-ba126418c002] received
... more 14 times
celery_worker-1 | [2024-06-06 19:17:32,468: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[f4aaa14f-8bda-442a-8856-af40f1d68e6d] received
celery_worker-1 | [2024-06-06 19:17:32,469: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[f4aaa14f-8bda-442a-8856-af40f1d68e6d] received
celery_worker-1 | [2024-06-06 19:17:32,471: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[f4aaa14f-8bda-442a-8856-af40f1d68e6d] received
celery_worker-1 | [2024-06-06 19:17:32,472: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[f4aaa14f-8bda-442a-8856-af40f1d68e6d] received
... more 55 times and many more with different IDs
Через некоторое время мы даже получаем это в логах:
celery_worker-1 | [2024-06-07 17:32:56,200: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[bb4b48c5-92d1-4226-aadf-bfdc387d4baf] received
celery_worker-1 | [2024-06-07 17:32:56,200: WARNING/MainProcess] QoS: Disabled: prefetch_count exceeds 65535
Новые задания выполняются мгновенно с такими логами (без предупреждений о prefetch_count
):
celery_worker-1 | [2024-06-07 16:31:28,595: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[7cdda9a2-ad10-434e-82c2-5232e64dc3b1] received
celery_worker-1 | [2024-06-07 16:31:28,736: INFO/ForkPoolWorker-72] core.requests.celery_tasks.send_callback_task[7cdda9a2-ad10-434e-82c2-5232e64dc3b1]: {'msg': 'Callback sent!'}
Так что я предполагаю, что эти задачи не выполняются, потому что в журналах с этими идентификаторами задач нет сообщений "отправлено" и "не выполнено".
После перезагрузки сервера в логах (22 - это на тестовом сервере, может быть гораздо больше):
[2024-06-07 14:36:56,163: WARNING/MainProcess] Restoring 22 unacknowledged message(s)
[2024-06-07 14:41:37,392: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[6b56c3b0-0828-46d6-86aa-34d0747ac30b] received
[2024-06-07 14:41:37,392: DEBUG/MainProcess] basic.qos: prefetch_count->9
[2024-06-07 14:41:37,394: INFO/MainProcess] Task core.requests.celery_tasks.send_callback_task[6b56c3b0-0828-46d6-86aa-34d0747ac30b] received
[2024-06-07 14:41:37,394: DEBUG/MainProcess] basic.qos: prefetch_count->10
...
В попытке решить эту проблему мы попробовали
- удаление повторных попыток для
send_callback_task
- добавление таймаута для POST-запроса
Код:
@shared_task(bind=True)
def send_callback_task(self, url: 'str', data):
response = requests.post(url, json=data, timeout=5)
log_msg = {
"msg": "Callback sent!",
}
logger.info(msg=log_msg)
if response.status_code not in (200, 201, 202):
log_msg = {
"msg": "Callback failed!"
}
logger.info(msg=log_msg)
raise RequestException
Новые задания запускаются с помощью apply_async
.
Запускаем celery worker следующим образом:
celery -A project worker -l info
16-ядерная машина.
CELERY_WORKER_MAX_TASKS_PER_CHILD = 100
Я готов предоставить больше информации, если это необходимо.
Полагаю, префорк не очень хорошо работает с операциями ввода-вывода (в задаче Celery это был POST-запрос).
Переключился с prefork
на eventlet
, кажется, это решило проблему:
pip install eventlet
celery -A project worker --loglevel=info -P eventlet