Как безопасно использовать многопроцессорность в приложении Django?
Я прочитал документы, в которых говорится о том, что многопроцессорная обработка может вызвать нежелательные побочные эффекты в приложениях Django или на Windows, особенно в тех, которые подключены к нескольким базам данных. В частности, я использую функцию load_to_table
для создания нескольких CSV-файлов из DataFrame и последующей загрузки данных в таблицу PostgreSQL с использованием мультипроцессинга. Эта функция глубоко интегрирована в мое приложение Django и не является отдельным скриптом.
Меня беспокоят потенциальные долгосрочные последствия, если этот код будет использоваться в производстве. Кроме того, if __name__ == '__main__':
, похоже, не работает в глубинных файлах/функциях Django. Это происходит потому, что команды управления Django выполняются в другом контексте, где __name__
не установлен в "__main__"
, что не позволяет этому блоку выполняться так, как ожидалось. Более того, руководство по многопроцессорности рекомендует использовать if __name__ == '__main__':
для безопасной инициализации многопроцессорных задач, так как это гарантирует, что код не будет случайно выполнен несколько раз, особенно на таких платформах, как Windows, где код на уровне модуля повторно импортируется в дочерние процессы.
Вот код, который я использую:
import os
import glob
import shutil
from multiprocessing import Pool, cpu_count
from functools import partial
def copy_to_table(connection, file_name: str, table_name: str, columns: list):
cursor = connection.cursor()
with open(file_name, "r") as f:
cursor.copy_from(f, table_name, sep=",", columns=columns, null="")
connection.commit()
connection.close()
return file_name
def load_to_table(connection, dataframe, table_name: str) -> dict:
filepath = os.path.join("uploaded_files", table_name)
os.makedirs(filepath, exist_ok=True)
rows_per_partition = 100000
total_rows = dataframe.count()
num_partitions = (total_rows // rows_per_partition) + (1 if total_rows % rows_per_partition > 0 else 0)
dataframe.repartition(num_partitions).write.mode("overwrite").format("csv").option("header", "false").save(filepath)
file_path_list = sorted(glob.glob(f"{filepath}/*.csv"))
with Pool(cpu_count()) as p:
p.map(partial(copy_to_table, connection=connection, table_name=table_name, columns=dataframe.columns), file_path_list)
shutil.rmtree(filepath)
# views.py file
load_to_table(connection, dataframe, 'source_table')
Приведенная выше функция не работает с отладчиком VS Code, скорее всего, из-за debugpy
, который мешает многопроцессорности Django. Однако она работает с runserver
. Когда я запускаю приложение Django с помощью отладчика VS Code, я сталкиваюсь со следующей ошибкой при выполнении функции. Кажется, что она выполняется в цикле.
File "/usr/lib/python3.11/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
^^^^^^^^^^^^^^^^^
File "/home/rhythmflow/Desktop/Reconciliation/reconciliation-backend-v3/portal/operations/load_data/methods.py", line 225, in load_to_table
with Pool(cpu_count()) as p:
^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/multiprocessing/context.py", line 281, in _Popen
return Popen(process_obj)
^^^^^^^^^^^^^^^^^^
File "/home/rhythmflow/Desktop/Reconciliation/reconciliation-backend-v3/portal/operations/load_data/load_data.py", line 71, in start
load_to_table(df_ops, self.source_tmp_details)
File "/usr/lib/python3.11/multiprocessing/context.py", line 119, in Pool
return Pool(processes, initializer, initargs, maxtasksperchild,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/rhythmflow/.vscode/extensions/ms-python.debugpy-2024.10.0-linux-x64/bundled/libs/debugpy/_vendored/pydevd/pydevd.py", line 838, in wait_for_ready_to_run
self._py_db_command_thread_event.wait(0.1)
File "/usr/lib/python3.11/threading.py", line 629, in wait
signaled = self._cond.wait(timeout)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/threading.py", line 331, in wait
gotit = waiter.acquire(True, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/rhythmflow/Desktop/Reconciliation/reconciliation-backend-v3/.venv/lib/python3.11/site-packages/django/utils/autoreload.py", line 664, in <lambda>
signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
SystemExit: 0
[22/Aug/2024 15:04:30] "POST /start-process/ HTTP/1.1" 500 59
[22/Aug/2024 15:04:35,063] - Broken pipe from ('127.0.0.1', 51102)
Что может вызвать эту проблему, и как я могу ее решить, используя отладчик VS Code?
Использование чистой многопроцессорности в Django приведет к тому, что его внутренний сервер поднимется в вилочный/спаунизированный процесс и попытается связать себя с теми же ресурсами сокетов, которые реальный рабочий использует для прослушивания входящих HTTP-запросов.
Звучит достаточно грязно. Так что если нет официальной страницы django-docs, рассказывающей о том, как безопасно использовать многопроцессорность в Django, я бы не стал идти этим путем.
Однако есть другие способы заставить процессы с тесной связью, использующие ту же кодовую базу, что и ваши веб-рабочие, работать параллельно - просто измените точку входа так, чтобы django не пытался обслуживать HTTP. Я считаю, что django admin scripts - способ написания функций в кодовой базе, которые обычно вызываются из CLI, является хорошим способом, чтобы иметь возможность подключаться к БД и другим ресурсам, использовать классы Model и так далее, и делать это с другими подключениями к БД, созданными самим Django в другом процессе, зная, что они находятся в другом процессе.
Вместо использования multiprocessing
, которое неявно либо перезагрузит, либо клонирует ваше приложение Python как оно есть (включая прослушивание HTTP), вам следует использовать subprocess.Popen
для вызова того, что вы хотите выполнить вне процесса в качестве скрипта администратора. Вызывать его следует так же, как и CLI, передавая в качестве параметров командной строки любые ID экземпляров моделей, которые он должен обработать. Коммуникация немного сложнее (хотя вы можете создать специальную модель в БД и обмениваться сообщениями между процессом, обращенным к Web, и рабочими процессами, используя экземпляры этой модели)
В прошлом я использовал Celery, чтобы иметь внепроцессные рабочие, использующие одну и ту же кодовую базу django. Это может быть проще в использовании после настройки - рабочие должны быть запущены отдельно как часть конфигурации celery - но в остальном вызов удаленных функций в этих рабочих становится простым делом. Ищите документацию о том, как использовать celery, и как использовать django с Celery
(о, теперь Django явно поддерживается в Celery из коробки: https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html#django-first-steps)