Django Channels Websocket зависает - WebSocketProtocol слишком долго закрывался и был убит
Окружение:
- Ubuntu 16.04.6 .
- Conda 4.12.0 .
- Apache/2.4.18 (Ubuntu)
- python==3.8.1
- Django==4.0.3 .
- channels==3.0.5
- asgi-redis==1.4.3
- asgiref==3.4.1
- daphne==3.0.2
Я пытаюсь создать веб-сокет сервис, который передает сообщения из redis только аутентифицированному пользователю. Пользователи не общаются друг с другом, поэтому мне не нужны уровни каналов, и, насколько я понимаю, уровни каналов являются совершенно необязательной частью Channels.
Я просто пытаюсь транслировать сообщения, специфичные для пользователя, который был аутентифицирован через пользовательское промежуточное ПО. У меня есть пользовательское промежуточное ПО auth, которое принимает входящий идентификатор сессии и возвращает словарь аутентифицированного пользователя вместе с user_id. Я также прикрепляю этот user_id к области видимости.
Я решил направлять весь трафик через daphne через Apache2, используя ProxyPass на порт 8033. Это предпочтительно, поскольку я использую один домен для этой службы.
Однако мне очень трудно поддерживать соединение с сервером websocket, особенно если я обновляю браузер. Оно сработает при первом запросе, а после него произойдет сбой со следующим сообщением в journalctl:
Application instance <Task pending name='Task-22' coro=<ProtocolTypeRouter.__call__() running at /root/miniconda2/lib/python3.8/site-packages/channels/routing.py:71> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f658c03f670>()]>> for connection <WebSocketProtocol client=['127.0.0.1', 46010] path=b'/ws/userupdates/'> took too long to shut down and was killed.
После того, как я провел несколько часов на github для каналов и попробовал многие предложения (особенно найденные на https://github.com/django/channels/issues/1119), я все еще в растерянности. Версия кода ниже - это лучшая рабочая версия на данный момент, которая, по крайней мере, устанавливает начальное соединение, отправляет обратно полезную нагрузку соединения {"success": true, "user_id": XXXXXX, "message": "Connected"}
и успешно передает все сообщения redis. Но, если я обновляю браузер или закрываю и открываю заново, он не может установить соединение и публикует вышеуказанное сообщение journalctl, пока я не перезапущу apache и daphne.
Моя догадка заключается в том, что я неправильно отсоединяю потребителя или не правильно использую async await. Есть мысли?
Соответствующая конфигурация apache
RewriteEngine On
RewriteCond %{HTTP:Connection} Upgrade [NC]
RewriteCond %{HTTP:Upgrade} websocket [NC]
RewriteRule /(.*) ws://127.0.0.1:8033/$1 [P,L]
<Location />
ProxyPass http://127.0.0.1:8033/
ProxyPassReverse /
</Location>
app/settings.py
[...]
ASGI_APPLICATION = 'app.asgi.application'
ASGI_THREADS = 1000
CHANNEL_LAYERS = {}
[...]
app/asgi.py
import os
from django.core.asgi import get_asgi_application
from django.urls import path
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')
import websockets.routing
from user.models import UserAuthMiddleware
application = ProtocolTypeRouter({
'http': get_asgi_application(),
"websocket": AllowedHostsOriginValidator(
UserAuthMiddleware(
URLRouter(websockets.routing.websocket_urlpatterns)
)
),
})
user/models.py::UserAuthMiddleWare
Получает user_id из пользовательского уровня аутентификации и присоединяет user_id к области видимости.
class UserAuthMiddleware(CookieMiddleware):
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
# Check this actually has headers. They're a required scope key for HTTP and WS.
if "headers" not in scope:
raise UserSessionError(
"UserAuthMiddleware was passed a scope that did not have a headers key "
+ "(make sure it is only passed HTTP or WebSocket connections)"
)
# Go through headers to find the cookie one
for name, value in scope.get("headers", []):
if name == b"cookie":
cookies = parse_cookie(value.decode("latin1"))
break
else:
# No cookie header found - add an empty default.
cookies = {}
# now gather user data from session
try:
req = HttpRequest()
req.GET = QueryDict(query_string=scope.get("query_string"))
setattr(req, 'COOKIES', cookies)
setattr(req, 'headers', scope.get("headers")),
session = UserSession(req)
scope['user_id'] = session.get_user_id()
except UserSessionError as e:
raise e
return await self.app(scope, receive, send)
websockets/routing.py
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/userupdates/', consumers.UserUpdatesConsumer.as_asgi())
]
websockets/consumers.py::UserUpdatesConsumer
from channels.generic.websocket import JsonWebsocketConsumer
import json, redis
class UserUpdatesConsumer(JsonWebsocketConsumer):
def connect(self):
self.accept()
self.redis = redis.Redis(host='127.0.0.1', port=6379, db=0, decode_responses=True)
self.p = self.redis.pubsub()
if 'user_id' not in self.scope:
self.send_json({
'success': False,
'message': 'No user_id present'
})
self.close()
else:
self.send_json({
'success': True,
'user_id': self.scope['user_id'],
'message': 'Connected'
})
self.p.psubscribe(f"dip_alerts")
self.p.psubscribe(f"userupdates_{self.scope['user_id']}*")
for message in self.p.listen():
if message.get('type') == 'psubscribe' and message.get('data') in [1,2]:
continue
if message.get('channel') == "dip_alerts":
self.send_json({
"key": "dip_alerts",
"event": "dip_alert",
"data": json.loads(message.get('data'))
})
else:
self.send(message.get('data'))