Транзакции и управление соединениями

Управление транзакциями

Изменено в версии 1.4: Управление транзакциями сеанса было пересмотрено, чтобы стать более понятным и простым в использовании. В частности, теперь оно включает операцию «autobegin», что означает, что момент начала транзакции можно контролировать, не используя традиционный режим «autocommit».

Session отслеживает состояние одной «виртуальной» транзакции за раз, используя объект под названием SessionTransaction. Затем этот объект использует базовый Engine или движки, к которым привязан объект Session, чтобы начать реальные транзакции на уровне соединения, используя объект Connection по мере необходимости.

Эта «виртуальная» транзакция создается автоматически, когда это необходимо, или может быть запущена с помощью метода Session.begin(). В максимально возможной степени использование менеджера контекста Python поддерживается как на уровне создания объектов Session, так и для поддержания области видимости SessionTransaction.

Ниже, предположим, что мы начинаем с Session:

from sqlalchemy.orm import Session

session = Session(engine)

Теперь мы можем выполнять операции внутри разграниченной транзакции с помощью менеджера контекста:

with session.begin():
    session.add(some_object())
    session.add(some_other_object())
# commits transaction at the end, or rolls back if there
# was an exception raised

В конце вышеупомянутого контекста, если не возникло исключений, все ожидающие объекты будут сброшены в базу данных, а транзакция базы данных будет зафиксирована. Если в вышеуказанном блоке возникло исключение, то транзакция будет откатана. В обоих случаях после выхода из блока вышеупомянутый Session готов к использованию в последующих транзакциях.

Метод Session.begin() является необязательным, а Session может также использоваться в подходе commit-as-you-go, где он будет начинать транзакции автоматически по мере необходимости; их нужно только зафиксировать или откатить:

session = Session(engine)

session.add(some_object())
session.add(some_other_object())

session.commit()  # commits

# will automatically begin again
result = session.execute("< some select statement >")
session.add_all([more_objects, ...])
session.commit()  # commits

session.add(still_another_object)
session.flush()  # flush still_another_object
session.rollback()  # rolls back still_another_object

Сам Session содержит метод Session.close(). Если Session запускается в транзакции, которая еще не была зафиксирована или откачена, этот метод отменит (т.е. откатит) транзакцию, а также удалит все объекты, содержащиеся в состоянии объекта Session. Если Session используется таким образом, что вызов Session.commit() или Session.rollback() не гарантирован (например, не в менеджере контекста или подобном), метод close может быть использован для обеспечения освобождения всех ресурсов:

# expunges all objects, releases all transactions unconditionally
# (with rollback), releases all database connections back to their
# engines
session.close()

Наконец, сам процесс создания/закрытия сессии может быть запущен через контекстный менеджер. Это лучший способ обеспечить, чтобы область использования объекта Session была ограничена в пределах фиксированного блока. Иллюстрация на примере конструктора Session first:

with Session(engine) as session:
    session.add(some_object())
    session.add(some_other_object())

    session.commit()  # commits

    session.add(still_another_object)
    session.flush()  # flush still_another_object

    session.commit()  # commits

    result = session.execute("<some SELECT statement>")

# remaining transactional state from the .execute() call is
# discarded

Аналогично, sessionmaker может быть использован таким же образом:

Session = sessionmaker(engine)

with Session() as session:
    with session.begin():
        session.add(some_object)
    # commits

# closes the Session

Сам sessionmaker включает в себя метод sessionmaker.begin(), позволяющий выполнять обе операции одновременно:

with Session.begin() as session:
    session.add(some_object)

Использование SAVEPOINT

Транзакции SAVEPOINT, если они поддерживаются базовым механизмом, могут быть разграничены с помощью метода Session.begin_nested():

Session = sessionmaker()

with Session.begin() as session:
    session.add(u1)
    session.add(u2)

    nested = session.begin_nested()  # establish a savepoint
    session.add(u3)
    nested.rollback()  # rolls back u3, keeps u1 and u2

# commits u1 and u2

При каждом вызове Session.begin_nested() в базу данных в рамках текущей транзакции базы данных (если она еще не началась) передается новая команда «BEGIN SAVEPOINT» и возвращается объект типа SessionTransaction, который представляет собой хэндл к этому SAVEPOINT. При вызове метода .commit() на этом объекте в базу данных передается сообщение «RELEASE SAVEPOINT», а если вместо этого вызывается метод .rollback(), то передается сообщение «ROLLBACK TO SAVEPOINT». Охватывающая транзакция базы данных продолжает выполняться.

Session.begin_nested() обычно используется в качестве менеджера контекста, где могут быть пойманы конкретные ошибки отдельных экземпляров, в сочетании с откатом, созданным для этой части состояния транзакции, без отката всей транзакции, как в примере ниже:

for record in records:
    try:
        with session.begin_nested():
            session.merge(record)
    except:
        print("Skipped record %s" % record)
session.commit()

Когда менеджер контекста, переданный Session.begin_nested(), завершает работу, он «фиксирует» точку сохранения, что включает в себя обычное поведение очистки всех ожидающих состояний. При возникновении ошибки точка сохранения откатывается, и состояние Session, локальное для объектов, которые были изменены, становится недействительным.

Этот шаблон идеально подходит для таких ситуаций, как использование PostgreSQL и перехват IntegrityError для обнаружения дублирующихся строк; PostgreSQL обычно прерывает всю транзакцию при возникновении такой ошибки, однако при использовании SAVEPOINT внешняя транзакция сохраняется. В приведенном ниже примере список данных сохраняется в базе данных, при этом случайные записи с «дублирующимся первичным ключом» пропускаются, без отката всей операции:

from sqlalchemy import exc

with session.begin():
    for record in records:
        try:
            with session.begin_nested():
                obj = SomeRecord(id=record["identifier"], name=record["name"])
                session.add(obj)
        except exc.IntegrityError:
            print(f"Skipped record {record} - row already exists")

Когда вызывается Session.begin_nested(), Session сначала сбрасывает все текущее состояние в базу данных; это происходит безоговорочно, независимо от значения параметра Session.autoflush, который обычно может использоваться для отключения автоматического сбрасывания. Такое поведение объясняется тем, что при откате этой вложенной транзакции Session может уничтожить все состояния в памяти, которые были созданы в рамках SAVEPOINT, гарантируя при этом, что при обновлении объектов с истекшим сроком хранения состояние графа объектов до начала SAVEPOINT будет доступно для повторной загрузки из базы данных.

В современных версиях SQLAlchemy при откате SAVEPOINT, инициированного командой Session.begin_nested(), состояние объекта в памяти, которое было изменено с момента создания SAVEPOINT, уничтожается, однако состояние объекта, которое не было изменено с момента создания SAVEPOINT, сохраняется. Это делается для того, чтобы последующие операции могли продолжать использовать незатронутые данные без необходимости их обновления из базы данных.

См.также

Connection.begin_nested() - Основной API SAVEPOINT

Управление транзакциями на уровне сеанса и на уровне двигателя

Connection в Core и _session.Session в ORM имеют эквивалентную транзакционную семантику, как на уровне sessionmaker против Engine, так и Session против Connection. В следующих разделах подробно описаны эти сценарии, основанные на следующей схеме:

ORM                                           Core
-----------------------------------------     -----------------------------------
sessionmaker                                  Engine
Session                                       Connection
sessionmaker.begin()                          Engine.begin()
some_session.commit()                         some_connection.commit()
with some_sessionmaker() as session:          with some_engine.connect() as conn:
with some_sessionmaker.begin() as session:    with some_engine.begin() as conn:
with some_session.begin_nested() as sp:       with some_connection.begin_nested() as sp:

Принимайте обязательства по ходу дела

Как Session, так и Connection имеют методы Connection.commit() и Connection.rollback(). Используя операции в стиле SQLAlchemy 2.0, эти методы во всех случаях воздействуют на внешнюю транзакцию. Для Session предполагается, что Session.autobegin оставлен в значении по умолчанию True.

Engine:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.connect() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    conn.commit()

Session:

Session = sessionmaker(engine)

with Session() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    session.commit()

Начать однажды

И sessionmaker, и Engine имеют метод Engine.begin(), который получает новый объект для выполнения SQL-запросов (Session и Connection соответственно), а затем возвращает менеджер контекста, который будет поддерживать контекст начала/завершения/отката для этого объекта.

Двигатель:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.begin() as conn:
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
# commits and closes automatically

Сессия:

Session = sessionmaker(engine)

with Session.begin() as session:
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
# commits and closes automatically

Вложенная транзакция

При использовании SAVEPOINT с помощью методов Session.begin_nested() или Connection.begin_nested(), возвращаемый объект транзакции должен использоваться для фиксации или отката SAVEPOINT. Вызов методов Session.commit() или Connection.commit() всегда будет фиксировать внешнюю транзакцию; это специфическое поведение SQLAlchemy 2.0, обратное тому, что было в серии 1.x.

Двигатель:

engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")

with engine.begin() as conn:
    savepoint = conn.begin_nested()
    conn.execute(
        some_table.insert(),
        [
            {"data": "some data one"},
            {"data": "some data two"},
            {"data": "some data three"},
        ],
    )
    savepoint.commit()  # or rollback

# commits automatically

Сессия:

Session = sessionmaker(engine)

with Session.begin() as session:
    savepoint = session.begin_nested()
    session.add_all(
        [
            SomeClass(data="some data one"),
            SomeClass(data="some data two"),
            SomeClass(data="some data three"),
        ]
    )
    savepoint.commit()  # or rollback
# commits automatically

Явное начало

Session имеет поведение «автозапуска», означающее, что как только операции начинают выполняться, он обеспечивает наличие SessionTransaction для отслеживания текущих операций. Эта операция завершается при вызове Session.commit().

Часто желательно, особенно при интеграции фреймворков, контролировать момент, когда происходит операция «begin». Для этого Session использует стратегию «autobegin», так что метод Session.begin() может быть вызван непосредственно для Session, у которого еще не началась транзакция:

Session = sessionmaker(bind=engine)
session = Session()
session.begin()
try:
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"
    session.commit()
except:
    session.rollback()
    raise

Приведенный выше шаблон более идиоматично вызывается с помощью менеджера контекста:

Session = sessionmaker(bind=engine)
session = Session()
with session.begin():
    item1 = session.get(Item, 1)
    item2 = session.get(Item, 2)
    item1.foo = "bar"
    item2.bar = "foo"

Метод Session.begin() и процесс «autobegin» сессии используют одну и ту же последовательность шагов для начала транзакции. Это включает вызов события SessionEvents.after_transaction_create() при его возникновении; этот хук используется фреймворками для интеграции собственных транзакционных процессов с процессами ORM Session.

Включение двухфазной фиксации

Для баз данных, поддерживающих двухфазную работу (в настоящее время это MySQL и PostgreSQL), сессии можно указать использовать семантику двухфазной фиксации. Это позволит координировать фиксацию транзакций между базами данных таким образом, что транзакция будет либо зафиксирована, либо откачена во всех базах данных. Вы также можете Session.prepare() использовать сессию для взаимодействия с транзакциями, не управляемыми SQLAlchemy. Для использования двухфазных транзакций установите флаг twophase=True на сессии:

engine1 = create_engine("postgresql+psycopg2://db1")
engine2 = create_engine("postgresql+psycopg2://db2")

Session = sessionmaker(twophase=True)

# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User: engine1, Account: engine2})

session = Session()

# .... work with accounts and users

# commit.  session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()

Настройка уровней изоляции транзакций / DBAPI AUTOCOMMIT

Большинство DBAPI поддерживают концепцию настраиваемых уровней транзакций isolation. Традиционно это четыре уровня «READ UNCOMMITTED», «READ COMMITTED», «REPEATABLE READ» и «SERIALIZABLE». Они обычно применяются к соединению DBAPI перед началом новой транзакции, при этом следует отметить, что большинство DBAPI начинают транзакцию неявно при первом выполнении SQL-запросов.

DBAPI, поддерживающие уровни изоляции, обычно также поддерживают концепцию истинного «автокоммита», что означает, что само соединение DBAPI будет переведено в нетранзакционный режим автокоммита. Это обычно означает, что типичное поведение DBAPI, заключающееся в автоматической передаче «BEGIN» в базу данных, больше не происходит, но оно может включать и другие директивы. При использовании этого режима ** DBAPI не использует транзакцию ни при каких обстоятельствах**. Методы SQLAlchemy, такие как .begin(), .commit() и .rollback(), проходят молча.

Диалекты SQLAlchemy поддерживают настраиваемые режимы изоляции на основе per-Engine или per-Connection, используя флаги как на уровне create_engine(), так и на уровне Connection.execution_options().

При использовании ORM Session он действует как фасад для движков и соединений, но не обеспечивает изоляцию транзакций напрямую. Поэтому для того, чтобы повлиять на уровень изоляции транзакций, мы должны действовать на Engine или Connection в зависимости от ситуации.

См.также

Установка уровней изоляции транзакций, включая DBAPI Autocommit - не забудьте также просмотреть, как работают уровни изоляции на уровне объекта SQLAlchemy Connection.

Настройка изоляции для производителя сеансов / двигателя в целом

Чтобы установить Session или sessionmaker с определенным уровнем изоляции глобально, первая техника заключается в том, что Engine может быть построена против определенного уровня изоляции во всех случаях, которая затем используется как источник связности для Session и/или sessionmaker:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine(
    "postgresql+psycopg2://scott:tiger@localhost/test",
    isolation_level="REPEATABLE READ",
)

Session = sessionmaker(eng)

Другой вариант, полезный, если одновременно будут работать два движка с разными уровнями изоляции, заключается в использовании метода Engine.execution_options(), который создаст неглубокую копию исходного Engine, разделяющую тот же пул соединений, что и родительский движок. Это часто предпочтительнее, когда операции разделяются на «транзакционные» и «автокоммитные»:

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

transactional_session = sessionmaker(eng)
autocommit_session = sessionmaker(autocommit_engine)

Выше, оба «eng» и "autocommit_engine" используют один и тот же диалект и пул соединений. Однако режим «AUTOCOMMIT» будет установлен на соединениях, когда они будут получены от autocommit_engine. Два объекта sessionmaker «transactional_session» и «autocommit_session" затем наследуют эти характеристики при работе с соединениями базы данных.

«autocommit_session» продолжает иметь транзакционную семантику, включая то, что Session.commit() и Session.rollback() по-прежнему считают себя объектами «фиксации» и «отката», однако транзакция будет молчаливо отсутствовать. По этой причине типично, хотя и не строго обязательно, чтобы сессия с изоляцией AUTOCOMMIT использовалась только для чтения, то есть:

with autocommit_session() as session:
    some_objects = session.execute("<statement>")
    some_other_objects = session.execute("<statement>")

# closes connection

Настройка изоляции для индивидуальных сеансов

Когда мы создаем новый Session, либо используя конструктор напрямую, либо вызывая вызываемый объект, созданный sessionmaker, мы можем передать аргумент bind напрямую, переопределив предварительно существующую привязку. Например, мы можем создать наш Session из стандартного sessionmaker и передать набор движков для автокоммита:

plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT")

# will normally use plain_engine
Session = sessionmaker(plain_engine)

# make a specific Session that will use the "autocommit" engine
with Session(bind=autocommit_engine) as session:
    # work with session
    ...

Для случая, когда Session или sessionmaker настроены с несколькими «привязками», мы можем либо повторно указать аргумент binds полностью, либо, если мы хотим заменить только определенные привязки, мы можем использовать методы Session.bind_mapper() или Session.bind_table():

with Session() as session:
    session.bind_mapper(User, autocommit_engine)

Настройка изоляции для отдельных транзакций

Ключевым предостережением относительно уровня изоляции является то, что этот параметр не может быть безопасно изменен на Connection, где транзакция уже началась. Базы данных не могут изменить уровень изоляции транзакции, находящейся в процессе выполнения, а некоторые DBAPI и диалекты SQLAlchemy имеют непоследовательное поведение в этой области.

Поэтому предпочтительнее использовать Session, который заранее привязан к движку с желаемым уровнем изоляции. Однако на уровень изоляции для каждого соединения можно повлиять, используя метод Session.connection() в начале транзакции:

from sqlalchemy.orm import Session

# assume session just constructed
sess = Session(bind=engine)

# call connection() with options before any other operations proceed.
# this will procure a new connection from the bound engine and begin a real
# database transaction.
sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})

# ... work with session in SERIALIZABLE isolation level...

# commit transaction.  the connection is released
# and reverted to its previous isolation level.
sess.commit()

# subsequent to commit() above, a new transaction may be begun if desired,
# which will proceed with the previous default isolation level unless
# it is set again.

Выше мы сначала создаем Session, используя либо конструктор, либо sessionmaker. Затем мы явно устанавливаем начало транзакции на уровне базы данных, вызывая Session.connection(), который обеспечивает параметры выполнения, которые будут переданы соединению перед началом транзакции на уровне базы данных. Транзакция выполняется с выбранным уровнем изоляции. Когда транзакция завершается, уровень изоляции для соединения сбрасывается до значения по умолчанию, после чего соединение возвращается в пул соединений.

Метод Session.begin() может также использоваться для начала транзакции уровня Session; вызов Session.connection(), следующий за этим вызовом, может использоваться для установки уровня изоляции для каждого соединения-транзакции:

sess = Session(bind=engine)

with sess.begin():
    # call connection() with options before any other operations proceed.
    # this will procure a new connection from the bound engine and begin a
    # real database transaction.
    sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})

    # ... work with session in SERIALIZABLE isolation level...

# outside the block, the transaction has been committed.  the connection is
# released and reverted to its previous isolation level.

Отслеживание состояния транзакции с помощью событий

Обзор доступных крючков событий для изменения состояния транзакции сеанса см. в разделе События транзакции.

Присоединение сеанса к внешней транзакции (например, для тестовых наборов)

Если используется Connection, который уже находится в транзакционном состоянии (т.е. имеет установленный Transaction), то можно заставить Session участвовать в этой транзакции, просто привязав Session к этому Connection. Обычное обоснование для этого - набор тестов, позволяющий ORM-коду свободно работать с Session, включая возможность вызова Session.commit(), после чего все взаимодействие с базой данных откатывается.

Изменено в версии 2.0: В версии 2.0 рецепт «присоединиться к внешней транзакции» снова усовершенствован; обработчики событий для «сброса» вложенной транзакции больше не требуются.

Рецепт работает путем создания Connection внутри транзакции и, опционально, SAVEPOINT, затем передается Session в качестве «привязки»; параметр Session.join_transaction_mode передается с настройкой "create_savepoint", которая указывает, что должны быть созданы новые SAVEPOINT для реализации BEGIN/COMMIT/ROLLBACK для Session, который оставит внешнюю транзакцию в том же состоянии, в котором она была передана.

Когда тест завершается, внешняя транзакция откатывается, чтобы все изменения данных в ходе теста были возвращены:

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase

# global application scope.  create Session class, engine
Session = sessionmaker()

engine = create_engine("postgresql+psycopg2://...")


class SomeTest(TestCase):
    def setUp(self):
        # connect to the database
        self.connection = engine.connect()

        # begin a non-ORM transaction
        self.trans = self.connection.begin()

        # bind an individual Session to the connection, selecting
        # "create_savepoint" join_transaction_mode
        self.session = Session(
            bind=self.connection, join_transaction_mode="create_savepoint"
        )

    def test_something(self):
        # use the session in tests.

        self.session.add(Foo())
        self.session.commit()

    def test_something_with_rollbacks(self):
        self.session.add(Bar())
        self.session.flush()
        self.session.rollback()

        self.session.add(Foo())
        self.session.commit()

    def tearDown(self):
        self.session.close()

        # rollback - everything that happened with the
        # Session above (including calls to commit())
        # is rolled back.
        self.trans.rollback()

        # return connection to the Engine
        self.connection.close()

Приведенный выше рецепт является частью собственного CI SQLAlchemy, чтобы гарантировать, что он продолжает работать так, как ожидается.

Вернуться на верх