Разгрузка обширных вычислений в методе сохранения пользовательского поля FileField в Django

Я создаю веб-приложение для галереи на основе Django (4.1.1) и Vue. Я хочу также загружать и отображать видео (не только изображения). Для поддержки форматов, которые не работают в html-теге видео, я конвертирую эти форматы в mp4 через pyffmpeg.

Для этого я создал пользовательское поле для своей модели на основе FileField. В его методе save я беру содержимое файлов, конвертирую его и сохраняю результат. Это вызывается сериализатором через соответствующий ViewSet. Это работает, но преобразование видео занимает слишком много времени, так что веб-запрос из моего приложения Vue (выполняемый с помощью axios) уходит в таймаут.

Ясно, что мне нужно как-то разгрузить преобразование, вернуть соответствующий ответ напрямую и сохранить данные в базе данных, как только преобразование будет завершено.

Возможно ли это вообще? Или мне нужно написать пользовательское представление помимо ViewSet для выполнения расчета? Можете ли вы подсказать мне, как разгрузить этот расчет? У меня только зачаточные знания о таких вещах, как asyncio.

TL;DR: Как выполнить обширные вычисления асихронно на файловых данных перед сохранением их в модель с помощью FileField и вернуть ответ до окончания вычислений?

При необходимости я могу предоставить свой текущий код.

Сейчас я решил свою проблему, хотя мне все еще интересны другие/лучшие решения. Мое решение работает, но может быть не самым лучшим, и я чувствую, что в некоторых местах оно немного халтурное.

TL;DR: Установил django-q как менеджер очередей задач с бэкендом базы данных redis, подключил его к django и затем вызвал функцию перекодирования видеофайла из моего представления через

taskid = async_task("apps.myapp.services.transcode_video", data)

Это должна быть надежная система для обработки этих задач транскодирования параллельно и без блокирования запроса.


Я нашел этот учебник о Django-Q. Django-Q управляет и выполняет задачи из django. Он работает параллельно с Django и подключается к нему через своего брокера (в данном случае базу данных redis).

Сначала я установил django-q и клиентские модули redis через pip

pip install django-q redis

Затем я создаю базу данных Redis (здесь она работает в контейнере docker на моей машине с официальным образом redis). Как это сделать, во многом зависит от вашей платформы.

Затем настройка Django на использование Django-Q путем добавления конфигурации в settings.py (Обратите внимание, что я отключил таймауты, потому что задачи перекодирования могут занимать довольно много времени. Возможно, в будущем это будет изменено):

Q_CLUSTER = {
    'name': 'django_q_django',
    'workers': 8,
    'recycle': 500,
    'timeout': None,
    'compress': True,
    'save_limit': 250,
    'queue_limit': 500,
    'cpu_affinity': 1,
    'label': 'Django Q',
    'redis': {
        'host': 'redishostname',
        'port': 6379,
        'password': 'mysecureredisdbpassword',
        'db': 0, }
}

и затем активировать Django-Q, добавив его в установленные приложения в settings.py:

INSTALLED_APPS = [
    ...
    'django_q',
]

Затем перенесите определения моделей из Django Q через:

python manage.py migrate

и запустите Django Q через (база данных Redis должна быть запущена в этот момент):

python manage.py qcluster

Это работает в отдельном терминале от типичного

python manage.py runserver

Примечание: Конечно, эти два пункта предназначены только для разработки. В настоящее время я еще не знаю, как развернуть Django Q в production.

Теперь нам нужен файл для наших функций. Как и в учебнике, я добавил файл services.py в свое приложение. Там я просто определил функцию для запуска:

def transcode_video(data):
    # Doing my transcoding stuff here
    return {'entryid': entry.id, 'filename': target_name}

Затем эта функция может быть вызвана внутри кода представления через:

taskid = async_task("apps.myapp.services.transcode_video", data)

Таким образом, я могу предоставить данные функции и получить ID задачи в качестве возвращаемого значения. Возвращаемое значение выполненной функции появится в поле result созданной задачи, так что вы можете даже возвращать данные оттуда.

На этом этапе я столкнулся с проблемой: data содержит объект TemporaryUploadedFile, что привело к ошибке pickle. Похоже, что данные травятся перед передачей в Django Q, что не сработало для этого объекта. Возможно, есть способ преобразовать файл в формат, пригодный для пикелирования, но поскольку мне уже нужен файл в файловой системе для вызова pyffmeg, в представлении я просто записываю данные в файл с помощью

with open(filepath, 'wb') as f:
    for chunk in self.request.data['file'].chunks():
        f.write(chunk)

Обычно в ViewSet я вызываю serializer.save() в конце, но для транскодирования я этого не делаю, так как новый объект сохраняется внутри функции Django Q после транзакции. Там я создаю его так

with open(target_path, 'rb') as f:
    file = UploadedFile(
        file=f,
        name=target_name,
        content_type=data['file_type']+"/"+data['target_ext'],
    )
    entry = AlbumEntry(
        file=file,
        ... other Model fields here)
    entry.save()

Чтобы вернуть определенный Response из набора представлений, даже когда объект еще не создан, мне пришлось переписать метод create() в дополнение к методу perform_create() (где я выполнял всю обработку). Для этого я скопировал код из родительского класса и немного изменил его, чтобы возвращать определенный ответ в зависимости от возвращаемого значения perform_create() (который ранее ничего не возвращал):

def create(self, request, *args, **kwargs):
    serializer = self.get_serializer(data=request.data)
    serializer.is_valid(raise_exception=True)
    taskid = self.perform_create(serializer)
    if taskid:
        return HttpResponse(json.dumps({'taskid': taskid, 'status': 'transcoding'}), status=status.HTTP_201_CREATED)
    headers = self.get_success_headers(serializer.data)
    return Response(serializer.data, status=status.HTTP_201_CREATED, headers=headers)

Таким образом, perform_create() возвращает идентификатор задания для заданий транскодирования и None в противном случае. Соответствующий ответ отправляется сюда.

Последней, но не менее важной проблемой было то, что фронтенд не знал, когда было выполнено транскодирование. Поэтому я создал простое представление для получения задачи по ID:

@api_view(['GET'])
@authentication_classes([authentication.SessionAuthentication])
@permission_classes([permissions.IsAuthenticated])
def get_task(request, task_id):
    task = Task.get_task(task_id)
    if not task:
        return HttpResponse(json.dumps({
            'success': False
        }))
    return HttpResponse(json.dumps({
        'id': task.id,
        'result': task.result,
        ...some more data to return}))

Вы видите, что я возвращаю фиксированный ответ, когда задача не найдена. Это мой обходной путь, поскольку по умолчанию объект Task будет создан только после завершения задачи. Для моей цели нормально просто предположить, что она все еще выполняется. Комментарий в этом github выпуске Django Q предполагает, что для получения актуального объекта Task вам нужно написать свою собственную модель Task и реализовать ее таким образом, чтобы она регулярно проверяла Django Q на статус задачи. Я не хотел этого делать

Я также поместил результат в ответ, чтобы мой фронтенд мог регулярно опрашивать задачу (по ее ID), и когда перекодирование будет завершено, он будет содержать ID созданного объекта Model в базе данных. Когда мой фронтенд увидит это, он загрузит содержимое объекта.

Вернуться на верх