• en
  • Язык: ru
  • Версия: 3

Потребители

Хотя Channels построен вокруг базовой низкоуровневой спецификации ASGI, он больше предназначен для взаимодействия, чем для написания сложных приложений. Таким образом, Channels предоставляет вам Consumers, богатую абстракцию, которая позволяет вам легко создавать ASGI-приложения.

Потребители делают несколько вещей в частности:

  • Структурирует ваш код как серию функций, вызываемых всякий раз, когда происходит событие, вместо того, чтобы заставлять вас писать цикл событий.

  • Позволяет вам писать синхронный или асинхронный код и решает за вас вопросы передачи и управления потоками.

Конечно, вы можете игнорировать потребителей и использовать другие части Channels - такие как маршрутизация, обработка сессий и аутентификация - в любом приложении ASGI, но в целом они являются лучшим способом написания кода приложения.

Основной макет

Потребитель - это подкласс либо channels.consumer.AsyncConsumer, либо channels.consumer.SyncConsumer. Как следует из этих названий, один из них ожидает от вас написания кода с поддержкой async, в то время как другой будет выполнять ваш код синхронно в пуле потоков за вас.

Рассмотрим базовый пример SyncConsumer:

from channels.consumer import SyncConsumer

class EchoConsumer(SyncConsumer):

    def websocket_connect(self, event):
        self.send({
            "type": "websocket.accept",
        })

    def websocket_receive(self, event):
        self.send({
            "type": "websocket.send",
            "text": event["text"],
        })

Это очень простой эхо-сервер WebSocket - он будет принимать все входящие соединения WebSocket, а затем отвечать на все входящие текстовые кадры WebSocket одним и тем же текстом.

Потребители структурированы вокруг серии именованных методов, соответствующих значению type сообщений, которые они собираются получить, с любыми ., замененными на _. Два обработчика выше обрабатывают сообщения websocket.connect и << 4 >>> соответственно.

Откуда мы знали, какие типы событий мы получим и что в них будет (например, websocket.receive имеет ключ text)? Потому что мы разработали это приложение в соответствии со спецификацией ASGI WebSocket, которая говорит нам, как представляются WebSockets - подробнее об этом читайте в ASGI - и защитили это приложение маршрутизатором, который проверяет тип области видимости websocket - подробнее об этом читайте в Маршрутизация.

Кроме этого, единственным базовым API является self.send(event). Это позволяет вам отправлять события обратно клиенту или серверу протокола, как определено протоколом - если вы прочитаете протокол WebSocket, вы увидите, что диктант, который мы отправили выше, - это способ отправки текстового фрейма клиенту.

AsyncConsumer устроен очень похоже, но все методы обработчика должны быть корутинами, а self.send является корутиной:

from channels.consumer import AsyncConsumer

class EchoConsumer(AsyncConsumer):

    async def websocket_connect(self, event):
        await self.send({
            "type": "websocket.accept",
        })

    async def websocket_receive(self, event):
        await self.send({
            "type": "websocket.send",
            "text": event["text"],
        })

Когда следует использовать AsyncConsumer, а когда SyncConsumer? Главное, что нужно учитывать, это то, с чем вы разговариваете. Если вы вызовете медленную синхронную функцию внутри AsyncConsumer, вы задержите весь цикл событий, поэтому они полезны, только если вы также вызываете асинхронный код (например, используя HTTPX для параллельной выборки 20 страниц).

Если вы вызываете какую-либо часть ORM или другого синхронного кода Django, вы должны использовать SyncConsumer, так как это позволит запустить весь потребитель в потоке и остановить ваши ORM-запросы, блокирующие весь сервер.

Мы рекомендуем писать SyncConsumers по умолчанию, а AsyncConsumers использовать только в тех случаях, когда вы знаете, что делаете что-то, что будет улучшено асинхронной обработкой (длительные задачи, которые могут выполняться параллельно) и вы используете только async-нативные библиотеки.

Если вы действительно хотите вызвать синхронную функцию из AsyncConsumer, посмотрите на asgiref.sync.sync_to_async, которая является утилитой, используемой Channels для запуска SyncConsumers в потоковых пулах, и может превратить любую синхронную вызываемую функцию в асинхронную корутину.

Важно

Если вы хотите вызвать Django ORM из AsyncConsumer (или любого другого асинхронного кода), вам следует использовать адаптер database_sync_to_async вместо этого. Подробнее см. в Доступ к базе данных.

Закрытие потребителей

Когда сокет или соединение, подключенное к вашему потребителю, закрывается - вами или клиентом - вы, скорее всего, получите событие, отправленное вам (например, http.disconnect или << 1 >>>), и вашему экземпляру приложения будет предоставлено короткое время, чтобы действовать в соответствии с ним.

Как только вы закончите очистку после отключения, вам нужно поднять channels.exceptions.StopConsumer, чтобы остановить ASGI-приложение и позволить серверу очистить его. Если вы оставите его работать - не поднимите это исключение - сервер достигнет тайм-аута закрытия приложения (который в Daphne по умолчанию составляет 10 секунд), а затем убьет ваше приложение и выдаст предупреждение.

Приведенные ниже общие потребители делают это за вас, так что это необходимо только в том случае, если вы пишете свой собственный класс потребителя на основе AsyncConsumer или << 1 >>>. Однако, если вы переопределите их метод SyncConsumer или заблокируете методы обработки, которые он вызывает, от возврата, вы все равно можете столкнуться с этим; посмотрите их исходный код, если хотите получить больше информации.

Кроме того, если вы запускаете свои собственные фоновые программы, убедитесь, что вы также закрываете их по завершении соединения, иначе вы будете передавать программы на сервер.

Слои канала

Потребители также позволяют вам работать с канальными слоями Channel, чтобы позволить им отправлять сообщения между собой либо один на один, либо через широковещательную систему, называемую группами.

Потребители будут использовать канальный слой default, если не установлен атрибут channel_layer_alias при подклассификации любого из предоставленных классов Consumer.

Для использования слоя канала echo_alias мы установим его следующим образом:

from channels.consumer import SyncConsumer

class EchoConsumer(SyncConsumer):
    channel_layer_alias = "echo_alias"

Подробнее вы можете прочитать в Слои канала.

Область применения

Потребители получают scope соединения при вызове, который содержит много информации, которую вы найдете на request объекте в представлении Django. Она доступна в виде self.scope внутри методов потребителя.

Области применения являются частью ASGI specification, но вот некоторые общие, которые вы можете захотеть использовать:

  • scope["path"], путь в запросе. (HTTP и WebSocket).

  • scope["headers"], необработанные пары заголовков имя/значение из запроса (HTTP и WebSocket).

  • scope["method"], имя метода, используемого для запроса. (HTTP)

Если вы включите такие вещи, как Аутентификация, вы также сможете получить доступ к объекту user как scope["user"], а URLRouter, например, поместит захваченные группы из URL в scope["url_route"].

В общем, область видимости - это место для получения информации о соединении и место, куда промежуточное ПО помещает атрибуты, к которым хочет предоставить вам доступ (подобно тому, как промежуточное ПО Django добавляет вещи в request).

Для получения полного списка того, что может происходить в области соединения, посмотрите базовую спецификацию ASGI для протокола, который вы завершаете, плюс любое промежуточное программное обеспечение или код маршрутизации, который вы используете. Области веб (HTTP и WebSocket) доступны в the Web ASGI spec.

Общие потребители

То, что вы видите выше, является базовой схемой потребителя, который работает для любого протокола. Подобно генеральным представлениям в Django, Channels поставляется с генеральными потребителями, которые оборачивают общую функциональность, чтобы вам не нужно было переписывать ее, в частности, для обработки HTTP и WebSocket.

WebsocketConsumer

Доступно как channels.generic.websocket.WebsocketConsumer, это оборачивает многословную отправку и получение сообщений обычного ASGI в обработку, которая работает только с текстовыми и двоичными кадрами:

from channels.generic.websocket import WebsocketConsumer

class MyConsumer(WebsocketConsumer):
    groups = ["broadcast"]

    def connect(self):
        # Called on connection.
        # To accept the connection call:
        self.accept()
        # Or accept the connection and specify a chosen subprotocol.
        # A list of subprotocols specified by the connecting client
        # will be available in self.scope['subprotocols']
        self.accept("subprotocol")
        # To reject the connection, call:
        self.close()

    def receive(self, text_data=None, bytes_data=None):
        # Called with either text_data or bytes_data for each frame
        # You can call:
        self.send(text_data="Hello world!")
        # Or, to send a binary frame:
        self.send(bytes_data="Hello world!")
        # Want to force-close the connection? Call:
        self.close()
        # Or add a custom WebSocket error code!
        self.close(code=4123)

    def disconnect(self, close_code):
        # Called when the socket closes

Вы также можете поднять channels.exceptions.AcceptConnection или channels.exceptions.DenyConnection из любого места внутри метода connect, чтобы принять или отклонить соединение, если вам нужен многоразовый код аутентификации или ограничения скорости, не требующий использования микшинов.

Канал WebsocketConsumer будет автоматически добавлен (при подключении) и удален (при отключении) из всех групп, имена которых содержатся в атрибуте класса потребителя groups. groups должен быть итерабельным, а в качестве бэкенда канала должен быть установлен канальный уровень с поддержкой групп (channels.layers.InMemoryChannelLayer и channels_redis.core.RedisChannelLayer оба поддерживают группы). Если канальный уровень не настроен или канальный уровень не поддерживает группы, подключение к WebsocketConsumer с непустым атрибутом groups вызовет ошибку channels.exceptions.InvalidChannelLayerError. Подробнее см. в разделе Группы.

AsyncWebsocketConsumer

Доступный как channels.generic.websocket.AsyncWebsocketConsumer, он имеет точно такие же методы и сигнатуру, как WebsocketConsumer, но все асинхронно, и функции, которые вам нужно написать, также должны быть асинхронными:

from channels.generic.websocket import AsyncWebsocketConsumer

class MyConsumer(AsyncWebsocketConsumer):
    groups = ["broadcast"]

    async def connect(self):
        # Called on connection.
        # To accept the connection call:
        await self.accept()
        # Or accept the connection and specify a chosen subprotocol.
        # A list of subprotocols specified by the connecting client
        # will be available in self.scope['subprotocols']
        await self.accept("subprotocol")
        # To reject the connection, call:
        await self.close()

    async def receive(self, text_data=None, bytes_data=None):
        # Called with either text_data or bytes_data for each frame
        # You can call:
        await self.send(text_data="Hello world!")
        # Or, to send a binary frame:
        await self.send(bytes_data="Hello world!")
        # Want to force-close the connection? Call:
        await self.close()
        # Or add a custom WebSocket error code!
        await self.close(code=4123)

    async def disconnect(self, close_code):
        # Called when the socket closes

JsonWebsocketConsumer

Доступен как channels.generic.websocket.JsonWebsocketConsumer, работает как WebsocketConsumer, за исключением того, что он будет автоматически кодировать и декодировать в JSON, отправленный как текстовые фреймы WebSocket.

Различия только в API:

  • Ваш метод receive_json должен принимать единственный аргумент content, то есть декодированный объект JSON.

  • self.send_json принимает только один аргумент, content, который будет закодирован в JSON для вас.

Если вы хотите настроить кодирование и декодирование JSON, вы можете переопределить методы класса encode_json и << 1 >>>.

AsyncJsonWebsocketConsumer

Асинхронная версия JsonWebsocketConsumer, доступная как channels.generic.websocket.AsyncJsonWebsocketConsumer. Обратите внимание, что даже encode_json и decode_json являются асинхронными функциями.

AsyncHttpConsumer

Доступный как channels.generic.http.AsyncHttpConsumer, он предлагает базовые примитивы для реализации конечной точки HTTP:

from channels.generic.http import AsyncHttpConsumer

class BasicHttpConsumer(AsyncHttpConsumer):
    async def handle(self, body):
        await asyncio.sleep(10)
        await self.send_response(200, b"Your response bytes", headers=[
            (b"Content-Type", b"text/plain"),
        ])

Ожидается, что вы реализуете свой собственный метод handle. Метод получает все тело запроса в виде одного байта. Заголовки могут быть переданы либо в виде списка кортежей, либо в виде словаря. Содержимое тела ответа ожидается как байтовая строка.

Вы также можете реализовать метод disconnect, если хотите запустить код при отключении - например, чтобы завершить все запущенные вами корутины. Этот метод будет выполняться даже при нечистом отключении, поэтому не ожидайте, что handle закончил выполнение чисто.

Если вам нужно больше контроля над ответом, например, для реализации длинного опроса, используйте вместо этого методы более низкого уровня self.send_headers и self.send_body. В этом примере уже упоминаются канальные слои, которые будут подробно описаны позже:

import json
from channels.generic.http import AsyncHttpConsumer

class LongPollConsumer(AsyncHttpConsumer):
    async def handle(self, body):
        await self.send_headers(headers=[
            (b"Content-Type", b"application/json"),
        ])
        # Headers are only sent after the first body event.
        # Set "more_body" to tell the interface server to not
        # finish the response yet:
        await self.send_body(b"", more_body=True)

    async def chat_message(self, event):
        # Send JSON and finish the response:
        await self.send_body(json.dumps(event).encode("utf-8"))

Конечно, вы также можете использовать эти примитивы для реализации конечной точки HTTP для Server-sent events:

from datetime import datetime
from channels.generic.http import AsyncHttpConsumer

class ServerSentEventsConsumer(AsyncHttpConsumer):
    async def handle(self, body):
        await self.send_headers(headers=[
            (b"Cache-Control", b"no-cache"),
            (b"Content-Type", b"text/event-stream"),
            (b"Transfer-Encoding", b"chunked"),
        ])
        while True:
            payload = "data: %s\n\n" % datetime.now().isoformat()
            await self.send_body(payload.encode("utf-8"), more_body=True)
            await asyncio.sleep(1)
Вернуться на верх