Объединение соединений

Пул соединений - это стандартная техника, используемая для сохранения длительных соединений в памяти для эффективного повторного использования, а также для обеспечения управления общим количеством соединений, которые приложение может использовать одновременно.

В частности, для веб-приложений на стороне сервера пул соединений является стандартным способом поддержания в памяти «пула» активных соединений с базой данных, которые повторно используются при выполнении запросов.

SQLAlchemy включает несколько реализаций пулов соединений, которые интегрируются с Engine. Их также можно использовать непосредственно для приложений, которые хотят добавить пул к обычному подходу DBAPI.

Конфигурация пула соединений

Функция Engine, возвращаемая функцией create_engine(), в большинстве случаев имеет встроенный QueuePool, предварительно сконфигурированный с разумными настройками пулинга по умолчанию. Если вы читаете этот раздел только для того, чтобы узнать, как включить пулинг - поздравляю! Вы уже сделали это.

Наиболее распространенные параметры настройки QueuePool могут быть переданы непосредственно в create_engine() в качестве аргументов ключевых слов: pool_size, max_overflow, pool_recycle и pool_timeout. Например:

engine = create_engine("postgresql://me@localhost/mydb", pool_size=20, max_overflow=0)

В случае SQLite, SingletonThreadPool или NullPool выбираются диалектом для обеспечения большей совместимости с моделью потоков и блокировок SQLite, а также для обеспечения разумного поведения по умолчанию для баз данных SQLite с «памятью», которые хранят весь свой набор данных в пределах одного соединения.

Все реализации пула SQLAlchemy объединяет то, что ни одна из них не «предварительно создает» соединения - все реализации ждут до первого использования, прежде чем создать соединение. В этот момент, если не поступает дополнительных одновременных запросов на установку соединений, дополнительные соединения не создаются. Вот почему совершенно нормально для create_engine() по умолчанию использовать QueuePool размером пять без учета того, действительно ли приложению нужно пять соединений в очереди - пул вырастет до такого размера, только если приложение действительно использует пять соединений одновременно, и в этом случае использование небольшого пула является вполне подходящим поведением по умолчанию.

Реализации коммутационных пулов

Обычный способ использования пула другого типа с create_engine() - это использование аргумента poolclass. Этот аргумент принимает класс, импортированный из модуля sqlalchemy.pool, и обрабатывает детали создания пула за вас. Обычные варианты включают указание QueuePool с SQLite:

from sqlalchemy.pool import QueuePool

engine = create_engine("sqlite:///file.db", poolclass=QueuePool)

Отключение объединения с помощью NullPool:

from sqlalchemy.pool import NullPool

engine = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test", poolclass=NullPool
)

Использование пользовательской функции подключения

Подробную информацию о различных процедурах настройки соединения см. в разделе Пользовательские аргументы DBAPI connect() / процедуры включения соединения.

Строительство бассейна

Чтобы использовать Pool самостоятельно, функция creator является единственным аргументом, который требуется, и передается первой, за которой следуют любые дополнительные опции:

import sqlalchemy.pool as pool
import psycopg2


def getconn():
    c = psycopg2.connect(user="ed", host="127.0.0.1", dbname="test")
    return c


mypool = pool.QueuePool(getconn, max_overflow=10, pool_size=5)

Затем соединения DBAPI могут быть получены из пула с помощью функции Pool.connect(). Возвращаемым значением этого метода является соединение DBAPI, содержащееся в прозрачном прокси:

# get a connection
conn = mypool.connect()

# use it
cursor_obj = conn.cursor()
cursor_obj.execute("select foo")

Цель прозрачного прокси - перехватить вызов close(), чтобы вместо закрытия соединения DBAPI оно было возвращено в пул:

# "close" the connection.  Returns
# it to the pool.
conn.close()

Прокси также возвращает содержащееся в нем DBAPI-соединение в пул при сборке мусора, хотя в Python это происходит не сразу (хотя это типично для cPython). Однако такое использование не рекомендуется и, в частности, не поддерживается с драйверами asyncio DBAPI.

Сброс при возврате

Пул также включает функцию «сброс при возврате», которая будет вызывать метод rollback() для соединения DBAPI, когда соединение возвращается в пул. Это делается для того, чтобы все существующие транзакции на соединении были удалены, не только гарантируя, что при следующем использовании не останется никаких существующих состояний, но и для того, чтобы блокировки таблиц и строк были освобождены, а также чтобы были удалены любые изолированные снимки данных. Это rollback() происходит в большинстве случаев даже при использовании объекта Engine, за исключением случая, когда Connection может гарантировать, что rollback() был вызван непосредственно перед возвратом соединения в пул.

Для большинства DBAPI вызов rollback() является очень дешевым, и если DBAPI уже завершила транзакцию, то метод не должен работать. Однако для DBAPI, у которых возникают проблемы с производительностью при использовании rollback(), даже если в соединении нет состояния, это поведение можно отключить с помощью опции reset_on_return в Pool. Это поведение безопасно отключать при следующих условиях:

  • Если база данных вообще не поддерживает транзакции, например, при использовании MySQL с движком MyISAM, или DBAPI используется только в режиме autocommit, поведение может быть отключено.

  • Если пул сам не поддерживает соединение после регистрации, например, при использовании NullPool, это поведение можно отключить.

  • В противном случае должно быть обеспечено, что:

    • приложение гарантирует, что все объекты Connection явно закрыты с помощью менеджера контекста (т.е. блока with) или блока в стиле try/finally

    • соединениям никогда не разрешается собираться в мусор до явного закрытия.

    • само соединение DBAPI, например connection.connection, не используется напрямую, или приложение гарантирует, что .rollback() будет вызвано на этом соединении, прежде чем отпустить его обратно в пул соединений.

Шаг «сброс при возврате» может быть зарегистрирован с помощью уровня журнала logging.DEBUG вместе с регистратором sqlalchemy.pool, или путем установки echo_pool='debug' с create_engine().

Мероприятия в бассейне

Пулы соединений поддерживают интерфейс событий, который позволяет выполнять хуки при первом подключении, при каждом новом подключении, а также при проверке и регистрации подключений. Подробности см. в разделе PoolEvents.

Работа с разъединениями

Пул соединений имеет возможность обновлять как отдельные соединения, так и весь набор соединений, устанавливая ранее установленные соединения как «недействительные». Распространенный вариант использования - позволить пулу соединений изящно восстановиться, когда сервер базы данных был перезапущен, и все ранее установленные соединения больше не функционируют. Для этого существует два подхода.

Работа с разъединениями - пессимистично

Пессимистический подход подразумевает выполнение тестового оператора для SQL-соединения в начале каждой проверки пула соединений, чтобы проверить, что соединение с базой данных все еще жизнеспособно. Обычно это простой оператор типа «SELECT 1», но может также использоваться какой-либо метод, специфичный для DBAPI, для проверки соединения на жизнеспособность.

Этот подход добавляет немного накладных расходов в процесс проверки соединений, однако в остальном это самый простой и надежный подход к полному устранению ошибок базы данных из-за несвежих соединений в пуле. Вызывающему приложению не нужно заботиться об организации операций, чтобы иметь возможность восстанавливаться после неактивных соединений, выгруженных из пула.

Важно отметить, что подход предварительного пинга не учитывает соединения, потерянные в середине транзакций или других операций SQL. Если база данных станет недоступной во время выполнения транзакции, транзакция будет потеряна и возникнет ошибка базы данных. Хотя объект Connection обнаружит ситуацию «разъединения» и переработает соединение, а также аннулирует остальной пул соединений, когда это произойдет, отдельная операция, при которой было вызвано исключение, будет потеряна, и приложению придется либо отказаться от операции, либо повторить всю транзакцию заново. Если движок настроен с использованием автокоммит-соединений на уровне DBAPI, как описано в Установка уровней изоляции транзакций, включая DBAPI Autocommit, соединение может быть переподключено прозрачно в середине операции с помощью событий. Пример смотрите в разделе Как автоматически «повторить» выполнение заявления?.

Пессимистическое тестирование соединений при проверке достижимо с помощью аргумента Pool.pre_ping, доступного из create_engine() через аргумент create_engine.pool_pre_ping:

engine = create_engine("mysql+pymysql://user:pw@host/db", pool_pre_ping=True)

Функция «pre ping» обычно выдает SQL, эквивалентный «SELECT 1», каждый раз, когда соединение проверяется из пула; если возникает ошибка, которая определяется как ситуация «разъединения», соединение будет немедленно переработано, а все другие соединения в пуле старше текущего времени будут аннулированы, так что при следующей проверке они также будут переработаны перед использованием.

Если база данных все еще недоступна, когда выполняется «pre ping», то первоначальное подключение будет неудачным, и ошибка о невозможности подключения будет распространяться нормально. В редкой ситуации, когда база данных доступна для подключения, но не может ответить на «ping», «pre_ping» будет пытаться до трех раз, прежде чем сдастся, распространяя последнюю полученную ошибку базы данных.

Примечание

SELECT 1», выдаваемый «pre-ping», вызывается в рамках пула соединений / диалекта, используя очень короткий кодовый путь для минимальной задержки Python. Таким образом, это утверждение не регистрируется в выводе SQL echo и не отображается в логах движка SQLAlchemy.

Добавлено в версии 1.2: Добавлена возможность «предварительного пинга» в класс Pool.

Пользовательский / Наследие Пессимистический пинг

До добавления create_engine.pool_pre_ping подход «предварительного пинга» исторически выполнялся вручную с использованием события движка ConnectionEvents.engine_connect(). Наиболее распространенный рецепт для этого приведен ниже, для справки, если приложение уже использует такой рецепт, или если требуется особое поведение:

from sqlalchemy import exc
from sqlalchemy import event
from sqlalchemy import select

some_engine = create_engine(...)


@event.listens_for(some_engine, "engine_connect")
def ping_connection(connection, branch):
    if branch:
        # "branch" refers to a sub-connection of a connection,
        # we don't want to bother pinging on these.
        return

    # turn off "close with result".  This flag is only used with
    # "connectionless" execution, otherwise will be False in any case
    save_should_close_with_result = connection.should_close_with_result
    connection.should_close_with_result = False

    try:
        # run a SELECT 1.   use a core select() so that
        # the SELECT of a scalar value without a table is
        # appropriately formatted for the backend
        connection.scalar(select(1))
    except exc.DBAPIError as err:
        # catch SQLAlchemy's DBAPIError, which is a wrapper
        # for the DBAPI's exception.  It includes a .connection_invalidated
        # attribute which specifies if this connection is a "disconnect"
        # condition, which is based on inspection of the original exception
        # by the dialect in use.
        if err.connection_invalidated:
            # run the same SELECT again - the connection will re-validate
            # itself and establish a new connection.  The disconnect detection
            # here also causes the whole connection pool to be invalidated
            # so that all stale connections are discarded.
            connection.scalar(select(1))
        else:
            raise
    finally:
        # restore "close with result"
        connection.should_close_with_result = save_should_close_with_result

Приведенный выше рецепт имеет то преимущество, что мы используем возможности SQLAlchemy для обнаружения тех исключений DBAPI, которые, как известно, указывают на ситуацию «разъединения», а также способность объекта Engine правильно аннулировать текущий пул соединений при возникновении этого условия и позволить текущему Connection заново валидироваться на новое соединение DBAPI.

Работа с разъединением - оптимистично

Когда пессимистическая обработка не используется, а также когда база данных выключается и/или перезапускается в середине периода использования соединения в транзакции, другой подход к работе с устаревшими / закрытыми соединениями заключается в том, чтобы позволить SQLAlchemy обрабатывать разъединения по мере их возникновения, в этот момент все соединения в пуле аннулируются, то есть они считаются устаревшими и будут обновлены при следующей проверке. Это поведение предполагает, что Pool используется в сочетании с Engine. Engine имеет логику, которая может обнаруживать события разъединения и автоматически обновлять пул.

Когда Connection пытается использовать соединение DBAPI и возникает исключение, соответствующее событию «разъединение», соединение аннулируется. Затем Connection вызывает метод Pool.recreate(), эффективно аннулируя все соединения, не проверенные в данный момент, чтобы они были заменены новыми при следующей проверке. Этот поток проиллюстрирован в примере кода ниже:

from sqlalchemy import create_engine, exc

e = create_engine(...)
c = e.connect()

try:
    # suppose the database has been restarted.
    c.execute(text("SELECT * FROM table"))
    c.close()
except exc.DBAPIError, e:
    # an exception is raised, Connection is invalidated.
    if e.connection_invalidated:
        print("Connection was invalidated!")

# after the invalidate event, a new connection
# starts with a new Pool
c = e.connect()
c.execute(text("SELECT * FROM table"))

Приведенный выше пример показывает, что не требуется специального вмешательства для обновления пула, который продолжает нормально работать после обнаружения события разъединения. Тем не менее, возникает одно исключение базы данных на каждое соединение, которое используется во время события недоступности базы данных. В типичном веб-приложении, использующем ORM Session, вышеописанное условие будет соответствовать одному запросу, который завершится ошибкой 500, после чего веб-приложение продолжит работу в нормальном режиме. Таким образом, данный подход является «оптимистичным», так как частые перезагрузки базы данных не предвидятся.

Настройка утилизации пула

Дополнительным параметром, который может дополнить «оптимистичный» подход, является установка параметра recycle пула. Этот параметр не позволяет пулу использовать конкретное соединение, возраст которого истек, и подходит для бэкендов баз данных, таких как MySQL, которые автоматически закрывают соединения, которые устарели после определенного периода времени:

from sqlalchemy import create_engine

e = create_engine("mysql://scott:tiger@localhost/test", pool_recycle=3600)

Выше, любое соединение DBAPI, которое было открыто более одного часа, будет аннулировано и заменено при следующей проверке. Обратите внимание, что аннулирование только происходит во время проверки - не для любых соединений, которые находятся в состоянии проверки. pool_recycle является функцией самого Pool, независимо от того, используется ли Engine.

Подробнее об инвалидизации

Pool предоставляет услуги «аннулирования соединения», которые позволяют как явное аннулирование соединения, так и автоматическое аннулирование в ответ на условия, которые делают соединение непригодным для использования.

«Недействительность» означает, что конкретное DBAPI-соединение удаляется из пула и отбрасывается. Метод .close() вызывается на этом соединении, если не ясно, что само соединение может быть не закрыто, однако если этот метод не срабатывает, исключение регистрируется, но операция все равно продолжается.

Если используется Engine, метод Connection.invalidate() является обычной точкой входа для явного аннулирования. Другие условия, при которых соединение DBAPI может быть признано недействительным, включают:

  • исключение DBAPI, такое как OperationalError, возникающее при вызове метода типа connection.execute(), определяется как указание на так называемое состояние «разъединения». Поскольку Python DBAPI не предоставляет стандартной системы для определения природы исключения, все диалекты SQLAlchemy включают систему под названием is_disconnect(), которая изучает содержимое объекта исключения, включая строковое сообщение и любые потенциальные коды ошибок, включенные в него, чтобы определить, указывает ли это исключение на то, что соединение больше не может использоваться. Если это так, то вызывается метод _ConnectionFairy.invalidate() и соединение DBAPI отбрасывается.

  • Когда соединение возвращается в пул, и вызов методов connection.rollback() или connection.commit(), как диктуется поведением пула «reset on return», вызывает исключение. Будет сделана последняя попытка вызвать .close() на соединении, после чего оно будет отброшено.

  • Когда слушатель, реализующий PoolEvents.checkout(), вызывает исключение DisconnectionError, указывающее на то, что соединение не может быть использовано и необходимо предпринять новую попытку соединения.

Все возникающие недействительности будут вызывать событие PoolEvents.invalidate().

Использование FIFO против LIFO

Класс QueuePool имеет флаг QueuePool.use_lifo, который также может быть доступен из create_engine() через флаг create_engine.pool_use_lifo. Установка этого флага в значение True приводит к тому, что поведение «очереди» пула становится поведением «стека», например, последнее соединение, возвращенное в пул, будет первым, которое будет использовано при следующем запросе. В отличие от давнего поведения пула по принципу «первым пришел - первым ушел», который создает эффект круговой очереди, последовательно используя каждое соединение в пуле, режим lifo позволяет лишним соединениям оставаться в пуле незадействованными, позволяя схемам тайм-аута на стороне сервера закрывать эти соединения. Разница между FIFO и LIFO заключается в том, желательно ли для пула поддерживать полный набор соединений, готовых к работе, даже в периоды простоя:

engine = create_engine("postgreql://", pool_use_lifo=True, pool_pre_ping=True)

Выше мы также использовали флаг create_engine.pool_pre_ping, чтобы соединения, закрываемые со стороны сервера, изящно обрабатывались пулом соединений и заменялись новым соединением.

Обратите внимание, что флаг применяется только при использовании QueuePool.

Добавлено в версии 1.3.

Использование пулов соединений с многопроцессорной обработкой или os.fork()

Очень важно, чтобы при использовании пула соединений и, соответственно, при использовании Engine, созданного через create_engine(), пул соединений не был общим для вилочного процесса. TCP-соединения представлены в виде дескрипторов файлов, которые обычно работают через границы процессов, что означает, что это приведет к одновременному доступу к дескриптору файла от имени двух или более совершенно независимых состояний интерпретатора Python.

В зависимости от специфики драйвера и ОС, проблемы, возникающие здесь, варьируются от нерабочих соединений до сокетных соединений, которые используются несколькими процессами одновременно, что приводит к нарушению обмена сообщениями (последний случай, как правило, наиболее распространен).

Объект SQLAlchemy Engine относится к пулу соединений существующих соединений базы данных. Поэтому, когда этот объект реплицируется в дочерний процесс, цель состоит в том, чтобы гарантировать, что никакие соединения с базой данных не будут перенесены. Для этого существует три общих подхода:

  1. Отключите объединение соединений с помощью NullPool. Это наиболее упрощенная, одноразовая система, которая не позволяет Engine использовать любое соединение более одного раза:

    from sqlalchemy.pool import NullPool
    
    engine = create_engine("mysql://user:pass@host/dbname", poolclass=NullPool)
  2. Вызовите Engine.dispose() на любом данном Engine, передав параметру Engine.dispose.close значение False, в фазе инициализации дочернего процесса. Это делается для того, чтобы новый процесс не трогал соединения родительского процесса, а начинал с новых соединений. Это рекомендуемый подход:

    from multiprocessing import Pool
    
    engine = create_engine("mysql+mysqldb://user:pass@host/dbname")
    
    
    def run_in_process(some_data_record):
        with engine.connect() as conn:
            conn.execute(text("..."))
    
    
    def initializer():
        """ensure the parent proc's database connections are not touched
        in the new connection pool"""
        engine.dispose(close=False)
    
    
    with Pool(10, initializer=initializer) as p:
        p.map(run_in_process, data)

    Добавлено в версии 1.4.33: Добавлен параметр Engine.dispose.close, позволяющий заменить пул соединений в дочернем процессе без вмешательства в соединения, используемые родительским процессом.

    Чтобы добиться такого же поведения «dispose without close» до версии 1.4.33 (все версии SQLAlchemy), вместо вызова Engine.dispose() замените Pool непосредственно на Pool.recreate():

    engine.pool = engine.pool.recreate()

    Приведенный выше код эквивалентен engine.dispose(close=False) за исключением того, что не вызывается хук событий конечного пользователя ConnectionEvents.engine_disposed(); если код конечного пользователя не использует этот хук, это обходное решение не имеет других негативных последствий.

  3. Вызовите Engine.dispose() непосредственно перед созданием дочернего процесса. Это также приведет к тому, что дочерний процесс начнет работу с новым пулом соединений, при этом родительские соединения не будут переданы дочернему процессу:

    engine = create_engine("mysql://user:pass@host/dbname")
    
    
    def run_in_process():
        with engine.connect() as conn:
            conn.execute(text("..."))
    
    
    # before process starts, ensure engine.dispose() is called
    engine.dispose()
    p = Process(target=run_in_process)
    p.start()
  4. К пулу соединений может быть применен обработчик событий, который проверяет наличие соединений, совместно используемых через границы процессов, и аннулирует их:

    from sqlalchemy import event
    from sqlalchemy import exc
    import os
    
    engine = create_engine("...")
    
    
    @event.listens_for(engine, "connect")
    def connect(dbapi_connection, connection_record):
        connection_record.info["pid"] = os.getpid()
    
    
    @event.listens_for(engine, "checkout")
    def checkout(dbapi_connection, connection_record, connection_proxy):
        pid = os.getpid()
        if connection_record.info["pid"] != pid:
            connection_record.dbapi_connection = connection_proxy.dbapi_connection = None
            raise exc.DisconnectionError(
                "Connection record belongs to pid %s, "
                "attempting to check out in pid %s" % (connection_record.info["pid"], pid)
            )

    Выше мы использовали подход, аналогичный описанному в Работа с разъединениями - пессимистично, чтобы рассматривать соединение DBAPI, возникшее в другом родительском процессе, как «недействительное» соединение, заставляя пул перерабатывать запись соединения для создания нового соединения.

Приведенные выше стратегии подходят для случая, когда Engine разделяется между процессами. Приведенных выше шагов недостаточно для случая совместного использования конкретного Connection через границу процесса; лучше сохранить область видимости конкретного Connection локальной для одного процесса (и потока). Кроме того, не поддерживается обмен любым видом текущего транзакционного состояния непосредственно через границу процесса, например, объектом ORM Session, который начал транзакцию и ссылается на активные экземпляры Connection; опять же, предпочтительнее создавать новые объекты Session в новых процессах.

Документация API - Доступные реализации пулов

Вернуться на верх