Проблемы с каналами и сигналами Django

У меня есть скрипт на Vue, который получает уведомления от Django и должен отображать и обновлять количество уведомлений в бейдже. Я использую каналы Django, версия Django - 4.2.8, каналы версии 4, uvicorn 0.26 и websockets 12.0. Для обновления количества уведомлений я использую Django signal, поэтому при добавлении нового уведомления из админки или другого источника, срабатывает событие post_save и вызывается метод update_notification_count в consumers.py. В браузере все работает нормально, но когда я добавляю новое уведомление из админки Django, фронтенд не обновляется, то есть update_notification_count не вызывается, даже если срабатывает событие post_save. Вот код.

Сначала мой конфиг:

# settings.py
CHANNEL_LAYERS = {
        'default': {
        'BACKEND': 'channels.layers.InMemoryChannelLayer',
        'CONFIG': {
            'capacity': 1000,
            'expiry': 60,
        },
    }
}

Теперь asgi.py

import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from notifapi.routing import websocket_urlpatterns

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproj.settings')

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": URLRouter(websocket_urlpatterns),
})

Файл signals.py закодирован таким образом

from django.dispatch import receiver
from django.db.models.signals import post_save
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
from .models import NotifyModel as Notification
from .consumers import NotificationConsumer


@receiver(post_save, sender=Notification)
def notification_created(sender, instance, created, **kwargs):
    try:
        if created:
            print(f"A new notification was created {instance.message}")
            channel_layer = get_channel_layer()
            async_to_sync(channel_layer.group_send)(
                "public_room",
                {
                    "type": "update_notification_count",
                    "message": instance.message
                }
            )
    except Exception as e:
        print(f"Error in group_send: {e}")

Теперь consumers.py

from channels.generic.websocket import AsyncWebsocketConsumer
from django.apps import apps
from django.core.serializers import serialize
from asgiref.sync import sync_to_async
import json
import logging

class NotificationConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        # Allow all connections
        await self.channel_layer.group_add("public_room", self.channel_name)
        await self.accept()
        await self.update_notification_count()

    
    async def update_notification_count(self, event=None):
        print("update_notification_count method called with event:", event)

        NotifyModel = apps.get_model('notifapi', 'NotifyModel')
        notifications = await sync_to_async(list)(NotifyModel.objects.all().values('is_read'))
        messages = await sync_to_async(list)(NotifyModel.objects.all().values('message'))
        # Get the notification count asynchronously using a custom utility method
        notification_count = len(notifications)
        
        
        #print(f"Notification count is {notification_count}")
        # Extracting is_read values from notifications
        is_read_values = [notification['is_read'] for notification in notifications]
        messages_values = [notification['message'] for notification in messages]
        #print("Am I here?")
        print(f"Messages values are: {messages_values}")
        await self.send(text_data=json.dumps({
            "type": "notification.update",
            "count": notification_count,
            "is_read_values": is_read_values,
            "messages_values": messages_values
        }))

        
    async def disconnect(self, close_code):
        print("Consumer disconnected")
        # Remove the channel from the public_room group when the WebSocket connection is closed
        await self.channel_layer.group_discard(
            "public_room",
            self.channel_name
        )


    async def receive(self, text_data):
        # Handle incoming messages (if any)
        data = json.loads(text_data)
        if data['type'] == 'update.notification.count':
            await self.update_notification_count()

И, наконец, скрипты Vue (просто для справки, потому что проблема зависит от signals.py, а не от скрипта Vue):

// App.vue
<template>
  <div id="app">
    <div id="wrapper">
      <NotificationBell />
    </div>
  </div>
</template>

<script>
import NotificationBell from './components/NotificationBell.vue';

export default {
  components: {
    NotificationBell,
  },
};
</script>

И NotificationBell.vue

<template>
    <div class="fancy-container">
        <a href="#" class="position-relative">
            <i class="fa fa-bell _gray" style="font-size:24px"></i>
            <span class="my-text btnLnk">Visits</span>
            <span class="position-absolute top-0 start-100 translate-middle badge rounded-pill bg-danger _reduced">
                {{ notificationCount }}
            </span>
        </a>
    </div>
</template>
<script>
//import Axios from 'axios';

export default {
    data() {
        return {
            notifications: [],
            webSocket: null
        };
    },
    computed: {
        notificationCount() {
        // Calculate the notification count based on the current state of notifications
        return this.notifications.filter(notification => !notification.fields.is_read).length;
        }
    },
    mounted() {
        this.establishWebSocketConnection();
    },
    methods: {
        async establishWebSocketConnection() {
            this.webSocket = new WebSocket('ws://127.0.0.1:8001/websocket/ws/notifications/', 'echo-protocol');
            
            this.webSocket.onopen = () => {
                console.log('WebSocket connection established!');
                this.updateNotificationCount();
            };
            
            this.webSocket.onmessage = (event) => {
                console.log("Message received:", event.data);
                const message = JSON.parse(event.data);
                console.log("Received type:", message.type);  // Log the type field to identify the message type
                if(message.type === 'notification.update'){
                    this.notifications = message.is_read_values.map(is_read => ({ fields: { is_read } }));
                }else{
                    console.log("Notification count was not updated!")
                }

            };

            this.webSocket.onclose = () => {
                console.log('WebSocket connection closed.');
            // implement reconnect logic if desired
            };
        },
        updateNotificationCount() {
            console.log('updateNotificationCount called!');
            // Send a message to the WebSocket server to request updated notification count
            this.webSocket.send(JSON.stringify({
                "type": "update.notification.count"  // Define a custom type to trigger the count update
            }));

        },
    },
};
</script>

Я прекрасно знаю, что вся информация о модели может быть получена одним ударом по базе данных в consumers.py, и я отрефакторю его, когда решу проблему.

Проблема в том, что когда я добавляю новое уведомление из админки Django, выводится print внутри функции notification_created в signals.py

print(f"A new notification was created {instance.message}")

но код здесь

async_to_sync(channel_layer.group_send)(
                "public_room",
                {
                    "type": "update_notification_count",
                    "message": instance.message
                }
            )

никогда не вызывается. Я вижу это в консоли gunicorn и в консоли браузера, только после того, как я нажимаю F5 в браузере, информация обновляется.

Сигналы не имеют никакого влияния и не могут вызвать update_notification_count, я не знаю почему, возможно, потому что сигналы не были разработаны для вызова асинхронных методов в файле consumers.py, я вообще не знаю причины, но мне нужен механизм, чтобы подталкивать новые уведомления, когда они добавляются, чтобы счетчик обновлялся без необходимости обновлять браузер.

У меня нет опыта работы с каналами Django и веб-сокетом.

Кто-нибудь может помочь мне с этим?

Поскольку никто не ответил на вопрос, я нашел решение сам. Проблема в том, что conusmer работает на WSGI-сервере или, по крайней мере, работает на порту 8000. Но Uvicorn, который я использовал в разработке, потому что не знал, что Daphne является официальным решением для разработки (Daphne больше не входит в Django Channels с версии 4), работает на порту 8001. Другой сервер и другой порт. Когда код внутри signals.py пытается вызвать асинхронную функцию в consumers

@receiver(post_save, sender=Notification)
def notification_created(sender, instance, created, **kwargs):
    
    if created:
        print(f"A new notification was created {instance.message}")
        channel_layer = get_channel_layer()
        print(channel_layer)
        try:
            async_to_sync(channel_layer.group_send)(
                'public_room',
                {
                    "type":"update_notification_count",
                    "message":instance.message
                }
            )

Даже если он использует async_to_sinc, он все равно не может взаимодействовать с другим сервером, прослушивающим другой порт. Это будет работать только при прямом вызове websocket с клиента и по этой причине работает только при обновлении browsser.

Эта конфигурация

CHANNEL_LAYERS = {
        'default': {
        'BACKEND': 'channels.layers.InMemoryChannelLayer',
        'CONFIG': {
            'capacity': 1000,
            'expiry': 60,
        },
    }
}

подходит только для работы в режиме разработки с

python manage.py runserver

и использование Daphne в режиме разработки для wsgi и asgi, которые будут работать на одном порту 8000 с Daphne. Это никогда не сработает при использовании Uvicorn в режиме производства. InMemoryChannelLayer не был разработан для работы с разными серверами на разных портах. Он был предназначен только для запуска с помощью Daphne на одном порту. Но в документации не упоминаются все эти детали, и мне пришлось разбираться во всем этом самому.

Для этого мне нужен Redis. Redis - это не канальный слой. Это база данных с ключевыми значениями, которая работает в памяти. Но канальный слой channels_redis.core.RedisChannelLayer, который должен быть установлен с

pip install channels-redis

, который также установит зависимость, redis, именно она использует Redis для обеспечения работы коммуникаций и разбивки сообщений между различными серверами. В Windows мне пришлось установить Memurai, который является клоном Redis, работающим под Windows, чтобы проверить все это. И теперь все работает идеально.

Итак, урок таков:

Если вы хотите использовать слои каналов только в режиме разработки, особенно если вы разрабатываете под Windows, то используйте

CHANNEL_LAYERS = {
        'default': {
        'BACKEND': 'channels.layers.InMemoryChannelLayer',
        'CONFIG': {
            'capacity': 1000,
            'expiry': 60,
        },
    }
}

Добавьте это

#WSGI_APPLICATION = 'myproj.wsgi.application'
ASGI_APPLICATION = 'myproj.asgi.application'

соответствующим образом настройте asgi.py и установите Daphne для запуска с сервером разработки.

Если вы хотите запустить Django Channels в производство или протестировать или смоделировать производственную среду, даже в Windows, то вам необходимо установить Memurai и следовать тем же инструкциям, что описаны выше.

Надеюсь, что мое решение поможет кому-то еще. Это очень хитрая проблема, когда вы не знаете, как действовать, потому что даже инструменты искусственного интеллекта совершенно не способны найти решение.

Вернуться на верх