Как использовать мультипроцессинг с Django для создания тысяч объектов из JSON?
У меня есть конечная точка API на Django REST, которая принимает файл .json и выполняет некоторые вычисления над данными для создания некоторых "профилей", затем выполняет некоторые проверки на ошибки и валидацию, и, наконец, возвращает ответ fail, если что-то пошло не так.
Этот json-файл может быть очень большим, что замедляет процесс. Я не могу использовать bulk_create от Django, потому что модель является наследуемой моделью и bulk create не работает в этом случае.
Итак, я пытаюсь использовать мультипроцессинг Python (поправьте меня, если это не лучший подход) для создания экземпляров модели и последующего сохранения в партиях. Это происходит следующим образом:
- Viewset:
create()вызывает функцию utils для перебора объектов в json-файле .
- Для каждого объекта другая функция
actually_create, в которой инстанцируется модель - Эти объекты сохраняются в списке (batch)
- Партии сохраняются внутри блока транзакций
Хотя я использовал тот же подход для других случаев использования, я не могу заставить его работать, и я не знаю почему, потому что обратная трассировка не помогает, и я не могу установить точки останова при многопроцессорной обработке.
Traceback:
Traceback (most recent call last):
File "/home/everton/.virtualenvs/venv/lib/python3.8/site-packages/django/db/backends/base/base.py", line 235, in _cursor
return self._prepare_cursor(self.create_cursor(name))
File "/home/everton/.virtualenvs/venv/lib/python3.8/site-packages/django/db/backends/postgresql/base.py", line 223, in create_cursor
cursor = self.connection.cursor()
psycopg2.InterfaceError: connection already closed
Может ли Celery быть альтернативой для этого? У меня нет опыта работы с ним, но я читал некоторые случаи использования, которые могут быть похожи на этот.
views.py
from rest_framework import viewsets
from rest_framework.response import Response
from .utils import create_profiles
class CustomViewSet(viewsets.ViewSet):
def create(self, request, *args, **kwargs):
#...
try:
profiles = create_profiles(data)
serializer = self.serializer_class(profiles, many=True)
except Exception as e:
resp = Response(str(e))
resp.status_code = status.HTTP_400_BAD_REQUEST
return resp
return Response(serializer.data, status=status.HTTP_200_OK)
utils.py
from multiprocessing import cpu_count, Pool
from django.db import transaction, connection
def save_batch(batch):
with transaction.atomic():
for obj in batch:
obj.save()
return batch
def worker_init():
connection.close()
def create_profiles(data):
create_batches = []
default_batch_size = 1000
with Pool(processes=cpu_count(), initializer=worker_init) as pool:
profiles_to_create = []
for profile in pool.imap_unordered(actually_create, data):
profiles_to_create.append(profile)
if len(profiles_to_create) == default_batch_size:
create_batches.append(profiles_to_create)
profiles_to_create = []
if len(profiles_to_create) > 0:
create_batches.append(profiles_to_create)
return profiles_to_create
def actually_create(profile):
#...
object = MyModel(**profile)
return object