How can I safely use multiprocessing in a Django app?
I’ve read the docs suggesting that multiprocessing may cause unintended side effects in Django apps or on Windows, especially those connected to multiple databases. Specifically, I'm using a function, load_to_table, to create multiple CSV files from a DataFrame and then load the data into a PostgreSQL table using multiprocessing. This function is deeply integrated within my Django app and is not a standalone script.
I am concerned about potential long-term implications if this code is used in production. Additionally, if __name__ == '__main__': does not seem to work within the deep files/functions of Django. This is because Django's management commands are executed in a different context where __name__ is not set to "__main__", which prevents this block from being executed as expected. Moreover, multiprocessing guidelines recommend using if __name__ == '__main__': to safely initialize multiprocessing tasks, as it ensures that code is not accidentally executed multiple times, especially on platforms like Windows where the module-level code is re-imported in child processes.
Here is the code I am using:
import os
import glob
import shutil
from multiprocessing import Pool, cpu_count
from functools import partial

def copy_to_table(connection, file_name: str, table_name: str, columns: list):
    cursor = connection.cursor()
    with open(file_name, "r") as f:
        cursor.copy_from(f, table_name, sep=",", columns=columns, null="")
    connection.commit()
    connection.close()
    return file_name

def load_to_table(connection, dataframe, table_name: str) -> dict:
    filepath = os.path.join("uploaded_files", table_name)
    os.makedirs(filepath, exist_ok=True)
    rows_per_partition = 100000
    total_rows = dataframe.count()
    num_partitions = (total_rows // rows_per_partition) + (1 if total_rows % rows_per_partition > 0 else 0)
    dataframe.repartition(num_partitions).write.mode("overwrite").format("csv").option("header", "false").save(filepath)
    file_path_list = sorted(glob.glob(f"{filepath}/*.csv"))
    with Pool(cpu_count()) as p:
        p.map(partial(copy_to_table, connection=connection, table_name=table_name, columns=dataframe.columns), file_path_list)
    shutil.rmtree(filepath)

# views.py file
load_to_table(connection, dataframe, 'source_table')
The function above does not work with the VS Code debugger, most likely due to debugpy, which interferes with Django's multiprocessing. However, it works with runserver. When I run the Django app with the VS Code debugger, I encounter the following error while executing the function. It seems to be running in loops.
File "/usr/lib/python3.11/multiprocessing/process.py", line 121, in start
self._popen = self._Popen(self)
^^^^^^^^^^^^^^^^^
File "/home/rhythmflow/Desktop/Reconciliation/reconciliation-backend-v3/portal/operations/load_data/methods.py", line 225, in load_to_table
with Pool(cpu_count()) as p:
^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/multiprocessing/context.py", line 281, in _Popen
return Popen(process_obj)
^^^^^^^^^^^^^^^^^^
File "/home/rhythmflow/Desktop/Reconciliation/reconciliation-backend-v3/portal/operations/load_data/load_data.py", line 71, in start
load_to_table(df_ops, self.source_tmp_details)
File "/usr/lib/python3.11/multiprocessing/context.py", line 119, in Pool
return Pool(processes, initializer, initargs, maxtasksperchild,
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/rhythmflow/.vscode/extensions/ms-python.debugpy-2024.10.0-linux-x64/bundled/libs/debugpy/_vendored/pydevd/pydevd.py", line 838, in wait_for_ready_to_run
self._py_db_command_thread_event.wait(0.1)
File "/usr/lib/python3.11/threading.py", line 629, in wait
signaled = self._cond.wait(timeout)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/lib/python3.11/threading.py", line 331, in wait
gotit = waiter.acquire(True, timeout)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/rhythmflow/Desktop/Reconciliation/reconciliation-backend-v3/.venv/lib/python3.11/site-packages/django/utils/autoreload.py", line 664, in <lambda>
signal.signal(signal.SIGTERM, lambda *args: sys.exit(0))
SystemExit: 0
[22/Aug/2024 15:04:30] "POST /start-process/ HTTP/1.1" 500 59
[22/Aug/2024 15:04:35,063] - Broken pipe from ('127.0.0.1', 51102)
What could be causing this issue, and how can I address it while using the VS Code debugger?
Using plain multiprocessing inside Django can cause its internal server to start up again in the forked/spawned process and try to bind to the same socket resources the real worker is using to listen for incoming HTTP requests.
That sounds messy enough. So unless there is an official page in the Django docs describing a safe way to use multiprocessing with the whole process, I wouldn't go down that route.
However, there are other ways to run tightly coupled processes concurrently using the same codebase as your web-facing workers: just change the entry point so that Django doesn't try to serve HTTP. I believe the Django admin scripts (custom management commands, a way to write functions in the code base that are usually called from the CLI) are a good fit. They can connect to the DB and other resources, use the Model classes and so on, and they do so with their own DB connections, created by Django itself in the other process, knowing they are in another process.
You should then, instead of using multiprocessing, which will implicitly either reload or clone your Python app as it is (including listening for HTTP), use subprocess.Popen to call whatever you want to perform off-process as an admin script. It should be straightforward to call it as you would from the CLI, passing the IDs of any model instances it should process as command-line parameters. Communication is a bit more complex (though you could create a special model in the DB and exchange messages between the web-facing process and the worker processes using instances of this model).
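As a rough illustration of the subprocess.Popen approach, here is a minimal sketch of the launching side. It assumes a custom management command already exists; the command name load_table_chunk, the helper names build_worker_command and launch_worker, and the manage.py location are all hypothetical and not taken from the question.

```python
import subprocess
import sys


def build_worker_command(manage_py, command_name, object_ids):
    """Build the argv list that runs a management command in a fresh interpreter.

    manage_py and command_name are assumptions: point them at your own
    project's manage.py and at a command you have written yourself.
    """
    # sys.executable keeps the child on the same interpreter/virtualenv
    # as the web process. IDs are passed as plain command-line strings.
    return [sys.executable, str(manage_py), command_name,
            *[str(object_id) for object_id in object_ids]]


def launch_worker(manage_py, command_name, object_ids):
    """Start the command without blocking the caller and return the Popen handle."""
    argv = build_worker_command(manage_py, command_name, object_ids)
    # No shell is involved, so the arguments are not re-parsed or expanded.
    return subprocess.Popen(argv)
```

From a view you would call something like launch_worker("manage.py", "load_table_chunk", [1, 2, 3]) and keep the returned handle if you want to poll() it or wait() for it later. The child process starts from manage.py, so it sets up Django and opens its own DB connections instead of inheriting the web worker's.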
Otherwise, I have used Celery in the past to run off-process workers using the same Django code base. That could be easier to use once you set it up: the workers have to be started separately as part of the Celery config, but otherwise, calling remote functions in these workers becomes a breeze. Look for docs on how to use Celery, and how to use Django with Celery.
(Oh, Django is now explicitly supported out of the box in Celery: https://docs.celeryq.dev/en/stable/django/first-steps-with-django.html#django-first-steps)