Получите сообщение "RuntimeError: Цикл событий закрыт" при запуске pytest в каналах django

Я написал тест для моего соединения через websocket.

Вот мой код pytest.

import pytest
from channels.testing import WebsocketCommunicator
from django.contrib.auth import get_user_model
from rest_framework_simplejwt.tokens import AccessToken

from dashboard.token_auth import TokenAuthMiddleware
from channels.routing import URLRouter
from django.urls import path

from dashboard.consumers import DashboardConusmer

UserModel = get_user_model()


@pytest.fixture
@pytest.mark.django_db
def access_token():
    user = UserModel.objects.create_user(email='hello@hello.com', password='hello@hello1')
    access = AccessToken.for_user(user)
    return str(access)


@pytest.fixture(autouse=True)
@pytest.mark.django_db
def set_channel_layer(settings):
    settings.CHANNEL_LAYERS = {"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}}


@pytest.mark.django_db
@pytest.mark.asyncio
class TestDashboard:
    pytestmark = pytest.mark.django_db

    async def test_connection(self, access_token):
        application = TokenAuthMiddleware(
            URLRouter([path("ws/dashboard/", DashboardConusmer.as_asgi())]))
        url = f"ws/dashboard/?token={access_token}"
        communicator = WebsocketCommunicator(application, url)
        connected, _ = await communicator.connect()
        assert connected
        await communicator.disconnect()

Он показывает RuntimeError: Event loop is closed

Код ошибки:

Task was destroyed but it is pending!
task: <Task cancelling name='Task-2' coro=<ProtocolTypeRouter.__call__() running at /usr/local/lib/python3.10/site-packages/channels/routing.py:62> wait_for=<Future finished result=None>>
Exception ignored in: <coroutine object ProtocolTypeRouter.__call__ at 0x7f139df44f20>
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/channels/routing.py", line 62, in __call__
  File "/home/app/web/dashboard/token_auth.py", line 37, in __call__
  File "/usr/local/lib/python3.10/site-packages/channels/routing.py", line 132, in __call__
  File "/usr/local/lib/python3.10/site-packages/channels/consumer.py", line 94, in app
  File "/usr/local/lib/python3.10/site-packages/channels/consumer.py", line 58, in __call__
  File "/usr/local/lib/python3.10/site-packages/channels/utils.py", line 55, in await_many_dispatch
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 753, in call_soon
  File "/usr/local/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending name='Task-4' coro=<InMemoryChannelLayer.receive() running at /usr/local/lib/python3.10/site-packages/channels/layers.py:249> wait_for=<Future cancelled>>

Вот мой код в consumers.py.

from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer

class DashboardConusmer(WebsocketConsumer):

    def connect(self):
        self.group_name = 'dashboard'
        self.user = self.scope['user']
        if self.user.is_authenticated:
            async_to_sync(self.channel_layer.group_add)(self.group_name, self.channel_name)
            self.accept()
        else:
            self.close()

Чтобы получить scope["user"], я создал собственное промежуточное ПО под названием 'TokenAuthMiddleware'.

from urllib.parse import parse_qs

from channels.db import database_sync_to_async
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import ObjectDoesNotExist
from django.db import close_old_connections
from rest_framework_simplejwt.exceptions import TokenError
from rest_framework_simplejwt.tokens import AccessToken
from channels.sessions import CookieMiddleware, SessionMiddleware

UserModel = get_user_model()

@database_sync_to_async
def get_user(token):
    if token:
        try:
            token = AccessToken(token)
            user = UserModel.objects.get(pk=token['user_id'])
            return user
        except (TokenError, ObjectDoesNotExist):
            return AnonymousUser()
    return AnonymousUser()

class TokenAuthMiddleware:

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        close_old_connections()
        token = parse_qs(scope['query_string'].strip().decode()).get("token", None)[0]
        scope['user'] = await get_user(token)
        scope = dict(scope)
        return await self.app(scope, receive, send)

Я успешно протестировал его, используя расширение "Simple Web Socket Client" в FireFox. Но я не могу пройти тест. Я думаю, что проблема в том, что я неправильно создал приложение канала. Но я не знаю, как это выяснить.

Вернуться на верх