Как дросселировать задачи redis/celery для веб-сайта django?

У меня есть сайт на django v3.0.2 (python v3.6.9), куда я загружаю изображение, обрабатываю его, а затем отображаю на сайте. Обрабатывая изображение, я создаю разные размеры одного изображения, нахожу и идентифицирую лица, оцифровываю любой текст и перевожу его при необходимости, использую несколько различных алгоритмов для расчета сходства каждого изображения с другими, и другие алгоритмы обработки изображений. Я использую celery v4.3.0 и redis v4.0.9 для выполнения всех вычислений в фоновом режиме в различных задачах celery.

Проблема, с которой я сталкиваюсь, заключается в том, что когда я пытаюсь загрузить и обработать более ~4 изображений в цикле, у меня заканчивается память, и вычисления начинают вызывать исключения. Под исчерпанием памяти подразумевается, что мои 5 ГБ подкачки и 16 ГБ оперативной памяти полностью израсходованы, и chrome даже падает. В своей базе данных я веду журнал выполнения каждой задачи, успешно ли она завершается или нет. Я вижу много ошибок и много задач, которые не были начаты.

Если я помещу time.sleep(10) в цикл, который загружает изображения, и сразу после запуска celery workers для каждого изображения, я могу загрузить 60+ изображений в одном цикле без каких-либо исключений, сбоев или ошибок в журнале задач.

Я искал в гугле способ дросселирования рабочих в celery, чтобы не использовать sleep, и обнаружил, что, возможно, у celery нет такой возможности Celery rate limiting with multiple workers на форуме celery issues. Я нашел эту статью, которая может помочь Celery throttling - установка ограничения скорости для очередей.

Прежде чем я начну строить что-то самостоятельно, как в приведенной выше ссылке, я решил спросить у коллективного сознания, есть ли лучший (правильный?) способ дросселирования рабочих celery, чем time.sleep(10), чтобы у меня не кончалась память и не возникало много ошибок задач.

Здесь слишком много кода, чтобы разместить его здесь, поэтому вот обзор того, что я делаю.

admin.py

def save_model(self, request, obj, form, change):
    if form.is_valid():
        if not change:
            files = request.FILES.getlist('storage_file_name')
            for f in files:
                obj = Document()       # model for each image
                # add some data to the model
                obj.save()
                time.sleep(10)
                doc_admin_workflow(<some parameters>)
        else:
            # some different housekeeping on the model fields
            doc_admin_workflow(<some different parameters>)
            super().save_model(request, obj, form, change)

def doc_admin_workflow(<some parameters>):
    if not change:
        # Decide which of the image processing steps to perform. 
        # Each processing step is a celery task of the form task_name.si(params)
        # in a list called 'jobs' ~ 3-5 tasks
        transaction.on_commit(lambda: chain(group(jobs), 
                              change_state_task.si(document_id, 'new')).delay()) 
        time.sleep(10) 
    else:
        # decide what other processing steps (i.e. celery tasks) to perform
        transaction.on_commit(lambda: chain(step_1, step_2, faces_2, ocr_job, 
                              change_state_task.si(document_id, 'ready')).delay())
        time.sleep(10)

Все задачи celery, использованные выше, являются самодостаточными, поскольку они не передают данные следующей задаче и никак не взаимодействуют друг с другом. Они просто вычисляют и читают/записывают базу данных.

Кроме того, эти загрузки выполняются администратором сайта. Пользователи, взаимодействующие с сайтом, не загружают изображения на сайт и не используют задачи celery по обработке изображений. Итак, time.sleep(10) - это возможное решение, но оно кажется огромным и не очень надежным.

Спасибо!

Пожалуйста, посмотрите ответ на это связанное сообщение: celery redis tasks don't always complete

Удаление всех аккордов и групп из моих задач и использование только цепочек, похоже, решило проблему нехватки памяти. Теперь нет необходимости дросселировать задачи celery или помещать какие-либо time.sleep() утверждения в поток.

Вернуться на верх