Закрытие соединений django db после отключения ThreadPoolExecutor
Я хочу использовать ThreadPoolExecutor в django и повторно использовать соединение с БД в потоке, чтобы не создавать соединение с БД для каждой подзадачи. но соединения с БД не закрываются после выключения ThreadPoolExecutor. Я знаю, что могу закрыть соединение в конце подзадачи, но при таком решении мы создаем соединение для каждой задачи и соединение не используется повторно. В ThreadPoolExecutor есть инициализатор params, но нет чего-то вроде on_destroy, который может быть вызван при уничтожении потока. main_task работает с сельдереем в моей установке.
def sub_task():
#some db operations
def main_task(max_workers):
with ThreadPoolExecutor(max_workers=max_workers) as executor:
for i in range(10):
executor.submit(sub_task)
Я написал собственный ThreadPoolExecutor и создал список соединений потоков с БД (добавив указатель на их обработчик соединений в функцию инициализатора потоков) и закрыл все соединения с БД всех потоков при выключении экзекутора. обратите внимание, что если вы хотите использовать вашу функцию инициализатора, будьте осторожны, передавайте ее как ключевой аргумент, а не позиционный
class DBSafeThreadPoolExecutor(ThreadPoolExecutor):
def generate_initializer(self, initializer):
def new_initializer(*args, **kwargs):
self, *args = args
try:
if initializer != None:
initializer(*args, **kwargs)
finally:
self.on_thread_init()
return new_initializer
def on_thread_init(self):
for curr_conn in db.connections.all():
curr_conn.connection = None
self.threads_db_conns.append(curr_conn)
def on_executor_shutdown(self):
for curr_conn in self.threads_db_conns:
try:
curr_conn.inc_thread_sharing()
curr_conn.close()
except Exception:
print(f'error while closing connection {curr_conn.alias}')
traceback.print_exc()
def __init__(self, *args, **kwargs):
kwargs['initializer'] = self.generate_initializer(kwargs.get('initializer'))
kwargs['initargs'] = (self,) + (kwargs.get('initargs') or ())
self.threads_db_conns = []
super().__init__(*args, **kwargs)
def shutdown(self, *args, **kwargs):
super().shutdown(*args, **kwargs)
self.on_executor_shutdown()