Ошибка при аутентификации вебсокета с помощью маркерной аутентификации на каналах django?
У меня возникает ошибка при вызове url websocket с передачей токена JWT для аутентификации:
мой вебсокетный запрос:
ws://127.0.0.1:8000/chat/chat_2/?token=
ошибка:
raise ValueError("Не найден маршрут для пути %r." % path) ValueError: Не найден маршрут для пути 'chat/chat_2/'.
Я использую пользовательское промежуточное ПО аутентификации:
middleware.py
"""
General web socket middlewares
"""
from channels.db import database_sync_to_async
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser
from rest_framework_simplejwt.exceptions import InvalidToken, TokenError
from rest_framework_simplejwt.tokens import UntypedToken
from rest_framework_simplejwt.authentication import JWTTokenUserAuthentication
from channels.middleware import BaseMiddleware
from channels.auth import AuthMiddlewareStack
from django.db import close_old_connections
from urllib.parse import parse_qs
from jwt import decode as jwt_decode
from django.conf import settings
from django.contrib.auth import get_user_model
User = get_user_model()
@database_sync_to_async
def get_user(validated_token):
try:
user = get_user_model().objects.get(id=validated_token["user_id"])
print(f"{user}")
return user
except User.DoesNotExist:
return AnonymousUser()
class JwtAuthMiddleware(BaseMiddleware):
def __init__(self, inner):
self.inner = inner
async def __call__(self, scope, receive, send):
# Close old database connections to prevent usage of timed out connections
close_old_connections()
# Get the token
token = parse_qs(scope["query_string"].decode("utf8"))["token"][0]
# Try to authenticate the user
try:
# This will automatically validate the token and raise an error if token is invalid
UntypedToken(token)
except (InvalidToken, TokenError) as e:
# Token is invalid
print(e)
return None
else:
# Then token is valid, decode it
decoded_data = jwt_decode(
token, settings.SECRET_KEY, algorithms=["HS256"]
)
print(decoded_data)
# Get the user using ID
scope["user"] = await get_user(validated_token=decoded_data)
return await super().__call__(scope, receive, send)
def JwtAuthMiddlewareStack(inner):
return JwtAuthMiddleware(AuthMiddlewareStack(inner))
routing.py:
from . import consumers
from django.urls.conf import path
websocket_urlpatterns = [
path("ws/chat/<str:room_name>/", consumers.ChatConsumer.as_asgi()),
path(
"ws/personal_chat/<str:room_name>/",
consumers.PersonalConsumer.as_asgi(),
),
]
asgi.py:
import os
import ChatApp.routing
from django.core.asgi import get_asgi_application
django_asgi_app = get_asgi_application()
from ChatApp.middlewares import JwtAuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "Hookax.settings")
application = ProtocolTypeRouter(
{
"http": django_asgi_app,
"websocket": JwtAuthMiddlewareStack(
URLRouter(ChatApp.routing.websocket_urlpatterns)
),
}
)
Проект, основанный на:
Django 3.2.7
Channels 3.0.4
Есть предложения по решению?