Закрытие соединений django db после отключения ThreadPoolExecutor

Я хочу использовать ThreadPoolExecutor в django и повторно использовать соединение с БД в потоке, чтобы не создавать соединение с БД для каждой подзадачи. но соединения с БД не закрываются после выключения ThreadPoolExecutor. Я знаю, что могу закрыть соединение в конце подзадачи, но при таком решении мы создаем соединение для каждой задачи и соединение не используется повторно. В ThreadPoolExecutor есть инициализатор params, но нет чего-то вроде on_destroy, который может быть вызван при уничтожении потока. main_task работает с сельдереем в моей установке.

def sub_task():
    #some db operations


def main_task(max_workers): 
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        for i in range(10):
            executor.submit(sub_task)

Я написал собственный ThreadPoolExecutor и создал список соединений потоков с БД (добавив указатель на их обработчик соединений в функцию инициализатора потоков) и закрыл все соединения с БД всех потоков при выключении экзекутора. обратите внимание, что если вы хотите использовать вашу функцию инициализатора, будьте осторожны, передавайте ее как ключевой аргумент, а не позиционный

class DBSafeThreadPoolExecutor(ThreadPoolExecutor):
    
    def generate_initializer(self, initializer):
        def new_initializer(*args, **kwargs):
            self, *args = args
            try:
                if initializer != None:
                    initializer(*args, **kwargs)
            finally:
                self.on_thread_init()
        return new_initializer

    def on_thread_init(self):
        for curr_conn in db.connections.all():
            curr_conn.connection = None
            self.threads_db_conns.append(curr_conn)

    def on_executor_shutdown(self):
        for curr_conn in self.threads_db_conns:
            try:
                curr_conn.inc_thread_sharing()
                curr_conn.close()
            except Exception:
                print(f'error while closing connection {curr_conn.alias}')
                traceback.print_exc()


    def __init__(self, *args, **kwargs):
        kwargs['initializer'] = self.generate_initializer(kwargs.get('initializer'))
        kwargs['initargs'] = (self,) + (kwargs.get('initargs') or ())
        self.threads_db_conns = []
        super().__init__(*args, **kwargs)

    def shutdown(self, *args, **kwargs):
        super().shutdown(*args, **kwargs)
        self.on_executor_shutdown()
Вернуться на верх