Как разделить (инициализировать и закрыть) aiohttp.ClientSession между асинхронными представлениями Django для использования пула соединений

Django поддерживает async views начиная с версии 3.1, поэтому он отлично подходит для неблокирующих вызовов, например, внешних HTTP API (используя, например, aiohttp).

Я часто вижу следующий пример кода, который я считаю концептуально неправильным (хотя он прекрасно работает):

import aiohttp
from django.http import HttpRequest, HttpResponse

async def view_bad_example1(request: HttpRequest):
    async with aiohttp.ClientSession() as session:
        async with session.get("https://example.com/") as example_response:
            response_text = await example_response.text()
            return HttpResponse(response_text[:42], content_type="text/plain")

Этот код создает ClientSession для каждого входящего запроса, что неэффективно. В этом случае aiohttp нельзя использовать, например, пул соединений.

Не создавайте сессию на каждый запрос. Скорее всего, вам нужна сессия для приложение, которое выполняет все запросы в целом.

Источник: https://docs.aiohttp.org/en/stable/client_quickstart.html#make-a-request

.

То же самое относится к httpx:

С другой стороны, экземпляр Client использует пул соединений HTTP. Это означает, что когда вы делаете несколько запросов к одному и тому же узлу, клиент клиент будет повторно использовать основное TCP-соединение, вместо того чтобы создавать его заново одно для каждого отдельного запроса.

Источник: https://www.python-httpx.org/advanced/#why-use-a-client

.

Есть ли способ глобально инстанцировать aiohttp.ClientSession в Django, чтобы этот экземпляр можно было использовать для нескольких запросов? Не забывайте, что ClientSession должен быть создан в запущенном цикле событий (Почему создание ClientSession вне цикла событий опасно? ), поэтому мы не можем инстанцировать его, например, в настройках Django или как переменную на уровне модуля.

Ближе всего я подошел к этому коду. Однако, я думаю, что этот код уродлив и не рассматривает, например, закрытие сессии.

CLIENT_SESSSION = None

async def view_bad_example2(request: HttpRequest):
    global CLIENT_SESSSION

    if not CLIENT_SESSSION:
        CLIENT_SESSSION = aiohttp.ClientSession()

    example_response = await CLIENT_SESSSION.get("https://example.com/")
    response_text = await example_response.text()

    return HttpResponse(response_text[:42], content_type="text/plain")

В основном я ищу эквивалент Events из FastAPI, который можно использовать для создания/закрытия некоторого ресурса в асинхронном контексте.

Кстати, вот сравнение производительности с использованием k6 между двумя видами:

  • view_bad_example1: avg=1.32s min=900.86ms med=1.14s max=2.22s p(90)=2s p(95)=2.1s
  • view_bad_example2: avg=930.82ms min=528.28ms med=814.31ms max=1.66s p(90)=1.41s p(95)=1.52s

Django не реализует протокол ASGI Lifespan.
Ссылка: https://github.com/django/django/pull/13636

Starlette делает. FastAPI напрямую использует реализацию обработчиков событий Starlette.

Вот как этого можно достичь с помощью Django:

  1. Реализовать протокол ASGI Lifespan в подклассе Django ASGIHandler.
import django
from django.core.asgi import ASGIHandler


class MyASGIHandler(ASGIHandler):
    def __init__(self):
        super().__init__()
        self.on_shutdown = []

    async def __call__(self, scope, receive, send):
        if scope['type'] == 'lifespan':
            while True:
                message = await receive()
                if message['type'] == 'lifespan.startup':
                    # Do some startup here!
                    await send({'type': 'lifespan.startup.complete'})
                elif message['type'] == 'lifespan.shutdown':
                    # Do some shutdown here!
                    await self.shutdown()
                    await send({'type': 'lifespan.shutdown.complete'})
                    return
        await super().__call__(scope, receive, send)

    async def shutdown(self):
        for handler in self.on_shutdown:
            if asyncio.iscoroutinefunction(handler):
                await handler()
            else:
                handler()


def my_get_asgi_application():
    django.setup(set_prefix=False)
    return MyASGIHandler()
  1. Замените application в asgi.py.
# application = get_asgi_application()
application = my_get_asgi_application()
  1. Создайте помощника get_client_session для совместного использования экземпляра:
import asyncio
import aiohttp
from .asgi import application

CLIENT_SESSSION = None

_lock = asyncio.Lock()


async def get_client_session():
    global CLIENT_SESSSION

    async with _lock:
        if not CLIENT_SESSSION:
            CLIENT_SESSSION = aiohttp.ClientSession()
            application.on_shutdown.append(CLIENT_SESSSION.close)

    return CLIENT_SESSSION

Использование:

async def view(request: HttpRequest):
    session = await get_client_session()
    
    example_response = await session.get("https://example.com/")
    response_text = await example_response.text()

    return HttpResponse(response_text[:42], content_type="text/plain")
Вернуться на верх