Как использовать мультипроцессинг с Django для создания тысяч объектов из JSON?

У меня есть конечная точка API на Django REST, которая принимает файл .json и выполняет некоторые вычисления над данными для создания некоторых "профилей", затем выполняет некоторые проверки на ошибки и валидацию, и, наконец, возвращает ответ fail, если что-то пошло не так. Этот json-файл может быть очень большим, что замедляет процесс. Я не могу использовать bulk_create от Django, потому что модель является наследуемой моделью и bulk create не работает в этом случае.

Итак, я пытаюсь использовать мультипроцессинг Python (поправьте меня, если это не лучший подход) для создания экземпляров модели и последующего сохранения в партиях. Это происходит следующим образом:

  • Viewset: create() вызывает функцию utils для перебора объектов в json-файле
  • .
  • Для каждого объекта другая функция actually_create, в которой инстанцируется модель
  • Эти объекты сохраняются в списке (batch)
  • Партии сохраняются внутри блока транзакций

Хотя я использовал тот же подход для других случаев использования, я не могу заставить его работать, и я не знаю почему, потому что обратная трассировка не помогает, и я не могу установить точки останова при многопроцессорной обработке.

Traceback:

Traceback (most recent call last):
  File "/home/everton/.virtualenvs/venv/lib/python3.8/site-packages/django/db/backends/base/base.py", line 235, in _cursor
    return self._prepare_cursor(self.create_cursor(name))
  File "/home/everton/.virtualenvs/venv/lib/python3.8/site-packages/django/db/backends/postgresql/base.py", line 223, in create_cursor
    cursor = self.connection.cursor()
psycopg2.InterfaceError: connection already closed

Может ли Celery быть альтернативой для этого? У меня нет опыта работы с ним, но я читал некоторые случаи использования, которые могут быть похожи на этот.

views.py

from rest_framework import viewsets
from rest_framework.response import Response

from .utils import create_profiles


class CustomViewSet(viewsets.ViewSet):
    def create(self, request, *args, **kwargs):
        #...
        try:
            profiles = create_profiles(data)
            serializer = self.serializer_class(profiles, many=True)
        except Exception as e:
            resp = Response(str(e))
            resp.status_code = status.HTTP_400_BAD_REQUEST
            return resp

        return Response(serializer.data, status=status.HTTP_200_OK)

utils.py

from multiprocessing import cpu_count, Pool
from django.db import transaction, connection

def save_batch(batch):
    with transaction.atomic():
        for obj in batch:
            obj.save()
    return batch

def worker_init():
    connection.close()

def create_profiles(data):
    create_batches = []
    default_batch_size = 1000

    with Pool(processes=cpu_count(), initializer=worker_init) as pool:
        profiles_to_create = []
        for profile in pool.imap_unordered(actually_create, data):
            profiles_to_create.append(profile)
       
            if len(profiles_to_create) == default_batch_size:
                create_batches.append(profiles_to_create)
                profiles_to_create = []
       
        if len(profiles_to_create) > 0:
            create_batches.append(profiles_to_create)
    
    return profiles_to_create

def actually_create(profile):
    #...
    object = MyModel(**profile)
    return object
Вернуться на верх