Вызывающая сторона пыталась использовать каналы rabbitmq в другом цикле событий, а не в том, в котором они были инициализированы
Я пытаюсь инициализировать очередь кроликов-потребителей, обработать эти сообщения и отправить их на вебсокет в django. Но получаю ошибку в заголовке.
Я новичок в таких вещах. Может кто-нибудь объяснить мне, как я могу заставить это работать. Вот мой код:
asgi.py
import threading
import django
from django.core.asgi import get_asgi_application
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from apps.websocket import routing
import asyncio
from apps.websocket.services.message_processor import start_rabbitmq_consumer
django.setup()
application = ProtocolTypeRouter(
{
"http": get_asgi_application(),
"websocket": AuthMiddlewareStack(URLRouter(routing.websocket_urlpatterns)),
}
)
def start_consumer_thread():
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(start_rabbitmq_consumer())
consumer_thread = threading.Thread(target=start_consumer_thread, daemon=True)
consumer_thread.start()
consumer.py
import asyncio
import json
import logging
from aio_pika import connect, exceptions as aio_pika_exceptions
import httpx
from channels.layers import get_channel_layer
from apps.websocket.services.data_extractor import extract_notification_data
from core.settings import RABBIT_INTERSECTION_QUEUE, RABBIT_URL, API_TRACKER
logger = logging.getLogger(__name__)
async def start_rabbitmq_consumer():
"""Inicializa el consumidor"""
while True:
try:
connection = await connect(RABBIT_URL)
await consume_messages(connection)
except aio_pika_exceptions.AMQPConnectionError as e:
await asyncio.sleep(5)
except Exception as e:
await asyncio.sleep(5)
async def consume_messages(connection):
"""Iniciamos el consumo de mensajes de la cola de rabbit"""
channel = await connection.channel()
queue = await channel.declare_queue(RABBIT_INTERSECTION_QUEUE, durable=True)
async for message in queue:
async with message.process():
data = json.loads(message.body.decode())
await process_message(data)
async def process_message(data):
"""Procesamos los mensajes provenientes de la cola de Intersection"""
notifications = extract_notification_data(data)
for notification_payload in notifications:
response_payload = await create_notification(notification_payload)
if response_payload:
await send_websocket_notification(response_payload, data['route_pre']['empresa_generadora'])
async def create_notification(notification_payload):
"""Enviamos una solicitud para crear la nueva notificación"""
async with httpx.AsyncClient() as client:
try:
response = await client.post(
f"{API_TRACKER}/tracker/notification/create/",
json=notification_payload,
)
if response.status_code == 201:
return response.json()
else:
return None
except httpx.RequestError as exc:
return None
async def send_websocket_notification(notification_response, company_id):
"""Enviamos al websocket la notifiación"""
channel_layer = get_channel_layer()
room_group_name = f"chat_{company_id}"
await channel_layer.group_send(
room_group_name,
{"type": "send_notification", "message": notification_response},
)
websocket.py
import asyncio
import json
import logging
from channels.generic.websocket import AsyncWebsocketConsumer
from apps.websocket.services.message_processor import start_rabbitmq_consumer
logger = logging.getLogger(__name__)
class WebSocketConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.company_id = self.scope['url_route']['kwargs']['company_id']
self.room_group_name = f"chat_{self.company_id}"
await self.channel_layer.group_add(self.room_group_name, self.channel_name)
await self.accept()
logger.info(f"WebSocket connected: {self.channel_name}")
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
logger.info(
f"WebSocket disconnected: {self.channel_name} with code {close_code}"
)
async def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json["message"]
await self.send(text_data=json.dumps({"message": message}))
logger.info(f"Message received and sent back to client: {message}")
async def send_notification(self, event):
message = event["message"]
await self.send(text_data=json.dumps({"message": message}))
logger.info(f"WebSocket sending notification: {message}")
я нахожу это не элегантное временное решение:
class WebSocketConsumer(AsyncWebsocketConsumer):
async def connect(self):
self.company_id = self.scope['url_route']['kwargs']['company_id']
self.room_group_name = f"chat_{self.company_id}"
await self.channel_layer.group_add(self.room_group_name, self.channel_name)
await self.accept()
logger.info(f"WebSocket connected: {self.channel_name}")
asyncio.create_task(
start_rabbitmq_consumer()
) # Starting de consumer inside the ws
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.room_group_name, self.channel_name)
logger.info(
f"WebSocket disconnected: {self.channel_name} with code {close_code}"
)
но должно быть лучшее решение.