Соединения / Двигатели

Как настроить ведение журнала?

См. Настройка ведения журнала.

Как объединить соединения с базой данных? Являются ли мои соединения объединенными?

В большинстве случаев SQLAlchemy автоматически выполняет пул соединений на уровне приложения. Для всех включенных диалектов (кроме SQLite при использовании базы данных «с памятью») объект Engine ссылается на QueuePool как на источник соединения.

Подробнее об этом см. в разделах Конфигурация двигателя и Объединение соединений.

Как передать пользовательские аргументы подключения в API базы данных?

Вызов create_engine() принимает дополнительные аргументы либо непосредственно через ключевое слово-аргумент connect_args:

e = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/test", connect_args={"encoding": "utf8"}
)

Или для базовых строковых и целочисленных аргументов их обычно можно указать в строке запроса URL:

e = create_engine("mysql+mysqldb://scott:tiger@localhost/test?encoding=utf8")

«Сервер MySQL перестал работать»

Основная причина этой ошибки заключается в том, что соединение MySQL завершилось по времени и было закрыто сервером. Сервер MySQL закрывает соединения, которые простояли некоторое время, по умолчанию равное восьми часам. Чтобы учесть это, необходимо включить параметр create_engine.pool_recycle, который гарантирует, что соединение, которое старше заданного количества секунд, будет удалено и заменено новым соединением при следующей проверке.

В более общем случае, когда речь идет о перезагрузке базы данных и других временных потерях связи из-за сетевых проблем, соединения, находящиеся в пуле, могут быть восстановлены в ответ на более общие методы обнаружения разрывов связи. В разделе Работа с разъединениями приведены сведения о «пессимистических» (например, pre-ping) и «оптимистических» (например, graceful recovery) методах. В современной SQLAlchemy предпочтение отдается «пессимистическому» подходу.

«Команды рассинхронизированы; вы не можете выполнить эту команду сейчас» / «Этот объект result не возвращает строки. Он был закрыт автоматически»

Драйверы MySQL имеют достаточно широкий класс режимов отказа, при которых состояние соединения с сервером находится в недопустимом состоянии. Обычно при повторном использовании соединения возникает одно из этих двух сообщений об ошибке. Причина заключается в том, что состояние сервера было изменено на такое, которого клиентская библиотека не ожидает, и когда клиентская библиотека выдает новый оператор на соединение, сервер отвечает не так, как ожидалось.

В SQLAlchemy, поскольку соединения с базами данных объединяются в пул, проблема рассинхронизации сообщений на соединении становится более актуальной, поскольку при сбое операции, если само соединение находится в непригодном состоянии, то при возврате в пул соединений оно снова будет работать со сбоями. Решение этой проблемы заключается в том, что при возникновении такого режима отказа соединение инвалидируется, так что базовое соединение базы данных с MySQL отбрасывается. Эта проверка происходит автоматически для многих известных режимов отказа, а также может быть вызвана явно с помощью метода Connection.invalidate().

Существует также второй класс режимов отказа в этой категории, когда менеджер контекста, например with session.begin_nested():, хочет «откатить» транзакцию при возникновении ошибки; однако в некоторых режимах отказа соединения сам откат (который также может быть операцией RELEASE SAVEPOINT) также не удается, что приводит к появлению недостоверных трассировок стека.

Изначально причина этой ошибки была довольно проста: она означала, что многопоточная программа вызывает команды на одном соединении из более чем одного потока. Это относилось к оригинальному native-C драйверу «MySQLdb», который был практически единственным используемым драйвером. Однако с появлением драйверов на чистом Python, таких как PyMySQL и MySQL-connector-Python, а также с ростом использования таких инструментов, как gevent/eventlet, многопоточной обработки (часто с помощью Celery) и других, появилась целая серия факторов, вызывающих эту проблему, некоторые из которых были улучшены во всех версиях SQLAlchemy, но другие неизбежны:

  • Разделение соединения между потоками - Это первоначальная причина возникновения подобных ошибок. Программа использовала одно и то же соединение в двух или более потоках одновременно, в результате чего несколько наборов сообщений смешивались в соединении, переводя сессию на стороне сервера в состояние, которое клиент уже не знает, как интерпретировать. Однако сегодня более вероятны другие причины.

  • Разделение файлового хэндла соединения между процессами - Обычно это происходит, когда программа использует os.fork() для порождения нового процесса, и TCP-соединение, присутствующее в родительском процессе, передается одному или нескольким дочерним процессам. Поскольку несколько процессов теперь передают сообщения по сути одному и тому же файловому хэндлу, сервер получает чередующиеся сообщения и нарушает состояние соединения.

    Такой сценарий может легко возникнуть, если программа использует «многопроцессорный» модуль Python и использует Engine, созданный в родительском процессе. Обычно «многопроцессорность» используется при использовании таких инструментов, как Celery. Правильный подход должен заключаться либо в том, что при первом запуске дочернего процесса создается новый Engine, отбрасывая все Engine, спустившиеся из родительского процесса; либо в том, что Engine, унаследованный от родительского процесса, может утилизировать свой внутренний пул соединений путем вызова Engine.dispose().

  • Greenlet Monkeypatching w/ Exits - При использовании таких библиотек, как gevent или eventlet, осуществляющих мониторинг сетевого API Python, такие библиотеки, как PyMySQL, начинают работать в асинхронном режиме, хотя они и не разрабатывались явно под эту модель. Распространенной проблемой является прерывание работы зеленого потока, часто из-за логики таймаута в приложении. Это приводит к возникновению исключения GreenletExit, и чистый драйвер MySQL на языке Python прерывает свою работу, которая могла заключаться в получении ответа от сервера или подготовке к иному восстановлению состояния соединения. Когда исключение прерывает всю эту работу, разговор между клиентом и сервером становится рассинхронизированным, и последующее использование соединения может закончиться неудачей. SQLAlchemy, начиная с версии 1.1.0, умеет защищаться от этого: если операция с базой данных прерывается так называемым «исключением выхода», которое включает GreenletExit и любой другой подкласс Python BaseException, не являющийся также подклассом Exception, то соединение аннулируется.

  • Откаты / освобождения SAVEPOINT не удаются - Некоторые классы ошибок приводят к тому, что соединение становится непригодным для использования в контексте транзакции, а также при работе в блоке «SAVEPOINT». В этих случаях сбой в соединении приводит к тому, что любой SAVEPOINT перестает существовать, а когда SQLAlchemy или приложение пытается «откатить» эту точку сохранения, операция «RELEASE SAVEPOINT» заканчивается неудачей, обычно с сообщением типа «savepoint does not exist». В этом случае в Python 3 будет выведена цепочка исключений, в которой будет указана и конечная «причина» ошибки. В Python 2 «цепочки» исключений нет, однако последние версии SQLAlchemy пытаются вывести предупреждение, иллюстрирующее первоначальную причину ошибки, но при этом выдают непосредственную ошибку, которая является неудачей ROLLBACK.

Как автоматически «повторить» выполнение запроса?

В разделе документации Работа с разъединениями рассматриваются стратегии, доступные для пула соединений, которые были отключены с момента последней проверки конкретного соединения. Наиболее современной возможностью в этом отношении является параметр create_engine.pre_ping, позволяющий при извлечении соединения базы данных из пула выдавать «пинг», переподключаясь, если текущее соединение было отключено.

Важно отметить, что этот «пинг» выдается только до того, как соединение будет использовано для выполнения операции. После того как соединение доставлено вызывающей стороне, согласно спецификации Python DBAPI, оно становится объектом операции autobegin, что означает автоматическое начало новой транзакции при его первом использовании, которая остается в силе для последующих операций, пока не будет вызван метод connection.commit() или connection.rollback() уровня DBAPI.

При современном использовании SQLAlchemy серия SQL-операторов всегда вызывается в рамках этого транзакционного состояния, если не включена функция DBAPI autocommit mode (подробнее об этом в следующем разделе), что означает, что ни один оператор не фиксируется автоматически; в случае сбоя операции последствия всех операторов в рамках текущей транзакции будут потеряны.

Для понятия «повторная попытка» это означает, что в стандартном случае при потере соединения теряется вся транзакция. Нет никакого полезного способа, с помощью которого база данных могла бы «переподключиться и повторить попытку» и продолжить работу с того места, на котором она остановилась, поскольку данные уже потеряны. По этой причине SQLAlchemy не имеет прозрачной функции «переподключения», работающей в середине транзакции, для случая, когда соединение с базой данных оборвалось в процессе использования. Каноническим подходом к решению проблемы разрыва соединения в середине операции является повторение всей операции с начала транзакции, часто с помощью пользовательского декоратора Python, который будет «повторять» определенную функцию несколько раз до тех пор, пока не добьется успеха, или иным образом спроектировать приложение таким образом, чтобы оно было устойчиво к разрыву транзакций, который приводит к сбою операций.

Существует также понятие расширений, которые могут отслеживать все операторы, выполнявшиеся в транзакции, и затем воспроизводить их в новой транзакции, чтобы приблизить операцию к «повторной попытке». В SQLAlchemy event system позволяет построить такую систему, однако этот подход также не совсем удобен, так как невозможно гарантировать, что операторы DML будут работать с одним и тем же состоянием, поскольку после завершения транзакции состояние базы данных в новой транзакции может быть совершенно другим. Архитектура «повторной попытки», явно заложенная в приложение в точках начала и фиксации транзакционных операций, остается лучшим подходом, поскольку именно транзакционные методы на уровне приложения лучше всего знают, как повторно выполнить свои шаги.

В противном случае, если бы SQLAlchemy предоставила возможность прозрачно и беззвучно «переподключать» соединение в середине транзакции, то это привело бы к беззвучной потере данных. Пытаясь скрыть проблему, SQLAlchemy значительно усугубит ситуацию.

Однако если мы **не используем транзакции, то у нас есть больше возможностей, о которых пойдет речь в следующем разделе.

Использование DBAPI Autocommit позволяет реализовать версию Transparent Reconnect, доступную только для чтения

Приведенные в предыдущем разделе аргументы в пользу отсутствия механизма прозрачного переподключения основываются на предположении, что приложение действительно использует транзакции на уровне DBAPI. Поскольку большинство DBAPI в настоящее время предлагают native «autocommit» settings, мы можем использовать эти возможности для обеспечения ограниченной формы прозрачного повторного соединения для операций только чтение, только автокоммит. Прозрачное повторное соединение может быть применено к методу cursor.execute() в DBAPI, однако его небезопасно применять к методу cursor.executemany() в DBAPI, так как оператор мог использовать любую часть переданных аргументов.

Предупреждение

Приведенный ниже рецепт не должен использоваться для операций записи данных. Пользователи должны внимательно прочитать и понять принцип работы рецепта, а также тщательно протестировать режимы отказов на конкретном драйвере DBAPI, прежде чем использовать этот рецепт в производстве. Механизм повторных попыток не гарантирует предотвращения ошибок разъединения во всех случаях.

К методу cursor.execute() уровня DBAPI можно применить простой механизм повторных попыток, используя хуки DialectEvents.do_execute() и DialectEvents.do_execute_no_params(), который будет способен перехватывать разрывы соединений во время выполнения операторов. При этом не будут перехватываться обрывы соединений во время операций выборки наборов результатов, для тех DBAPI, которые не полностью буферизируют наборы результатов. Рецепт требует, чтобы база данных поддерживала автокоммит на уровне DBAPI, и не гарантируется для конкретных бэкендов. Представлена единственная функция reconnecting_engine(), которая применяет крючки событий к заданному объекту Engine, возвращая версию always-autocommit, которая включает автокоммит на уровне DBAPI. Соединение прозрачно переподключается при выполнении оператора с одним параметром или без него:

import time

from sqlalchemy import event


def reconnecting_engine(engine, num_retries, retry_interval):
    def _run_with_retries(fn, context, cursor_obj, statement, *arg, **kw):
        for retry in range(num_retries + 1):
            try:
                fn(cursor_obj, statement, context=context, *arg)
            except engine.dialect.dbapi.Error as raw_dbapi_err:
                connection = context.root_connection
                if engine.dialect.is_disconnect(raw_dbapi_err, connection, cursor_obj):
                    if retry > num_retries:
                        raise
                    engine.logger.error(
                        "disconnection error, retrying operation",
                        exc_info=True,
                    )
                    connection.invalidate()

                    # use SQLAlchemy 2.0 API if available
                    if hasattr(connection, "rollback"):
                        connection.rollback()
                    else:
                        trans = connection.get_transaction()
                        if trans:
                            trans.rollback()

                    time.sleep(retry_interval)
                    context.cursor = cursor_obj = connection.connection.cursor()
                else:
                    raise
            else:
                return True

    e = engine.execution_options(isolation_level="AUTOCOMMIT")

    @event.listens_for(e, "do_execute_no_params")
    def do_execute_no_params(cursor_obj, statement, context):
        return _run_with_retries(
            context.dialect.do_execute_no_params, context, cursor_obj, statement
        )

    @event.listens_for(e, "do_execute")
    def do_execute(cursor_obj, statement, parameters, context):
        return _run_with_retries(
            context.dialect.do_execute, context, cursor_obj, statement, parameters
        )

    return e

Учитывая приведенный выше рецепт, переподключение в середине транзакции можно продемонстрировать с помощью следующего сценария доказательства концепции. После запуска он будет каждые пять секунд выдавать в базу данных сообщение SELECT 1:

from sqlalchemy import create_engine
from sqlalchemy import select

if __name__ == "__main__":
    engine = create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True)

    def do_a_thing(engine):
        with engine.begin() as conn:
            while True:
                print("ping: %s" % conn.execute(select([1])).scalar())
                time.sleep(5)

    e = reconnecting_engine(
        create_engine("mysql+mysqldb://scott:tiger@localhost/test", echo_pool=True),
        num_retries=5,
        retry_interval=2,
    )

    do_a_thing(e)

Перезапустите базу данных во время выполнения сценария, чтобы продемонстрировать работу прозрачного переподключения:

$ python reconnect_test.py
ping: 1
ping: 1
disconnection error, retrying operation
Traceback (most recent call last):
  ...
MySQLdb._exceptions.OperationalError: (2006, 'MySQL server has gone away')
2020-10-19 16:16:22,624 INFO sqlalchemy.pool.impl.QueuePool Invalidate connection <_mysql.connection open to 'localhost' at 0xf59240>
ping: 1
ping: 1
...

Приведенный рецепт протестирован для SQLAlchemy 1.4.

Почему SQLAlchemy выдает так много ROLLBACK?

В настоящее время SQLAlchemy предполагает, что соединения DBAPI находятся в режиме «non-autocommit» - это поведение API баз данных Python по умолчанию, то есть предполагается, что транзакция всегда находится в процессе. Пул соединений выдает connection.rollback() при возврате соединения. Это делается для того, чтобы все транзакционные ресурсы, оставшиеся на соединении, были освобождены. Для таких баз данных, как PostgreSQL или MSSQL, где ресурсы таблиц агрессивно блокируются, это очень важно, чтобы строки и таблицы не оставались заблокированными в соединениях, которые больше не используются. В противном случае приложение может зависнуть. Однако это относится не только к блокировкам и одинаково важно для любой базы данных с любым видом изоляции транзакций, включая MySQL с InnoDB. Любое соединение, находящееся внутри старой транзакции, будет возвращать устаревшие данные, если эти данные уже запрашивались на этом соединении в условиях изоляции. О том, почему в MySQL могут появляться несвежие данные, читайте на сайте https://dev.mysql.com/doc/refman/5.1/en/innodb-transaction-model.html.

Я нахожусь на MyISAM - как мне его отключить?

Поведение пула соединений при возврате соединений может быть настроено с помощью reset_on_return:

from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "mysql+mysqldb://scott:tiger@localhost/myisam_database",
    pool=QueuePool(reset_on_return=False),
)

Я работаю на SQL Server - как мне превратить эти ROLLBACK в COMMIT?

reset_on_return принимает значения commit, rollback в дополнение к True, False и None. Установка значения commit приведет к COMMIT при возврате любого соединения в пул:

engine = create_engine(
    "mssql+pyodbc://scott:tiger@mydsn", pool=QueuePool(reset_on_return="commit")
)

Я использую несколько соединений с базой данных SQLite (обычно для проверки работы транзакций), и моя тестовая программа не работает!

При использовании базы данных SQLite :memory: по умолчанию используется пул соединений SingletonThreadPool, который поддерживает ровно одно соединение SQLite на поток. Таким образом, два соединения, используемые в одном потоке, на самом деле будут одним и тем же соединением SQLite. Убедитесь, что вы не используете базу данных :memory:, чтобы движок использовал QueuePool (по умолчанию для баз данных без памяти в текущих версиях SQLAlchemy).

См.также

Поведение потоков/пулинга - информация о поведении PySQLite.

Как получить необработанное соединение DBAPI при использовании Engine?

При использовании обычного SA-соединения на уровне движка можно получить пул-проксированную версию DBAPI-соединения через атрибут Connection.connection на Connection, а для реального DBAPI-соединения можно вызвать атрибут PoolProxiedConnection.dbapi_connection на нем. В обычных драйверах синхронизации обычно нет необходимости обращаться к непул-проксированному DBAPI-соединению, так как все методы проксируются через:

engine = create_engine(...)
conn = engine.connect()

# pep-249 style PoolProxiedConnection (historically called a "connection fairy")
connection_fairy = conn.connection

# typically to run statements one would get a cursor() from this
# object
cursor_obj = connection_fairy.cursor()
# ... work with cursor_obj

# to bypass "connection_fairy", such as to set attributes on the
# unproxied pep-249 DBAPI connection, use .dbapi_connection
raw_dbapi_connection = connection_fairy.dbapi_connection

# the same thing is available as .driver_connection (more on this
# in the next section)
also_raw_dbapi_connection = connection_fairy.driver_connection

Изменено в версии 1.4.24: Добавлен атрибут PoolProxiedConnection.dbapi_connection, который заменяет предыдущий атрибут PoolProxiedConnection.connection, остающийся доступным; этот атрибут всегда предоставляет объект соединения в синхронном стиле pep-249. Также добавлен атрибут PoolProxiedConnection.driver_connection, который всегда будет ссылаться на реальное соединение на уровне драйвера, независимо от того, какой API он представляет.

Доступ к базовому соединению для драйвера asyncio

При использовании драйвера asyncio в приведенную схему вносятся два изменения. Первое заключается в том, что при использовании AsyncConnection доступ к PoolProxiedConnection должен осуществляться с помощью метода awaitable AsyncConnection.get_raw_connection(). Возвращаемый PoolProxiedConnection в этом случае сохраняет схему использования pep-249 в стиле sync, а атрибут PoolProxiedConnection.dbapi_connection ссылается на адаптированный к SQLAlchemy объект соединения, который адаптирует соединение asyncio к API pep-249 в стиле sync, иными словами, при использовании драйвера asyncio происходит два уровня проксирования. Собственно asyncio-соединение доступно из атрибута driver_connection. Пересказ предыдущего примера в терминах asyncio выглядит так:

async def main():
    engine = create_async_engine(...)
    conn = await engine.connect()

    # pep-249 style ConnectionFairy connection pool proxy object
    # presents a sync interface
    connection_fairy = await conn.get_raw_connection()

    # beneath that proxy is a second proxy which adapts the
    # asyncio driver into a pep-249 connection object, accessible
    # via .dbapi_connection as is the same with a sync API
    sqla_sync_conn = connection_fairy.dbapi_connection

    # the really-real innermost driver connection is available
    # from the .driver_connection attribute
    raw_asyncio_connection = connection_fairy.driver_connection

    # work with raw asyncio connection
    result = await raw_asyncio_connection.execute(...)

Изменено в версии 1.4.24: Добавлены атрибуты PoolProxiedConnection.dbapi_connection и PoolProxiedConnection.driver_connection, позволяющие получить доступ к соединениям pep-249, адаптационным слоям pep-249 и базовым соединениям драйверов с использованием согласованного интерфейса.

При использовании драйверов asyncio указанное выше соединение «DBAPI» на самом деле является адаптированной к SQLAlchemy формой соединения, представляющей синхронный API в стиле pep-249. Для доступа к реальному соединению с asyncio-драйвером, которое будет представлять оригинальный asyncio API используемого драйвера, можно воспользоваться атрибутом PoolProxiedConnection.driver_connection, входящим в состав PoolProxiedConnection. Для стандартного драйвера pep-249 атрибуты PoolProxiedConnection.dbapi_connection и PoolProxiedConnection.driver_connection являются синонимами.

Перед возвращением соединения в пул необходимо вернуть все настройки уровня изоляции или другие настройки, специфичные для данной операции, в нормальное состояние.

В качестве альтернативы возврату настроек можно вызвать метод Connection.detach() для Connection или проксируемого соединения, который деассоциирует соединение из пула таким образом, что оно будет закрыто и отброшено при вызове Connection.close():

conn = engine.connect()
conn.detach()  # detaches the DBAPI connection from the connection pool
conn.connection.<go nuts>
conn.close()  # connection is closed for real, the pool replaces it with a new connection

Как использовать движки / соединения / сессии с многопроцессорной обработкой Python или os.fork()?

Об этом рассказывается в разделе Использование пулов соединений с многопроцессорной обработкой или os.fork().

Вернуться на верх