Транзакции и управление соединениями¶
Управление транзакциями¶
Изменено в версии 1.4: Управление транзакциями сеанса было пересмотрено, чтобы стать более понятным и простым в использовании. В частности, теперь оно включает операцию «autobegin», что означает, что момент начала транзакции можно контролировать, не используя традиционный режим «autocommit».
Session
отслеживает состояние одной «виртуальной» транзакции за раз, используя объект под названием SessionTransaction
. Затем этот объект использует базовый Engine
или движки, к которым привязан объект Session
, чтобы начать реальные транзакции на уровне соединения, используя объект Connection
по мере необходимости.
Эта «виртуальная» транзакция создается автоматически, когда это необходимо, или может быть запущена с помощью метода Session.begin()
. В максимально возможной степени использование менеджера контекста Python поддерживается как на уровне создания объектов Session
, так и для поддержания области видимости SessionTransaction
.
Ниже, предположим, что мы начинаем с Session
:
from sqlalchemy.orm import Session
session = Session(engine)
Теперь мы можем выполнять операции внутри разграниченной транзакции с помощью менеджера контекста:
with session.begin():
session.add(some_object())
session.add(some_other_object())
# commits transaction at the end, or rolls back if there
# was an exception raised
В конце вышеупомянутого контекста, если не возникло исключений, все ожидающие объекты будут сброшены в базу данных, а транзакция базы данных будет зафиксирована. Если в вышеуказанном блоке возникло исключение, то транзакция будет откатана. В обоих случаях после выхода из блока вышеупомянутый Session
готов к использованию в последующих транзакциях.
Метод Session.begin()
является необязательным, а Session
может также использоваться в подходе commit-as-you-go, где он будет начинать транзакции автоматически по мере необходимости; их нужно только зафиксировать или откатить:
session = Session(engine)
session.add(some_object())
session.add(some_other_object())
session.commit() # commits
# will automatically begin again
result = session.execute("< some select statement >")
session.add_all([more_objects, ...])
session.commit() # commits
session.add(still_another_object)
session.flush() # flush still_another_object
session.rollback() # rolls back still_another_object
Сам Session
содержит метод Session.close()
. Если Session
запускается в транзакции, которая еще не была зафиксирована или откачена, этот метод отменит (т.е. откатит) транзакцию, а также удалит все объекты, содержащиеся в состоянии объекта Session
. Если Session
используется таким образом, что вызов Session.commit()
или Session.rollback()
не гарантирован (например, не в менеджере контекста или подобном), метод close
может быть использован для обеспечения освобождения всех ресурсов:
# expunges all objects, releases all transactions unconditionally
# (with rollback), releases all database connections back to their
# engines
session.close()
Наконец, сам процесс создания/закрытия сессии может быть запущен через контекстный менеджер. Это лучший способ обеспечить, чтобы область использования объекта Session
была ограничена в пределах фиксированного блока. Иллюстрация на примере конструктора Session
first:
with Session(engine) as session:
session.add(some_object())
session.add(some_other_object())
session.commit() # commits
session.add(still_another_object)
session.flush() # flush still_another_object
session.commit() # commits
result = session.execute("<some SELECT statement>")
# remaining transactional state from the .execute() call is
# discarded
Аналогично, sessionmaker
может быть использован таким же образом:
Session = sessionmaker(engine)
with Session() as session:
with session.begin():
session.add(some_object)
# commits
# closes the Session
Сам sessionmaker
включает в себя метод sessionmaker.begin()
, позволяющий выполнять обе операции одновременно:
with Session.begin() as session:
session.add(some_object)
Использование SAVEPOINT¶
Транзакции SAVEPOINT, если они поддерживаются базовым механизмом, могут быть разграничены с помощью метода Session.begin_nested()
:
Session = sessionmaker()
with Session.begin() as session:
session.add(u1)
session.add(u2)
nested = session.begin_nested() # establish a savepoint
session.add(u3)
nested.rollback() # rolls back u3, keeps u1 and u2
# commits u1 and u2
При каждом вызове Session.begin_nested()
в базу данных в рамках текущей транзакции базы данных (если она еще не началась) передается новая команда «BEGIN SAVEPOINT» и возвращается объект типа SessionTransaction
, который представляет собой хэндл к этому SAVEPOINT. При вызове метода .commit()
на этом объекте в базу данных передается сообщение «RELEASE SAVEPOINT», а если вместо этого вызывается метод .rollback()
, то передается сообщение «ROLLBACK TO SAVEPOINT». Охватывающая транзакция базы данных продолжает выполняться.
Session.begin_nested()
обычно используется в качестве менеджера контекста, где могут быть пойманы конкретные ошибки отдельных экземпляров, в сочетании с откатом, созданным для этой части состояния транзакции, без отката всей транзакции, как в примере ниже:
for record in records:
try:
with session.begin_nested():
session.merge(record)
except:
print("Skipped record %s" % record)
session.commit()
Когда менеджер контекста, переданный Session.begin_nested()
, завершает работу, он «фиксирует» точку сохранения, что включает в себя обычное поведение очистки всех ожидающих состояний. При возникновении ошибки точка сохранения откатывается, и состояние Session
, локальное для объектов, которые были изменены, становится недействительным.
Этот шаблон идеально подходит для таких ситуаций, как использование PostgreSQL и перехват IntegrityError
для обнаружения дублирующихся строк; PostgreSQL обычно прерывает всю транзакцию при возникновении такой ошибки, однако при использовании SAVEPOINT внешняя транзакция сохраняется. В приведенном ниже примере список данных сохраняется в базе данных, при этом случайные записи с «дублирующимся первичным ключом» пропускаются, без отката всей операции:
from sqlalchemy import exc
with session.begin():
for record in records:
try:
with session.begin_nested():
obj = SomeRecord(id=record["identifier"], name=record["name"])
session.add(obj)
except exc.IntegrityError:
print(f"Skipped record {record} - row already exists")
Когда вызывается Session.begin_nested()
, Session
сначала сбрасывает все текущее состояние в базу данных; это происходит безоговорочно, независимо от значения параметра Session.autoflush
, который обычно может использоваться для отключения автоматического сбрасывания. Такое поведение объясняется тем, что при откате этой вложенной транзакции Session
может уничтожить все состояния в памяти, которые были созданы в рамках SAVEPOINT, гарантируя при этом, что при обновлении объектов с истекшим сроком хранения состояние графа объектов до начала SAVEPOINT будет доступно для повторной загрузки из базы данных.
В современных версиях SQLAlchemy при откате SAVEPOINT, инициированного командой Session.begin_nested()
, состояние объекта в памяти, которое было изменено с момента создания SAVEPOINT, уничтожается, однако состояние объекта, которое не было изменено с момента создания SAVEPOINT, сохраняется. Это делается для того, чтобы последующие операции могли продолжать использовать незатронутые данные без необходимости их обновления из базы данных.
См.также
Connection.begin_nested()
- Основной API SAVEPOINT
Управление транзакциями на уровне сеанса и на уровне двигателя¶
Connection
в Core и _session.Session
в ORM имеют эквивалентную транзакционную семантику, как на уровне sessionmaker
против Engine
, так и Session
против Connection
. В следующих разделах подробно описаны эти сценарии, основанные на следующей схеме:
ORM Core
----------------------------------------- -----------------------------------
sessionmaker Engine
Session Connection
sessionmaker.begin() Engine.begin()
some_session.commit() some_connection.commit()
with some_sessionmaker() as session: with some_engine.connect() as conn:
with some_sessionmaker.begin() as session: with some_engine.begin() as conn:
with some_session.begin_nested() as sp: with some_connection.begin_nested() as sp:
Принимайте обязательства по ходу дела¶
Как Session
, так и Connection
имеют методы Connection.commit()
и Connection.rollback()
. Используя операции в стиле SQLAlchemy 2.0, эти методы во всех случаях воздействуют на внешнюю транзакцию. Для Session
предполагается, что Session.autobegin
оставлен в значении по умолчанию True
.
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.connect() as conn:
conn.execute(
some_table.insert(),
[
{"data": "some data one"},
{"data": "some data two"},
{"data": "some data three"},
],
)
conn.commit()
Session = sessionmaker(engine)
with Session() as session:
session.add_all(
[
SomeClass(data="some data one"),
SomeClass(data="some data two"),
SomeClass(data="some data three"),
]
)
session.commit()
Начать однажды¶
И sessionmaker
, и Engine
имеют метод Engine.begin()
, который получает новый объект для выполнения SQL-запросов (Session
и Connection
соответственно), а затем возвращает менеджер контекста, который будет поддерживать контекст начала/завершения/отката для этого объекта.
Двигатель:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.begin() as conn:
conn.execute(
some_table.insert(),
[
{"data": "some data one"},
{"data": "some data two"},
{"data": "some data three"},
],
)
# commits and closes automatically
Сессия:
Session = sessionmaker(engine)
with Session.begin() as session:
session.add_all(
[
SomeClass(data="some data one"),
SomeClass(data="some data two"),
SomeClass(data="some data three"),
]
)
# commits and closes automatically
Вложенная транзакция¶
При использовании SAVEPOINT с помощью методов Session.begin_nested()
или Connection.begin_nested()
, возвращаемый объект транзакции должен использоваться для фиксации или отката SAVEPOINT. Вызов методов Session.commit()
или Connection.commit()
всегда будет фиксировать внешнюю транзакцию; это специфическое поведение SQLAlchemy 2.0, обратное тому, что было в серии 1.x.
Двигатель:
engine = create_engine("postgresql+psycopg2://user:pass@host/dbname")
with engine.begin() as conn:
savepoint = conn.begin_nested()
conn.execute(
some_table.insert(),
[
{"data": "some data one"},
{"data": "some data two"},
{"data": "some data three"},
],
)
savepoint.commit() # or rollback
# commits automatically
Сессия:
Session = sessionmaker(engine)
with Session.begin() as session:
savepoint = session.begin_nested()
session.add_all(
[
SomeClass(data="some data one"),
SomeClass(data="some data two"),
SomeClass(data="some data three"),
]
)
savepoint.commit() # or rollback
# commits automatically
Явное начало¶
Session
имеет поведение «автозапуска», означающее, что как только операции начинают выполняться, он обеспечивает наличие SessionTransaction
для отслеживания текущих операций. Эта операция завершается при вызове Session.commit()
.
Часто желательно, особенно при интеграции фреймворков, контролировать момент, когда происходит операция «begin». Для этого Session
использует стратегию «autobegin», так что метод Session.begin()
может быть вызван непосредственно для Session
, у которого еще не началась транзакция:
Session = sessionmaker(bind=engine)
session = Session()
session.begin()
try:
item1 = session.get(Item, 1)
item2 = session.get(Item, 2)
item1.foo = "bar"
item2.bar = "foo"
session.commit()
except:
session.rollback()
raise
Приведенный выше шаблон более идиоматично вызывается с помощью менеджера контекста:
Session = sessionmaker(bind=engine)
session = Session()
with session.begin():
item1 = session.get(Item, 1)
item2 = session.get(Item, 2)
item1.foo = "bar"
item2.bar = "foo"
Метод Session.begin()
и процесс «autobegin» сессии используют одну и ту же последовательность шагов для начала транзакции. Это включает вызов события SessionEvents.after_transaction_create()
при его возникновении; этот хук используется фреймворками для интеграции собственных транзакционных процессов с процессами ORM Session
.
Включение двухфазной фиксации¶
Для баз данных, поддерживающих двухфазную работу (в настоящее время это MySQL и PostgreSQL), сессии можно указать использовать семантику двухфазной фиксации. Это позволит координировать фиксацию транзакций между базами данных таким образом, что транзакция будет либо зафиксирована, либо откачена во всех базах данных. Вы также можете Session.prepare()
использовать сессию для взаимодействия с транзакциями, не управляемыми SQLAlchemy. Для использования двухфазных транзакций установите флаг twophase=True
на сессии:
engine1 = create_engine("postgresql+psycopg2://db1")
engine2 = create_engine("postgresql+psycopg2://db2")
Session = sessionmaker(twophase=True)
# bind User operations to engine 1, Account operations to engine 2
Session.configure(binds={User: engine1, Account: engine2})
session = Session()
# .... work with accounts and users
# commit. session will issue a flush to all DBs, and a prepare step to all DBs,
# before committing both transactions
session.commit()
Настройка уровней изоляции транзакций / DBAPI AUTOCOMMIT¶
Большинство DBAPI поддерживают концепцию настраиваемых уровней транзакций isolation. Традиционно это четыре уровня «READ UNCOMMITTED», «READ COMMITTED», «REPEATABLE READ» и «SERIALIZABLE». Они обычно применяются к соединению DBAPI перед началом новой транзакции, при этом следует отметить, что большинство DBAPI начинают транзакцию неявно при первом выполнении SQL-запросов.
DBAPI, поддерживающие уровни изоляции, обычно также поддерживают концепцию истинного «автокоммита», что означает, что само соединение DBAPI будет переведено в нетранзакционный режим автокоммита. Это обычно означает, что типичное поведение DBAPI, заключающееся в автоматической передаче «BEGIN» в базу данных, больше не происходит, но оно может включать и другие директивы. При использовании этого режима ** DBAPI не использует транзакцию ни при каких обстоятельствах**. Методы SQLAlchemy, такие как .begin()
, .commit()
и .rollback()
, проходят молча.
Диалекты SQLAlchemy поддерживают настраиваемые режимы изоляции на основе per-Engine
или per-Connection
, используя флаги как на уровне create_engine()
, так и на уровне Connection.execution_options()
.
При использовании ORM Session
он действует как фасад для движков и соединений, но не обеспечивает изоляцию транзакций напрямую. Поэтому для того, чтобы повлиять на уровень изоляции транзакций, мы должны действовать на Engine
или Connection
в зависимости от ситуации.
См.также
Установка уровней изоляции транзакций, включая DBAPI Autocommit - не забудьте также просмотреть, как работают уровни изоляции на уровне объекта SQLAlchemy Connection
.
Настройка изоляции для производителя сеансов / двигателя в целом¶
Чтобы установить Session
или sessionmaker
с определенным уровнем изоляции глобально, первая техника заключается в том, что Engine
может быть построена против определенного уровня изоляции во всех случаях, которая затем используется как источник связности для Session
и/или sessionmaker
:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
eng = create_engine(
"postgresql+psycopg2://scott:tiger@localhost/test",
isolation_level="REPEATABLE READ",
)
Session = sessionmaker(eng)
Другой вариант, полезный, если одновременно будут работать два движка с разными уровнями изоляции, заключается в использовании метода Engine.execution_options()
, который создаст неглубокую копию исходного Engine
, разделяющую тот же пул соединений, что и родительский движок. Это часто предпочтительнее, когда операции разделяются на «транзакционные» и «автокоммитные»:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")
transactional_session = sessionmaker(eng)
autocommit_session = sessionmaker(autocommit_engine)
Выше, оба «eng
» и "autocommit_engine"
используют один и тот же диалект и пул соединений. Однако режим «AUTOCOMMIT» будет установлен на соединениях, когда они будут получены от autocommit_engine
. Два объекта sessionmaker
«transactional_session
» и «autocommit_session"
затем наследуют эти характеристики при работе с соединениями базы данных.
«autocommit_session
» продолжает иметь транзакционную семантику, включая то, что Session.commit()
и Session.rollback()
по-прежнему считают себя объектами «фиксации» и «отката», однако транзакция будет молчаливо отсутствовать. По этой причине типично, хотя и не строго обязательно, чтобы сессия с изоляцией AUTOCOMMIT использовалась только для чтения, то есть:
with autocommit_session() as session:
some_objects = session.execute("<statement>")
some_other_objects = session.execute("<statement>")
# closes connection
Настройка изоляции для индивидуальных сеансов¶
Когда мы создаем новый Session
, либо используя конструктор напрямую, либо вызывая вызываемый объект, созданный sessionmaker
, мы можем передать аргумент bind
напрямую, переопределив предварительно существующую привязку. Например, мы можем создать наш Session
из стандартного sessionmaker
и передать набор движков для автокоммита:
plain_engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
autocommit_engine = plain_engine.execution_options(isolation_level="AUTOCOMMIT")
# will normally use plain_engine
Session = sessionmaker(plain_engine)
# make a specific Session that will use the "autocommit" engine
with Session(bind=autocommit_engine) as session:
# work with session
...
Для случая, когда Session
или sessionmaker
настроены с несколькими «привязками», мы можем либо повторно указать аргумент binds
полностью, либо, если мы хотим заменить только определенные привязки, мы можем использовать методы Session.bind_mapper()
или Session.bind_table()
:
with Session() as session:
session.bind_mapper(User, autocommit_engine)
Настройка изоляции для отдельных транзакций¶
Ключевым предостережением относительно уровня изоляции является то, что этот параметр не может быть безопасно изменен на Connection
, где транзакция уже началась. Базы данных не могут изменить уровень изоляции транзакции, находящейся в процессе выполнения, а некоторые DBAPI и диалекты SQLAlchemy имеют непоследовательное поведение в этой области.
Поэтому предпочтительнее использовать Session
, который заранее привязан к движку с желаемым уровнем изоляции. Однако на уровень изоляции для каждого соединения можно повлиять, используя метод Session.connection()
в начале транзакции:
from sqlalchemy.orm import Session
# assume session just constructed
sess = Session(bind=engine)
# call connection() with options before any other operations proceed.
# this will procure a new connection from the bound engine and begin a real
# database transaction.
sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})
# ... work with session in SERIALIZABLE isolation level...
# commit transaction. the connection is released
# and reverted to its previous isolation level.
sess.commit()
# subsequent to commit() above, a new transaction may be begun if desired,
# which will proceed with the previous default isolation level unless
# it is set again.
Выше мы сначала создаем Session
, используя либо конструктор, либо sessionmaker
. Затем мы явно устанавливаем начало транзакции на уровне базы данных, вызывая Session.connection()
, который обеспечивает параметры выполнения, которые будут переданы соединению перед началом транзакции на уровне базы данных. Транзакция выполняется с выбранным уровнем изоляции. Когда транзакция завершается, уровень изоляции для соединения сбрасывается до значения по умолчанию, после чего соединение возвращается в пул соединений.
Метод Session.begin()
может также использоваться для начала транзакции уровня Session
; вызов Session.connection()
, следующий за этим вызовом, может использоваться для установки уровня изоляции для каждого соединения-транзакции:
sess = Session(bind=engine)
with sess.begin():
# call connection() with options before any other operations proceed.
# this will procure a new connection from the bound engine and begin a
# real database transaction.
sess.connection(execution_options={"isolation_level": "SERIALIZABLE"})
# ... work with session in SERIALIZABLE isolation level...
# outside the block, the transaction has been committed. the connection is
# released and reverted to its previous isolation level.
Отслеживание состояния транзакции с помощью событий¶
Обзор доступных крючков событий для изменения состояния транзакции сеанса см. в разделе События транзакции.
Присоединение сеанса к внешней транзакции (например, для тестовых наборов)¶
Если используется Connection
, который уже находится в транзакционном состоянии (т.е. имеет установленный Transaction
), то можно заставить Session
участвовать в этой транзакции, просто привязав Session
к этому Connection
. Обычное обоснование для этого - набор тестов, позволяющий ORM-коду свободно работать с Session
, включая возможность вызова Session.commit()
, после чего все взаимодействие с базой данных откатывается.
Изменено в версии 2.0: В версии 2.0 рецепт «присоединиться к внешней транзакции» снова усовершенствован; обработчики событий для «сброса» вложенной транзакции больше не требуются.
Рецепт работает путем создания Connection
внутри транзакции и, опционально, SAVEPOINT, затем передается Session
в качестве «привязки»; параметр Session.join_transaction_mode
передается с настройкой "create_savepoint"
, которая указывает, что должны быть созданы новые SAVEPOINT для реализации BEGIN/COMMIT/ROLLBACK для Session
, который оставит внешнюю транзакцию в том же состоянии, в котором она была передана.
Когда тест завершается, внешняя транзакция откатывается, чтобы все изменения данных в ходе теста были возвращены:
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from unittest import TestCase
# global application scope. create Session class, engine
Session = sessionmaker()
engine = create_engine("postgresql+psycopg2://...")
class SomeTest(TestCase):
def setUp(self):
# connect to the database
self.connection = engine.connect()
# begin a non-ORM transaction
self.trans = self.connection.begin()
# bind an individual Session to the connection, selecting
# "create_savepoint" join_transaction_mode
self.session = Session(
bind=self.connection, join_transaction_mode="create_savepoint"
)
def test_something(self):
# use the session in tests.
self.session.add(Foo())
self.session.commit()
def test_something_with_rollbacks(self):
self.session.add(Bar())
self.session.flush()
self.session.rollback()
self.session.add(Foo())
self.session.commit()
def tearDown(self):
self.session.close()
# rollback - everything that happened with the
# Session above (including calls to commit())
# is rolled back.
self.trans.rollback()
# return connection to the Engine
self.connection.close()
Приведенный выше рецепт является частью собственного CI SQLAlchemy, чтобы гарантировать, что он продолжает работать так, как ожидается.