Как безопасно использовать многопроцессорность в приложении Django?

Я прочитал документы, в которых говорится о том, что многопроцессорная обработка может вызвать нежелательные побочные эффекты в приложениях Django или на Windows, особенно в тех, которые подключены к нескольким базам данных. В частности, я использую функцию load_to_table для создания нескольких CSV-файлов из DataFrame и последующей загрузки данных в таблицу PostgreSQL с использованием мультипроцессинга. Эта функция глубоко интегрирована в мое приложение Django и не является отдельным скриптом.

Меня беспокоят потенциальные долгосрочные последствия, если этот код будет использоваться в производстве. Кроме того, if __name__ == '__main__':, похоже, не работает в глубинных файлах/функциях Django. Это происходит потому, что команды управления Django выполняются в другом контексте, где __name__ не установлен в "__main__", что не позволяет этому блоку выполняться так, как ожидалось. Более того, руководство по многопроцессорности рекомендует использовать if __name__ == '__main__': для безопасной инициализации многопроцессорных задач, так как это гарантирует, что код не будет случайно выполнен несколько раз, особенно на таких платформах, как Windows, где код на уровне модуля повторно импортируется в дочерние процессы.

Вот код, который я использую:

import os
import glob
import shutil
from multiprocessing import Pool, cpu_count
from functools import partial

def copy_to_table(connection, file_name: str, table_name: str, columns: list):
    cursor = connection.cursor()
    with open(file_name, "r") as f:
        cursor.copy_from(f, table_name, sep=",", columns=columns, null="")
    connection.commit()
    connection.close()
    return file_name

def load_to_table(connection, dataframe, table_name: str) -> dict:
    filepath = os.path.join("uploaded_files", table_name)
    os.makedirs(filepath, exist_ok=True)
    
    rows_per_partition = 100000
    total_rows = dataframe.count()
    num_partitions = (total_rows // rows_per_partition) + (1 if total_rows % rows_per_partition > 0 else 0)
    
    dataframe.repartition(num_partitions).write.mode("overwrite").format("csv").option("header", "false").save(filepath)
    
    file_path_list = sorted(glob.glob(f"{filepath}/*.csv"))
    with Pool(cpu_count()) as p:
        p.map(partial(copy_to_table, connection=connection, table_name=table_name, columns=dataframe.columns), file_path_list)
    
    shutil.rmtree(filepath)

# views.py file
load_to_table(connection, dataframe, 'source_table')

Приведенная выше функция не работает с отладчиком VS Code, скорее всего, из-за debugpy, который мешает многопроцессорности Django. Однако она работает с runserver. Когда я запускаю приложение Django с помощью отладчика VS Code, я сталкиваюсь со следующей ошибкой при выполнении функции. Кажется, что она выполняется в цикле.

File "/usr/lib/python3.11/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
                  ^^^^^^^^^^^^^^^^^
  File "/home/rhythmflow/Desktop/Reconciliation/reconciliation-backend-v3/portal/operations/load_data/methods.py", line 225, in load_to_table
    with Pool(cpu_count()) as p:
         ^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/multiprocessing/context.py", line 281, in _Popen
    return Popen(process_obj)
           ^^^^^^^^^^^^^^^^^^
  File "/home/rhythmflow/Desktop/Reconciliation/reconciliation-backend-v3/portal/operations/load_data/load_data.py", line 71, in start
    load_to_table(df_ops, self.source_tmp_details)
  File "/usr/lib/python3.11/multiprocessing/context.py", line 119, in Pool
    return Pool(processes, initializer, initargs, maxtasksperchild,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

File "/home/rhythmflow/.vscode/extensions/ms-python.debugpy-2024.10.0-linux-x64/bundled/libs/debugpy/_vendored/pydevd/pydevd.py", line 838, in wait_for_ready_to_run
    self._py_db_command_thread_event.wait(0.1)
  File "/usr/lib/python3.11/threading.py", line 629, in wait
    signaled = self._cond.wait(timeout)
               ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/threading.py", line 331, in wait
    gotit = waiter.acquire(True, timeout)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/rhythmflow/Desktop/Reconciliation/reconciliation-backend-v3/.venv/lib/python3.11/site-packages/django/utils/autoreload.py", line 664, in <lambda>
    signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
SystemExit: 0
[22/Aug/2024 15:04:30] "POST /start-process/ HTTP/1.1" 500 59
[22/Aug/2024 15:04:35,063] - Broken pipe from ('127.0.0.1', 51102)

Что может вызвать эту проблему, и как я могу ее решить, используя отладчик VS Code?

Использование чистой многопроцессорности в Django приведет к тому, что его внутренний сервер поднимется в вилочный/спаунизированный процесс и попытается связать себя с теми же ресурсами сокетов, которые реальный рабочий использует для прослушивания входящих HTTP-запросов.

Звучит достаточно грязно. Так что если нет официальной страницы django-docs, рассказывающей о том, как безопасно использовать многопроцессорность в Django, я бы не стал идти этим путем.

Однако есть другие способы заставить процессы с тесной связью, использующие ту же кодовую базу, что и ваши веб-рабочие, работать параллельно - просто измените точку входа так, чтобы django не пытался обслуживать HTTP. Я считаю, что django admin scripts - способ написания функций в кодовой базе, которые обычно вызываются из CLI, является хорошим способом, чтобы иметь возможность подключаться к БД и другим ресурсам, использовать классы Model и так далее, и делать это с другими подключениями к БД, созданными самим Django в другом процессе, зная, что они находятся в другом процессе.

Вместо использования multiprocessing, которое неявно либо перезагрузит, либо клонирует ваше приложение Python как оно есть (включая прослушивание HTTP), вам следует использовать subprocess.Popen для вызова того, что вы хотите выполнить вне процесса в качестве скрипта администратора. Вызывать его следует так же, как и CLI, передавая в качестве параметров командной строки любые ID экземпляров моделей, которые он должен обработать. Коммуникация немного сложнее (хотя вы можете создать специальную модель в БД и обмениваться сообщениями между процессом, обращенным к Web, и рабочими процессами, используя экземпляры этой модели)

В прошлом я использовал Celery, чтобы иметь внепроцессные рабочие, использующие одну и ту же кодовую базу django. Это может быть проще в использовании после настройки - рабочие должны быть запущены отдельно как часть конфигурации celery - но в остальном вызов удаленных функций в этих рабочих становится простым делом. Ищите документацию о том, как использовать celery, и как использовать django с Celery

(о, теперь Django явно поддерживается в Celery из коробки: https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html#django-first-steps)

Вернуться на верх