Как запустить несколько клиентских соединений websocket?
У меня есть Django-сервис (назовем его "главный"), запускающий HTTP-сервер и WebSocket-сервер, использующий django-каналы. У меня также есть куча других сервисов Django (назовем их целевыми сервисами), которые хотят подключиться к каналам websocket, открываемым главным сервисом. Каждому сервису необходимо установить несколько Websocket-соединений с главным сервисом и отправлять/получать сообщения до тех пор, пока соединение не будет закрыто главным сервисом.
Вот несколько примеров URL-адресов подключения:
URL1 = "wss://main-service.com/ws-connection/1"
URL2 = "wss://main-service.com/ws-connection/2"
URL3 = "wss://main-service.com/ws-connection/3"
Этот список URL заранее не известен; главный сервис посылает нам мини-полезную нагрузку, которая используется для генерации URL во время выполнения, и соединение устанавливается.
Целевой сервис должен обладать способностью поддерживать тысячи соединений websocket (работающих как stateful connections) & он также должен предоставлять HTTP API для обслуживания прямых запросов.
На данный момент у меня есть целевой сервис API, запущенный как проект Django, и базовый клиент websocket, и оба они выглядят следующим образом:
Websocket клиент:
from abc import abstractmethod
from websocket import WebSocketApp as _WebSocketApp
class WebSocketApp(_WebSocketApp):
def __init__(self, url=None):
super(WebSocketApp, self).__init__(url=url, on_open=self._on_open on_message=self._on_message, on_error=self._on_error, on_close=self._on_close)
self.logger = get_logger() # TODO: set level
self.logger.debug(f"Websocket adapter is ready")
@abstractmethod
def _on_open(self, ws):
pass
@abstractmethod
def _on_message(self, ws, message):
pass
@abstractmethod
def _on_error(self, ws, error):
pass
@abstractmethod
def _on_close(self, ws, code, message):
pass
Django view:
from adapters.web_socket import WebSocketApp
from rest_framework.decorators import api_view
from server.celery import app
@app.task()
def ws_handler(conversation_id):
# TODO: standardise
url = f"wss://main-service.com/connections/{id}"
socket = WebSocketApp(url=url)
socket.run_forever()
@api_view(["POST"])
def index(request):
_id = request.data["_id"]
ws_handler.delay(id=_id)
# NOTE: verify connection via caching
Это работает хорошо, но сервер Django блокируется, как только устанавливается новое websocket-соединение. Пока что я обхожу эту проблему, посылая запросы на соединение по websocket рабочему Celery. Я знаю, что ничем хорошим это не закончится, поскольку Celery просто съест мой сервер, и у меня скоро не останется рабочих потоков, но это позволяет мне держать мой Django-сервер свободным.
Вместо этого я бы хотел превратить его в элегантный асинхронный клиент websocket, чтобы я мог запускать несколько соединений websocket в один поток, желательно тысячи. Я все еще хотел бы оставить celery worker для обработки данных, но я не хотел бы нагружать его управлением websocket, поэтому я хотел бы, чтобы websocket вещи происходили вне Celery & процесс сервера Django
Вот что я собрал на данный момент:
import pdb
import asyncio
import websockets
import json
class WebSocketAdapter:
def __init__(self, uri):
self.uri = uri
self.websocket = None
async def connect(self):
try:
self.websocket = await websockets.connect(self.uri)
await self.on_connect()
await self.receive_messages()
await self.receive_messages()
except Exception as e:
await self.on_error(e)
async def disconnect(self):
await self.websocket.close()
async def send_message(self, message):
if self.websocket is not None:
await self.websocket.send(json.dumps(message))
print("Message Sent")
else:
print("WebSocket connecion is not established")
async def receive_messages(self):
while True:
try:
message = await self.websocket.recv()
await self.on_message((message))
# asyncio.create_task(self.on_message(await self.websocket.recv()))
except websockets.exceptions.ConnectionClosed:
await self.on_close()
break
except Exception as e:
await self.on_error(e)
async def on_connect(self):
print("Connected to WebSocket server.")
async def on_message(self, message):
print(f"Received message from server: {message}")
async def on_error(self, error):
print("WebSocket error:", error)
async def on_close(self):
print("WebSocket connection closed.")
async def main(url):
websocket_adapter = WebSocketAdapter(url)
await websocket_adapter.connect()
# NOTE: following line of code is never executed :(
await websocket_adapter.send_message("Message from client.")
async def test():
urls = [
"ws://localhost:8765/conversations/1",
"ws://localhost:8765/conversations/2",
"ws://localhost:8765/conversations/3",
]
tasks = [main(url) for url in urls]
await asyncio.gather(*tasks)
# Run the event loop
asyncio.run(test())
# asyncio.get_event_loop().run_until_complete(test())
# pdb.set_trace()
Кажется, это позволяет мне запускать несколько соединений, но я не могу отправлять/получать сообщения после начального рукопожатия, код блокируется на методе receive_message
внутри адаптера. Кроме того, я не могу отправлять сообщения за пределы адаптера после установления соединения, что также означает, что управление никогда не возвращается вызывающей стороне, т.е. серверу Django
У меня был простой адаптер websocket, который был синхронным и отлично работал с обратными вызовами, но с asyncio мне приходится строить корутины. Как мне передавать управление обратно, как принимать новые запросы на соединение и как протестировать эту штуку на локальном компьютере?
В основном я хотел бы иметь возможность получать запрос на соединение через конечную точку HTTP, устанавливать соединение websocket в фоновом режиме и продолжать получать новые запросы, т.е. я должен иметь возможность инициализировать множество объектов клиента websocket, с настраиваемыми обратными вызовами, и просто использовать их по своему усмотрению.
Зачем использовать вебсокеты для передачи сообщений между приложениями, можно использовать брокер сообщений AMQP, например RabbitMQ для передачи этих сообщений.