Получите сообщение "RuntimeError: Цикл событий закрыт" при запуске pytest в каналах django
Я написал тест для моего соединения через websocket.
Вот мой код pytest.
import pytest
from channels.testing import WebsocketCommunicator
from django.contrib.auth import get_user_model
from rest_framework_simplejwt.tokens import AccessToken
from dashboard.token_auth import TokenAuthMiddleware
from channels.routing import URLRouter
from django.urls import path
from dashboard.consumers import DashboardConusmer
UserModel = get_user_model()
@pytest.fixture
@pytest.mark.django_db
def access_token():
user = UserModel.objects.create_user(email='hello@hello.com', password='hello@hello1')
access = AccessToken.for_user(user)
return str(access)
@pytest.fixture(autouse=True)
@pytest.mark.django_db
def set_channel_layer(settings):
settings.CHANNEL_LAYERS = {"default": {"BACKEND": "channels.layers.InMemoryChannelLayer"}}
@pytest.mark.django_db
@pytest.mark.asyncio
class TestDashboard:
pytestmark = pytest.mark.django_db
async def test_connection(self, access_token):
application = TokenAuthMiddleware(
URLRouter([path("ws/dashboard/", DashboardConusmer.as_asgi())]))
url = f"ws/dashboard/?token={access_token}"
communicator = WebsocketCommunicator(application, url)
connected, _ = await communicator.connect()
assert connected
await communicator.disconnect()
Он показывает RuntimeError: Event loop is closed
Код ошибки:
Task was destroyed but it is pending!
task: <Task cancelling name='Task-2' coro=<ProtocolTypeRouter.__call__() running at /usr/local/lib/python3.10/site-packages/channels/routing.py:62> wait_for=<Future finished result=None>>
Exception ignored in: <coroutine object ProtocolTypeRouter.__call__ at 0x7f139df44f20>
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/channels/routing.py", line 62, in __call__
File "/home/app/web/dashboard/token_auth.py", line 37, in __call__
File "/usr/local/lib/python3.10/site-packages/channels/routing.py", line 132, in __call__
File "/usr/local/lib/python3.10/site-packages/channels/consumer.py", line 94, in app
File "/usr/local/lib/python3.10/site-packages/channels/consumer.py", line 58, in __call__
File "/usr/local/lib/python3.10/site-packages/channels/utils.py", line 55, in await_many_dispatch
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 753, in call_soon
File "/usr/local/lib/python3.10/asyncio/base_events.py", line 515, in _check_closed
RuntimeError: Event loop is closed
Task was destroyed but it is pending!
task: <Task pending name='Task-4' coro=<InMemoryChannelLayer.receive() running at /usr/local/lib/python3.10/site-packages/channels/layers.py:249> wait_for=<Future cancelled>>
Вот мой код в consumers.py
.
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
class DashboardConusmer(WebsocketConsumer):
def connect(self):
self.group_name = 'dashboard'
self.user = self.scope['user']
if self.user.is_authenticated:
async_to_sync(self.channel_layer.group_add)(self.group_name, self.channel_name)
self.accept()
else:
self.close()
Чтобы получить scope["user"], я создал собственное промежуточное ПО под названием 'TokenAuthMiddleware'.
from urllib.parse import parse_qs
from channels.db import database_sync_to_async
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser
from django.core.exceptions import ObjectDoesNotExist
from django.db import close_old_connections
from rest_framework_simplejwt.exceptions import TokenError
from rest_framework_simplejwt.tokens import AccessToken
from channels.sessions import CookieMiddleware, SessionMiddleware
UserModel = get_user_model()
@database_sync_to_async
def get_user(token):
if token:
try:
token = AccessToken(token)
user = UserModel.objects.get(pk=token['user_id'])
return user
except (TokenError, ObjectDoesNotExist):
return AnonymousUser()
return AnonymousUser()
class TokenAuthMiddleware:
def __init__(self, app):
self.app = app
async def __call__(self, scope, receive, send):
close_old_connections()
token = parse_qs(scope['query_string'].strip().decode()).get("token", None)[0]
scope['user'] = await get_user(token)
scope = dict(scope)
return await self.app(scope, receive, send)
Я успешно протестировал его, используя расширение "Simple Web Socket Client" в FireFox. Но я не могу пройти тест. Я думаю, что проблема в том, что я неправильно создал приложение канала. Но я не знаю, как это выяснить.