Как дросселировать задачи redis/celery для веб-сайта django?
У меня есть сайт на django v3.0.2 (python v3.6.9), куда я загружаю изображение, обрабатываю его, а затем отображаю на сайте. Обрабатывая изображение, я создаю разные размеры одного изображения, нахожу и идентифицирую лица, оцифровываю любой текст и перевожу его при необходимости, использую несколько различных алгоритмов для расчета сходства каждого изображения с другими, и другие алгоритмы обработки изображений. Я использую celery v4.3.0 и redis v4.0.9 для выполнения всех вычислений в фоновом режиме в различных задачах celery.
Проблема, с которой я сталкиваюсь, заключается в том, что когда я пытаюсь загрузить и обработать более ~4 изображений в цикле, у меня заканчивается память, и вычисления начинают вызывать исключения. Под исчерпанием памяти подразумевается, что мои 5 ГБ подкачки и 16 ГБ оперативной памяти полностью израсходованы, и chrome даже падает. В своей базе данных я веду журнал выполнения каждой задачи, успешно ли она завершается или нет. Я вижу много ошибок и много задач, которые не были начаты.
Если я помещу time.sleep(10)
в цикл, который загружает изображения, и сразу после запуска celery workers для каждого изображения, я могу загрузить 60+ изображений в одном цикле без каких-либо исключений, сбоев или ошибок в журнале задач.
Я искал в гугле способ дросселирования рабочих в celery, чтобы не использовать sleep
, и обнаружил, что, возможно, у celery нет такой возможности Celery rate limiting with multiple workers на форуме celery issues. Я нашел эту статью, которая может помочь Celery throttling - установка ограничения скорости для очередей.
Прежде чем я начну строить что-то самостоятельно, как в приведенной выше ссылке, я решил спросить у коллективного сознания, есть ли лучший (правильный?) способ дросселирования рабочих celery, чем time.sleep(10)
, чтобы у меня не кончалась память и не возникало много ошибок задач.
Здесь слишком много кода, чтобы разместить его здесь, поэтому вот обзор того, что я делаю.
admin.py
def save_model(self, request, obj, form, change):
if form.is_valid():
if not change:
files = request.FILES.getlist('storage_file_name')
for f in files:
obj = Document() # model for each image
# add some data to the model
obj.save()
time.sleep(10)
doc_admin_workflow(<some parameters>)
else:
# some different housekeeping on the model fields
doc_admin_workflow(<some different parameters>)
super().save_model(request, obj, form, change)
def doc_admin_workflow(<some parameters>):
if not change:
# Decide which of the image processing steps to perform.
# Each processing step is a celery task of the form task_name.si(params)
# in a list called 'jobs' ~ 3-5 tasks
transaction.on_commit(lambda: chain(group(jobs),
change_state_task.si(document_id, 'new')).delay())
time.sleep(10)
else:
# decide what other processing steps (i.e. celery tasks) to perform
transaction.on_commit(lambda: chain(step_1, step_2, faces_2, ocr_job,
change_state_task.si(document_id, 'ready')).delay())
time.sleep(10)
Все задачи celery, использованные выше, являются самодостаточными, поскольку они не передают данные следующей задаче и никак не взаимодействуют друг с другом. Они просто вычисляют и читают/записывают базу данных.
Кроме того, эти загрузки выполняются администратором сайта. Пользователи, взаимодействующие с сайтом, не загружают изображения на сайт и не используют задачи celery по обработке изображений. Итак, time.sleep(10)
- это возможное решение, но оно кажется огромным и не очень надежным.
Спасибо!
Пожалуйста, посмотрите ответ на это связанное сообщение: celery redis tasks don't always complete
Удаление всех аккордов и групп из моих задач и использование только цепочек, похоже, решило проблему нехватки памяти. Теперь нет необходимости дросселировать задачи celery или помещать какие-либо time.sleep()
утверждения в поток.