Consumers¶
While Channels is built around a basic low-level spec called ASGI, that spec is designed more for interoperability than for writing complex applications. So Channels provides you with Consumers, a rich abstraction that lets you build ASGI applications easily.
Consumers do a couple of things in particular:
Structure your code as a series of functions to be called whenever an event happens, rather than making you write an event loop.
Allow you to write synchronous or async code, and deal with handoffs and threading for you.
Of course, you are free to ignore consumers and use the other parts of Channels - like routing, session handling and authentication - with any ASGI app, but they’re generally the best way to write your application code.
Basic Layout¶
A consumer is a subclass of either channels.consumer.AsyncConsumer or channels.consumer.SyncConsumer. As these names suggest, one will expect you to write async-capable code, while the other will run your code synchronously in a threadpool for you.
Let’s look at a basic example of a SyncConsumer:
from channels.consumer import SyncConsumer

class EchoConsumer(SyncConsumer):

    def websocket_connect(self, event):
        self.send({
            "type": "websocket.accept",
        })

    def websocket_receive(self, event):
        self.send({
            "type": "websocket.send",
            "text": event["text"],
        })
This is a very simple WebSocket echo server - it will accept all incoming WebSocket connections, and then reply to all incoming WebSocket text frames with the same text.
Consumers are structured around a series of named methods corresponding to the type value of the messages they are going to receive, with any . replaced by _. The two handlers above are handling websocket.connect and websocket.receive messages respectively.
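The naming rule above can be illustrated with a small standard-library sketch. The helper handler_name_for and the TinyDispatcher class are hypothetical names invented for this example; they mimic the rule as described here and are not a copy of how Channels dispatches internally.

```python
# Hypothetical helper: applies the naming rule described above
# (replace every "." in the message type with "_").
def handler_name_for(message):
    """Return the method name that would handle an ASGI message dict."""
    return message["type"].replace(".", "_")


class TinyDispatcher:
    """Toy stand-in for a consumer: routes a message to a named method."""

    def __init__(self):
        self.seen = []

    def dispatch(self, message):
        # Look the handler up by name, as the naming rule suggests.
        handler = getattr(self, handler_name_for(message), None)
        if handler is None:
            raise ValueError("No handler for message type %s" % message["type"])
        return handler(message)

    def websocket_connect(self, event):
        self.seen.append("connect")

    def websocket_receive(self, event):
        self.seen.append(event["text"])


dispatcher = TinyDispatcher()
dispatcher.dispatch({"type": "websocket.connect"})
dispatcher.dispatch({"type": "websocket.receive", "text": "hello"})
print(dispatcher.seen)
```

A message whose type has no matching method raises ValueError in this sketch; how a real consumer reacts to an unhandled type is not covered here.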
How did we know what event types we were going to get and what would be in them (like websocket.receive having a text key)? That’s because we designed this against the ASGI WebSocket specification, which tells us how WebSockets are presented - read more about it in ASGI - and protected this application with a router that checks for a scope type of websocket - see more about that in Routing.
Apart from that, the only other basic API is self.send(event). This lets you send events back to the client or protocol server as defined by the protocol - if you read the WebSocket protocol, you’ll see that the dict we send above is how you send a text frame to the client.
The AsyncConsumer is laid out very similarly, but all the handler methods must be coroutines, and self.send is a coroutine:
from channels.consumer import AsyncConsumer

class EchoConsumer(AsyncConsumer):

    async def websocket_connect(self, event):
        await self.send({
            "type": "websocket.accept",
        })

    async def websocket_receive(self, event):
        await self.send({
            "type": "websocket.send",
            "text": event["text"],
        })
When should you use AsyncConsumer and when should you use SyncConsumer?
The main thing to consider is what you’re talking to. If you call a slow synchronous function from inside an AsyncConsumer you’re going to hold up the entire event loop, so AsyncConsumers are only useful if you’re also calling async code (for example, using HTTPX to fetch 20 pages in parallel).
If you’re calling any part of Django’s ORM or other synchronous code, you should use a SyncConsumer, as this will run the whole consumer in a thread and stop your ORM queries from blocking the entire server.
We recommend that you write SyncConsumers by default, and only use AsyncConsumers in cases where you know you are doing something that would be improved by async handling (long-running tasks that could be done in parallel) and you are only using async-native libraries.
If you really want to call a synchronous function from an AsyncConsumer, take a look at asgiref.sync.sync_to_async, which is the utility that Channels uses to run SyncConsumers in threadpools, and can turn any synchronous callable into an asynchronous coroutine.
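To show why moving a blocking call into a thread matters, here is a minimal sketch that uses only the standard library. It uses asyncio.to_thread (Python 3.9+) as a stand-in for the idea behind sync_to_async; it is not asgiref itself, and slow_sync_lookup and heartbeat are hypothetical names made up for the example.

```python
import asyncio
import time


def slow_sync_lookup(key):
    # Stand-in for blocking work such as a synchronous library call.
    time.sleep(0.05)
    return key.upper()


async def heartbeat(ticks):
    # Records a tick each time the event loop gets a chance to run it.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main():
    ticks = []
    beat = asyncio.create_task(heartbeat(ticks))
    # The blocking function runs in a worker thread, so the event loop
    # stays free to run heartbeat() while we wait for the result.
    result = await asyncio.to_thread(slow_sync_lookup, "abc")
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass
    return result, len(ticks)


result, tick_count = asyncio.run(main())
print(result, tick_count)
```

Had main() called slow_sync_lookup("abc") directly, the heartbeat task would not have run at all during the call. In Channels code the corresponding spelling would be along the lines of await sync_to_async(slow_sync_lookup)("abc").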
Important
If you want to call the Django ORM from an AsyncConsumer (or any other asynchronous code), you should use the database_sync_to_async adapter instead. See Database Access for more.
Closing Consumers¶
When the socket or connection attached to your consumer is closed - either by you or the client - you will likely get an event sent to you (for example, http.disconnect or websocket.disconnect), and your application instance will be given a short amount of time to act on it.
Once you have finished doing your post-disconnect cleanup, you need to raise channels.exceptions.StopConsumer to halt the ASGI application cleanly and let the server clean it up. If you leave it running - by not raising this exception - the server will reach its application close timeout (which is 10 seconds by default in Daphne) and then kill your application and raise a warning.
The generic consumers below do this for you, so this is only needed if you are writing your own consumer class based on AsyncConsumer or SyncConsumer. However, if you override their __call__ method, or block the handling methods that it calls from returning, you may still run into this; take a look at their source code if you want more information.
Additionally, if you launch your own background coroutines, make sure to also shut them down when the connection is finished, or you’ll leak coroutines into the server.
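One way to avoid leaking background coroutines is to track every task you start and cancel them all during disconnect handling. The sketch below uses only asyncio; BackgroundTaskOwner, spawn, shutdown and ticker are hypothetical names for illustration and are not part of Channels.

```python
import asyncio


class BackgroundTaskOwner:
    """Hypothetical stand-in for a consumer that owns background coroutines."""

    def __init__(self):
        self.tasks = set()

    def spawn(self, coro):
        # Keep a reference so the task can be cancelled later.
        task = asyncio.create_task(coro)
        self.tasks.add(task)
        # Forget the task automatically once it finishes or is cancelled.
        task.add_done_callback(self.tasks.discard)
        return task

    async def shutdown(self):
        # Intended to be called from your disconnect handling.
        tasks = list(self.tasks)
        for task in tasks:
            task.cancel()
        # return_exceptions=True collects each CancelledError instead of raising.
        await asyncio.gather(*tasks, return_exceptions=True)


async def ticker():
    # A long-running background job that never finishes on its own.
    while True:
        await asyncio.sleep(3600)


async def main():
    owner = BackgroundTaskOwner()
    task = owner.spawn(ticker())
    await asyncio.sleep(0)  # give the task a chance to start
    await owner.shutdown()
    return task.cancelled(), len(owner.tasks)


cancelled, remaining = asyncio.run(main())
print(cancelled, remaining)
```

In a real consumer, the call to shutdown() would sit in whatever method handles the disconnect event, before the consumer stops.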
Channel Layers¶
Consumers also let you work with Channels’ channel layers, which let consumers send messages to each other either one-to-one or via a broadcast system called groups.
Consumers will use the channel layer default unless the channel_layer_alias attribute is set when subclassing any of the provided Consumer classes.
To use the channel layer echo_alias we would set it like so:
from channels.consumer import SyncConsumer

class EchoConsumer(SyncConsumer):
    channel_layer_alias = "echo_alias"
You can read more in Channel Layers.
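For the alias to resolve, a layer with that name has to exist in your Django settings. A minimal settings sketch might look like the following; using the in-memory backend for both entries is an assumption made purely for illustration, so substitute whichever backend you actually run.

```python
# settings.py fragment (illustrative only).
# "default" is what consumers use unless channel_layer_alias is set;
# "echo_alias" is the extra layer named in the example above.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels.layers.InMemoryChannelLayer",
    },
    "echo_alias": {
        # Assumption: a second in-memory layer, chosen only to keep
        # the sketch free of external services.
        "BACKEND": "channels.layers.InMemoryChannelLayer",
    },
}
```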
Scope¶
Consumers receive the connection’s scope when they are called, which contains a lot of the information you’d find on the request object in a Django view. It’s available as self.scope inside the consumer’s methods.
Scopes are part of the ASGI specification, but here are some common things you might want to use:
scope["path"], the path on the request. (HTTP and WebSocket)
scope["headers"], raw name/value header pairs from the request. (HTTP and WebSocket)
scope["method"], the method name used for the request. (HTTP)
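Because the headers arrive as raw name/value pairs of byte strings, reading one usually takes a small lookup. The sketch below works on a hand-built scope dict; get_header is a hypothetical helper and the header values are made up for the example.

```python
# A hand-built scope for illustration; real scopes come from the server.
scope = {
    "type": "websocket",
    "path": "/chat/lobby/",
    "headers": [
        (b"host", b"example.com"),
        (b"user-agent", b"demo-client/1.0"),
    ],
}


def get_header(scope, name, default=None):
    """Return the first header value matching name, decoded as text."""
    wanted = name.lower().encode("latin-1")
    for key, value in scope.get("headers", []):
        # Compare case-insensitively on the raw bytes.
        if key.lower() == wanted:
            return value.decode("latin-1")
    return default


print(scope["path"])
print(get_header(scope, "Host"))
print(get_header(scope, "X-Missing", default="(not sent)"))
```

Inside a consumer you would pass self.scope instead of the hand-built dict.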
If you enable things like Authentication, you’ll also be able to access the user object as scope["user"], and the URLRouter, for example, will put captured groups from the URL into scope["url_route"].
In general, the scope is the place to get connection information and where middleware will put attributes it wants to let you access (in the same way that Django’s middleware adds things to request).
For a full list of what can occur in a connection scope, look at the basic ASGI spec for the protocol you are terminating, plus any middleware or routing code you are using. The web (HTTP and WebSocket) scopes are available in the Web ASGI spec.
Generic Consumers¶
What you see above is the basic layout of a consumer that works for any protocol. Much like Django’s generic views, Channels ships with generic consumers that wrap common functionality up so you don’t need to rewrite it, specifically for HTTP and WebSocket handling.
WebsocketConsumer¶
Available as channels.generic.websocket.WebsocketConsumer, this wraps the verbose plain-ASGI message sending and receiving into handling that just deals with text and binary frames:
from channels.generic.websocket import WebsocketConsumer

class MyConsumer(WebsocketConsumer):
    groups = ["broadcast"]

    def connect(self):
        # Called on connection.
        # To accept the connection call:
        self.accept()
        # Or accept the connection and specify a chosen subprotocol.
        # A list of subprotocols specified by the connecting client
        # will be available in self.scope['subprotocols']
        self.accept("subprotocol")
        # To reject the connection, call:
        self.close()

    def receive(self, text_data=None, bytes_data=None):
        # Called with either text_data or bytes_data for each frame
        # You can call:
        self.send(text_data="Hello world!")
        # Or, to send a binary frame (note the bytes literal):
        self.send(bytes_data=b"Hello world!")
        # Want to force-close the connection? Call:
        self.close()
        # Or add a custom WebSocket error code!
        self.close(code=4123)

    def disconnect(self, close_code):
        # Called when the socket closes
        pass
You can also raise channels.exceptions.AcceptConnection or channels.exceptions.DenyConnection from anywhere inside the connect method in order to accept or reject a connection, if you want reusable authentication or rate-limiting code that doesn’t need to use mixins.
A WebsocketConsumer’s channel will automatically be added to (on connect) and removed from (on disconnect) any groups whose names appear in the consumer’s groups class attribute. groups must be an iterable, and a channel layer with support for groups must be set as the channel backend (channels.layers.InMemoryChannelLayer and channels_redis.core.RedisChannelLayer both support groups). If no channel layer is configured or the channel layer doesn’t support groups, connecting to a WebsocketConsumer with a non-empty groups attribute will raise channels.exceptions.InvalidChannelLayerError. See Groups for more.
AsyncWebsocketConsumer¶
Available as channels.generic.websocket.AsyncWebsocketConsumer, this has the exact same methods and signature as WebsocketConsumer but everything is async, and the functions you need to write have to be as well:
from channels.generic.websocket import AsyncWebsocketConsumer

class MyConsumer(AsyncWebsocketConsumer):
    groups = ["broadcast"]

    async def connect(self):
        # Called on connection.
        # To accept the connection call:
        await self.accept()
        # Or accept the connection and specify a chosen subprotocol.
        # A list of subprotocols specified by the connecting client
        # will be available in self.scope['subprotocols']
        await self.accept("subprotocol")
        # To reject the connection, call:
        await self.close()

    async def receive(self, text_data=None, bytes_data=None):
        # Called with either text_data or bytes_data for each frame
        # You can call:
        await self.send(text_data="Hello world!")
        # Or, to send a binary frame (note the bytes literal):
        await self.send(bytes_data=b"Hello world!")
        # Want to force-close the connection? Call:
        await self.close()
        # Or add a custom WebSocket error code!
        await self.close(code=4123)

    async def disconnect(self, close_code):
        # Called when the socket closes
        pass
JsonWebsocketConsumer¶
Available as channels.generic.websocket.JsonWebsocketConsumer, this works like WebsocketConsumer, except it will auto-encode and decode to JSON sent as WebSocket text frames.
The only API differences are:
Your receive_json method must take a single argument, content, that is the decoded JSON object.
self.send_json takes only a single argument, content, which will be encoded to JSON for you.
If you want to customise the JSON encoding and decoding, you can override the encode_json and decode_json classmethods.
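As a rough sketch of what such an override could look like, the mixin below defines the two classmethods using only the standard library. The class name ExtendedJsonCodec, the parameter names content and text_data, and the specific choices (ISO-formatted datetimes, Decimal for floats) are assumptions for illustration; check the Channels source for the exact signatures before relying on them.

```python
import json
from datetime import datetime
from decimal import Decimal


class ExtendedJsonCodec:
    """Hypothetical mixin providing custom encode_json/decode_json."""

    @classmethod
    def encode_json(cls, content):
        # Compact separators, plus a fallback for types json can't handle.
        return json.dumps(content, default=cls._default, separators=(",", ":"))

    @classmethod
    def decode_json(cls, text_data):
        # Parse floats as Decimal to avoid binary rounding surprises.
        return json.loads(text_data, parse_float=Decimal)

    @staticmethod
    def _default(value):
        if isinstance(value, datetime):
            return value.isoformat()
        raise TypeError("Cannot serialise %r" % type(value).__name__)


encoded = ExtendedJsonCodec.encode_json({"at": datetime(2024, 1, 2, 3, 4, 5)})
decoded = ExtendedJsonCodec.decode_json('{"price": 1.10}')
print(encoded)
print(decoded)
```

The intent is that a consumer would list the mixin before JsonWebsocketConsumer in its bases so these classmethods take precedence; that combination is not exercised in this sketch.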
AsyncJsonWebsocketConsumer¶
An async version of JsonWebsocketConsumer, available as channels.generic.websocket.AsyncJsonWebsocketConsumer. Note that even encode_json and decode_json are async functions.
AsyncHttpConsumer¶
Available as channels.generic.http.AsyncHttpConsumer, this offers basic primitives to implement an HTTP endpoint:
import asyncio

from channels.generic.http import AsyncHttpConsumer

class BasicHttpConsumer(AsyncHttpConsumer):
    async def handle(self, body):
        # Simulate slow work without blocking the event loop.
        await asyncio.sleep(10)
        await self.send_response(200, b"Your response bytes", headers=[
            (b"Content-Type", b"text/plain"),
        ])
You are expected to implement your own handle method. The method receives the whole request body as a single bytestring. Headers may either be passed as a list of tuples or as a dictionary. The response body content is expected to be a bytestring.
You can also implement a disconnect method if you want to run code on disconnect - for example, to shut down any coroutines you launched. This will run even on an unclean disconnection, so don’t expect that handle has finished running cleanly.
If you need more control over the response, e.g. for implementing long polling, use the lower level self.send_headers and self.send_body methods instead. The following example relies on channel layers, which are explained in detail later:
import json

from channels.generic.http import AsyncHttpConsumer

class LongPollConsumer(AsyncHttpConsumer):
    async def handle(self, body):
        await self.send_headers(headers=[
            (b"Content-Type", b"application/json"),
        ])
        # Headers are only sent after the first body event.
        # Set "more_body" to tell the interface server to not
        # finish the response yet:
        await self.send_body(b"", more_body=True)

    async def chat_message(self, event):
        # Send JSON and finish the response:
        await self.send_body(json.dumps(event).encode("utf-8"))
Of course, you can also use those primitives to implement an HTTP endpoint for Server-sent events:
import asyncio
from datetime import datetime

from channels.generic.http import AsyncHttpConsumer

class ServerSentEventsConsumer(AsyncHttpConsumer):
    async def handle(self, body):
        await self.send_headers(headers=[
            (b"Cache-Control", b"no-cache"),
            (b"Content-Type", b"text/event-stream"),
            (b"Transfer-Encoding", b"chunked"),
        ])
        # Emit one event per second until the client disconnects.
        while True:
            payload = "data: %s\n\n" % datetime.now().isoformat()
            await self.send_body(payload.encode("utf-8"), more_body=True)
            await asyncio.sleep(1)
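The payload in the example above is a single-line event. If you need named events or multi-line data, a small formatter keeps the wire format in one place. The function format_sse below is a hypothetical helper, not part of Channels; it follows the usual Server-sent events framing, where each line of data gets its own data: prefix and a blank line ends the event.

```python
def format_sse(data, event=None):
    """Encode one Server-sent event as bytes ready to pass to send_body."""
    lines = []
    if event is not None:
        lines.append("event: %s" % event)
    # Each line of the payload needs its own "data:" field.
    for line in data.splitlines() or [""]:
        lines.append("data: %s" % line)
    # A blank line terminates the event.
    return ("\n".join(lines) + "\n\n").encode("utf-8")


print(format_sse("hello"))
print(format_sse("first line\nsecond line", event="tick"))
```

Inside the loop above, the payload lines could then become a single call such as await self.send_body(format_sse(datetime.now().isoformat()), more_body=True).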