Как реализовать асинхронный драйвер SQLAlchemy Postgres в проекте Django?
Я столкнулся с очень странным поведением при реализации SQLAlchemy в моем Django проекте с использованием асинхронного драйвера Postgres.
Когда я создаю сессию SQLAlchemy Session с помощью sessionmaker и затем выполняю простой оператор SELECT в менеджере контекста сессии, он генерирует правильный результат в первый раз, но последующие попытки приводят к исключению RuntimeError, говорящему "Task ... attached to a different loop". Я проверил ошибку, и она обычно указывает на то, что сессия сохраняется в нескольких запросах Django; однако в моем коде я не вижу, как это возможно, поскольку я создаю сессию базы данных в запросе, используя sessionmaker, и использую менеджер контекста сессии для ее закрытия.
При перезагрузке страницы в третий раз с использованием подхода менеджера контекста сессии все становится еще более странным, поскольку RuntimeError меняется на InterfaceError, говорящий "невозможно выполнить операцию: выполняется другая операция". Но если не использовать подход менеджера контекста, а создать сессию вручную и сразу после ее использования закрыть сессию, то даже после второй попытки возникает исключение RuntimeError.
И это продолжает становиться странным, когда я делаю второй подход и не ожидаю закрытия сессии, тогда он работает совершенно нормально, кроме сообщения в журнале о том, что метод никогда не был ожидаем.
Я создал репозиторий GitHub с docker-compose.yml, который воспроизводит проблему, если кто-то хочет увидеть это сам. https://github.com/enots227/Django-SQLAlchemy-Async
Django Setup
settings.py
# Database
DB_ENGINE = create_async_engine(URL.create(
"postgresql+asyncpg",
username=os.getenv('DB_USERNAME'),
password=os.getenv('DB_PASSWORD'),
host=os.getenv('DB_HOST'),
port=os.getenv('DB_PORT'),
database=os.getenv('DB_NAME')), echo=True)
DB_SESSION_MAKER = sessionmaker(DB_ENGINE, AsyncSession, expire_on_commit=False)
Подход 1: Использование контекстного менеджера
Вот мой код для представления Django для контекстного менеджера (http://127.0.0.1:8000/sql/1/)
async def view1(request):
html = '<html><body><ul>'
async with settings.DB_SESSION_MAKER() as db_session:
items = await list_accounts(db_session)
for item in items:
html += '<li>' + item.name + '</li>'
html += '</ul></body></html>'
return HttpResponse(html)
При первом выполнении он возвращает правильный ответ:
<html><body><ul><li>test1</li></ul></body></html>
Но если я пытаюсь перезагрузить страницу после первого выполнения, то возникает исключение RuntimeError, в котором говорится: "Задача ... присоединена к другому циклу".
А когда я перезагружаюсь в третий раз, возникает InterfaceError, говорящий "cannot perform operation: another operation is in progress".
Подход 2: ручное создание и закрытие сессии
Вот мой код для представления Django с ручным управлением сессией (http://127.0.0.1:8000/sql/2/)
async def view2(request):
html = '<html><body><ul>'
db_session: AsyncSession = settings.DB_SESSION_MAKER()
items = await list_accounts(db_session)
await db_session.close()
for item in items:
html += '<li>' + item.name + '</li>'
html += '</ul></body></html>'
return HttpResponse(html)
При первом выполнении возвращается правильный ответ, как в подходе 1. После первого выполнения я получаю RuntimeError в подходе 1 и никогда не получаю InterfaceError.
Подход 3: Ручное создание и закрытие сессии (не ожидая закрытия)
Вот мой код для представления Django для ручного управления сессией и не ожидая метода close (http://127.0.0.1:8000/sql/2/)
async def view2(request):
html = '<html><body><ul>'
db_session: AsyncSession = settings.DB_SESSION_MAKER()
items = await list_accounts(db_session)
db_session.close() # logs async warning since missing await, but works
for item in items:
html += '<li>' + item.name + '</li>'
html += '</ul></body></html>'
return HttpResponse(html)
Все выполнения всегда возвращают правильный ответ.