Как запустить несколько клиентских соединений websocket?

У меня есть Django-сервис (назовем его "главный"), запускающий HTTP-сервер и WebSocket-сервер, использующий django-каналы. У меня также есть куча других сервисов Django (назовем их целевыми сервисами), которые хотят подключиться к каналам websocket, открываемым главным сервисом. Каждому сервису необходимо установить несколько Websocket-соединений с главным сервисом и отправлять/получать сообщения до тех пор, пока соединение не будет закрыто главным сервисом.

Вот несколько примеров URL-адресов подключения:

URL1 = "wss://main-service.com/ws-connection/1"
URL2 = "wss://main-service.com/ws-connection/2"
URL3 = "wss://main-service.com/ws-connection/3"

Этот список URL заранее не известен; главный сервис посылает нам мини-полезную нагрузку, которая используется для генерации URL во время выполнения, и соединение устанавливается.

Целевой сервис должен обладать способностью поддерживать тысячи соединений websocket (работающих как stateful connections) & он также должен предоставлять HTTP API для обслуживания прямых запросов.

На данный момент у меня есть целевой сервис API, запущенный как проект Django, и базовый клиент websocket, и оба они выглядят следующим образом:

Websocket клиент:

from abc import abstractmethod
from websocket import WebSocketApp as _WebSocketApp


class WebSocketApp(_WebSocketApp):
    def __init__(self, url=None):
            super(WebSocketApp, self).__init__(url=url, on_open=self._on_open on_message=self._on_message, on_error=self._on_error, on_close=self._on_close)
        self.logger = get_logger()  # TODO: set level
        self.logger.debug(f"Websocket adapter is ready")

    @abstractmethod
    def _on_open(self, ws):
        pass

    @abstractmethod
    def _on_message(self, ws, message):
        pass

    @abstractmethod
    def _on_error(self, ws, error):
        pass

    @abstractmethod
    def _on_close(self, ws, code, message):
        pass

Django view:

from adapters.web_socket import WebSocketApp
from rest_framework.decorators import api_view
from server.celery import app


@app.task()
def ws_handler(conversation_id):
    # TODO: standardise
    url = f"wss://main-service.com/connections/{id}"
    socket = WebSocketApp(url=url)
    socket.run_forever()


@api_view(["POST"])
def index(request):
    _id = request.data["_id"]
    ws_handler.delay(id=_id)
        # NOTE: verify connection via caching  

Это работает хорошо, но сервер Django блокируется, как только устанавливается новое websocket-соединение. Пока что я обхожу эту проблему, посылая запросы на соединение по websocket рабочему Celery. Я знаю, что ничем хорошим это не закончится, поскольку Celery просто съест мой сервер, и у меня скоро не останется рабочих потоков, но это позволяет мне держать мой Django-сервер свободным.

Вместо этого я бы хотел превратить его в элегантный асинхронный клиент websocket, чтобы я мог запускать несколько соединений websocket в один поток, желательно тысячи. Я все еще хотел бы оставить celery worker для обработки данных, но я не хотел бы нагружать его управлением websocket, поэтому я хотел бы, чтобы websocket вещи происходили вне Celery & процесс сервера Django

Вот что я собрал на данный момент:

import pdb
import asyncio
import websockets
import json


class WebSocketAdapter:
    def __init__(self, uri):
        self.uri = uri
        self.websocket = None

    async def connect(self):
        try:
            self.websocket = await websockets.connect(self.uri)
            await self.on_connect()
            await self.receive_messages()
            await self.receive_messages()
        except Exception as e:
            await self.on_error(e)

    async def disconnect(self):
        await self.websocket.close()

    async def send_message(self, message):
        if self.websocket is not None:
            await self.websocket.send(json.dumps(message))
            print("Message Sent")
        else:
            print("WebSocket connecion is not established")

    async def receive_messages(self):
        while True:
            try:
                message = await self.websocket.recv()
                await self.on_message((message))
                # asyncio.create_task(self.on_message(await self.websocket.recv()))
            except websockets.exceptions.ConnectionClosed:
                await self.on_close()
                break
            except Exception as e:
                await self.on_error(e)

    async def on_connect(self):
        print("Connected to WebSocket server.")

    async def on_message(self, message):
        print(f"Received message from server: {message}")

    async def on_error(self, error):
        print("WebSocket error:", error)

    async def on_close(self):
        print("WebSocket connection closed.")


async def main(url):
    websocket_adapter = WebSocketAdapter(url)
    await websocket_adapter.connect()
    # NOTE: following line of code is never executed :(
    await websocket_adapter.send_message("Message from client.")



async def test():

    urls = [
        "ws://localhost:8765/conversations/1",
        "ws://localhost:8765/conversations/2",
        "ws://localhost:8765/conversations/3",
    ]
    tasks = [main(url) for url in urls]
    await asyncio.gather(*tasks)


# Run the event loop
asyncio.run(test())
# asyncio.get_event_loop().run_until_complete(test())
# pdb.set_trace()
        

Кажется, это позволяет мне запускать несколько соединений, но я не могу отправлять/получать сообщения после начального рукопожатия, код блокируется на методе receive_message внутри адаптера. Кроме того, я не могу отправлять сообщения за пределы адаптера после установления соединения, что также означает, что управление никогда не возвращается вызывающей стороне, т.е. серверу Django

У меня был простой адаптер websocket, который был синхронным и отлично работал с обратными вызовами, но с asyncio мне приходится строить корутины. Как мне передавать управление обратно, как принимать новые запросы на соединение и как протестировать эту штуку на локальном компьютере?

В основном я хотел бы иметь возможность получать запрос на соединение через конечную точку HTTP, устанавливать соединение websocket в фоновом режиме и продолжать получать новые запросы, т.е. я должен иметь возможность инициализировать множество объектов клиента websocket, с настраиваемыми обратными вызовами, и просто использовать их по своему усмотрению.

Зачем использовать вебсокеты для передачи сообщений между приложениями, можно использовать брокер сообщений AMQP, например RabbitMQ для передачи этих сообщений.

Вернуться на верх