Django channels test code AsyncConsumer.__call__() missing 1 required positional argument: 'send'
Тест каналов Я писал тестовый код, просматривая документацию.
Но возникает эта ошибка.
Traceback (most recent call last):
File "C:\Users\Home\Desktop\exchange-rate\venv\lib\site-packages\asgiref\testing.py", line 74, in receive_output
return await self.output_queue.get()
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\asyncio\queues.py", line 159, in get
await getter
asyncio.exceptions.CancelledError
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "C:\Users\Home\Desktop\exchange-rate\venv\lib\site-packages\asgiref\sync.py", line 213, in __call__
return call_result.result()
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\concurrent\futures\_base.py", line 439, in result
return self.__get_result()
File "C:\Users\Home\AppData\Local\Programs\Python\Python310\lib\concurrent\futures\_base.py", line 391, in __get_result
raise self._exception
File "C:\Users\Home\Desktop\exchange-rate\venv\lib\site-packages\asgiref\sync.py", line 279, in main_wrap
result = await self.awaitable(*args, **kwargs)
File "C:\Users\Home\Desktop\exchange-rate\backend\exchange_rate\channel\tests.py", line 15, in test_connect
response = await communicator.get_response()
File "C:\Users\Home\Desktop\exchange-rate\venv\lib\site-packages\channels\testing\http.py", line 42, in get_response
response_start = await self.receive_output(timeout)
File "C:\Users\Home\Desktop\exchange-rate\venv\lib\site-packages\asgiref\testing.py", line 78, in receive_output
self.future.result()
File "C:\Users\Home\Desktop\exchange-rate\venv\lib\site-packages\asgiref\compatibility.py", line 34, in new_application
return await instance(receive, send)
TypeError: AsyncConsumer.__call__() missing 1 required positional argument: 'send'
Он хорошо используется в среде runerver и развертывания.
Только тестовый код выбрасывает ошибку.
Я писал тестовый код, просматривая документацию. Но возникает эта ошибка.
Он хорошо используется в среде runerver и развертывания.
Только тестовый код выбрасывает ошибку.
потребители
from channels.generic.websocket import AsyncJsonWebsocketConsumer
class TestConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
await self.accept()
маршрутизация
from django.urls import path
websocket_urlpatterns = [
path("ws/test/", TestConsumer.as_asgi()),
]
asgi
django_asgi_app = get_asgi_application()
application = ProtocolTypeRouter(
{
"http": django_asgi_app,
"websocket": AllowedHostsOriginValidator(
AuthMiddlewareStack(URLRouter(websocket_urlpatterns))
),
}
)
test
class ChannelsConnectTest(TestCase):
async def test_connect(self):
communicator = HttpCommunicator(TestConsumer, "GET", "/test/")
await communicator.get_response()