Django Channels Websocket зависает - WebSocketProtocol слишком долго закрывался и был убит

Окружение:

  • Ubuntu 16.04.6
  • .
  • Conda 4.12.0
  • .
  • Apache/2.4.18 (Ubuntu)
  • python==3.8.1
  • Django==4.0.3
  • .
  • channels==3.0.5
  • asgi-redis==1.4.3
  • asgiref==3.4.1
  • daphne==3.0.2

Я пытаюсь создать веб-сокет сервис, который передает сообщения из redis только аутентифицированному пользователю. Пользователи не общаются друг с другом, поэтому мне не нужны уровни каналов, и, насколько я понимаю, уровни каналов являются совершенно необязательной частью Channels.

Я просто пытаюсь транслировать сообщения, специфичные для пользователя, который был аутентифицирован через пользовательское промежуточное ПО. У меня есть пользовательское промежуточное ПО auth, которое принимает входящий идентификатор сессии и возвращает словарь аутентифицированного пользователя вместе с user_id. Я также прикрепляю этот user_id к области видимости.

Я решил направлять весь трафик через daphne через Apache2, используя ProxyPass на порт 8033. Это предпочтительно, поскольку я использую один домен для этой службы.

Однако мне очень трудно поддерживать соединение с сервером websocket, особенно если я обновляю браузер. Оно сработает при первом запросе, а после него произойдет сбой со следующим сообщением в journalctl:

Application instance <Task pending name='Task-22' coro=<ProtocolTypeRouter.__call__() running at /root/miniconda2/lib/python3.8/site-packages/channels/routing.py:71> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7f658c03f670>()]>> for connection <WebSocketProtocol client=['127.0.0.1', 46010] path=b'/ws/userupdates/'> took too long to shut down and was killed.

После того, как я провел несколько часов на github для каналов и попробовал многие предложения (особенно найденные на https://github.com/django/channels/issues/1119), я все еще в растерянности. Версия кода ниже - это лучшая рабочая версия на данный момент, которая, по крайней мере, устанавливает начальное соединение, отправляет обратно полезную нагрузку соединения {"success": true, "user_id": XXXXXX, "message": "Connected"} и успешно передает все сообщения redis. Но, если я обновляю браузер или закрываю и открываю заново, он не может установить соединение и публикует вышеуказанное сообщение journalctl, пока я не перезапущу apache и daphne.

Моя догадка заключается в том, что я неправильно отсоединяю потребителя или не правильно использую async await. Есть мысли?

Соответствующая конфигурация apache

RewriteEngine On
RewriteCond %{HTTP:Connection} Upgrade [NC]
RewriteCond %{HTTP:Upgrade} websocket [NC]
RewriteRule /(.*) ws://127.0.0.1:8033/$1 [P,L]

<Location />
    ProxyPass http://127.0.0.1:8033/
    ProxyPassReverse /
</Location>

app/settings.py

[...]
ASGI_APPLICATION = 'app.asgi.application'
ASGI_THREADS = 1000
CHANNEL_LAYERS = {}
[...]

app/asgi.py

import os

from django.core.asgi import get_asgi_application
from django.urls import path
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.security.websocket import AllowedHostsOriginValidator

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')

import websockets.routing
from user.models import UserAuthMiddleware

application = ProtocolTypeRouter({
    'http': get_asgi_application(),
    "websocket": AllowedHostsOriginValidator(
        UserAuthMiddleware(
            URLRouter(websockets.routing.websocket_urlpatterns)
        )
    ),
})

user/models.py::UserAuthMiddleWare

Получает user_id из пользовательского уровня аутентификации и присоединяет user_id к области видимости.

class UserAuthMiddleware(CookieMiddleware):
    def __init__(self, app):
        self.app = app
    async def __call__(self, scope, receive, send):
        # Check this actually has headers. They're a required scope key for HTTP and WS.
        if "headers" not in scope:
            raise UserSessionError(
                "UserAuthMiddleware was passed a scope that did not have a headers key "
                + "(make sure it is only passed HTTP or WebSocket connections)"
            )
        # Go through headers to find the cookie one
        for name, value in scope.get("headers", []):
            if name == b"cookie":
                cookies = parse_cookie(value.decode("latin1"))
                break
        else:
            # No cookie header found - add an empty default.
            cookies = {}

        # now gather user data from session
        try:
            req = HttpRequest()
            req.GET = QueryDict(query_string=scope.get("query_string"))
            setattr(req, 'COOKIES', cookies)
            setattr(req, 'headers', scope.get("headers")),

            session = UserSession(req)
            scope['user_id'] = session.get_user_id()
        except UserSessionError as e:
            raise e

        return await self.app(scope, receive, send)

websockets/routing.py

from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/userupdates/', consumers.UserUpdatesConsumer.as_asgi())
]

websockets/consumers.py::UserUpdatesConsumer

from channels.generic.websocket import JsonWebsocketConsumer
import json, redis

class UserUpdatesConsumer(JsonWebsocketConsumer):
    def connect(self):
        self.accept()

        self.redis = redis.Redis(host='127.0.0.1', port=6379, db=0, decode_responses=True)
        self.p = self.redis.pubsub()

        if 'user_id' not in self.scope:
            self.send_json({
                'success': False,
                'message': 'No user_id present'
            })
            self.close()
        else:
            self.send_json({
                'success': True,
                'user_id': self.scope['user_id'],
                'message': 'Connected'
            })

            self.p.psubscribe(f"dip_alerts")
            self.p.psubscribe(f"userupdates_{self.scope['user_id']}*")
            for message in self.p.listen():

                if message.get('type') == 'psubscribe' and message.get('data') in [1,2]:
                    continue

                if message.get('channel') == "dip_alerts":
                    self.send_json({
                        "key": "dip_alerts",
                        "event": "dip_alert",
                        "data": json.loads(message.get('data'))
                    })
                else:
                    self.send(message.get('data'))
Вернуться на верх