How to run a Redis listener in Django when the server starts?

I'm working with Django and I have a Redis listener that I run with the following custom command:

import os
import asyncio
from django.core.management.base import BaseCommand
from notifications.core.redis_listener import RedisListener


class Command(BaseCommand):
    help = 'Listen for notifications with metadata from Redis in real-time'

    def handle(self, *args, **kwargs):
        redis_host = os.getenv('REDIS_HOST')
        redis_port = int(os.getenv('REDIS_PORT'))
        redis_password = os.getenv('REDIS_PASSWORD')
        redis_channel = 'notifications_channel'  # Redis channel to subscribe to
        ssl = os.getenv('REDIS_SSL', 'False') == 'True'

        # Conditionally Use "ssl_cert_reqs"
        if ssl:
            redis_listener = RedisListener(
                redis_host=redis_host,
                redis_port=redis_port,
                redis_password=redis_password,
                redis_channel=redis_channel,
                ssl=ssl,
                ssl_cert_reqs='none'
            )
        else:
            redis_listener = RedisListener(
                redis_host=redis_host,
                redis_port=redis_port,
                redis_password=redis_password,
                redis_channel=redis_channel,
                ssl=ssl,
                ssl_cert_reqs='none'
            )

        # Run the listener function with asyncio
        loop = asyncio.get_event_loop()
        loop.run_until_complete(redis_listener.listen_to_redis())

Currently, I start the Redis listener by running:

python manage.py listen_redis

However, I want the Redis listener to start automatically when I run the Django development server with:

python manage.py runserver

I would like this Redis listener to run concurrently with the Django development server without needing to manually start it via a custom management command.

How can I achieve this?

In my apps.py I tried this -

from django.apps import AppConfig
import os
import threading
import asyncio
from notifications.core.redis_listener import RedisListener
from django.db.backends.signals import connection_created
from django.db import connections

class NotificationsConfig(AppConfig):
    default_auto_field = 'django.db.models.BigAutoField'
    name = 'notifications'

    def ready(self):
        # Delay listener start until the database connections are created
        connection_created.connect(self.start_redis_listener)

    def start_redis_listener(self, **kwargs):
        # Ensuring we only start the listener once
        if not hasattr(self, '_redis_listener_started'):
            self._redis_listener_started = True
            self.start_threaded_redis_listener()

    def start_threaded_redis_listener(self):
        def listener_thread():
            # Get Redis config values
            redis_host = os.getenv('REDIS_HOST')
            redis_port = int(os.getenv('REDIS_PORT'))
            redis_password = os.getenv('REDIS_PASSWORD')
            redis_channel = 'notifications_channel'
            ssl = os.getenv('REDIS_SSL', 'False') == 'True'

            # Create RedisListener instance
            if ssl:
                redis_listener = RedisListener(
                    redis_host=redis_host,
                    redis_port=redis_port,
                    redis_password=redis_password,
                    redis_channel=redis_channel,
                    ssl=ssl,
                    ssl_cert_reqs='none'
                )
            else:
                redis_listener = RedisListener(
                    redis_host=redis_host,
                    redis_port=redis_port,
                    redis_password=redis_password,
                    redis_channel=redis_channel,
                    ssl=ssl,
                    ssl_cert_reqs='none'
                )

            # Run the Redis listener using asyncio
            loop = asyncio.get_event_loop()
            loop.run_until_complete(redis_listener.listen_to_redis())

        # Create and start the Redis listener thread
        thread = threading.Thread(target=listener_thread)
        thread.daemon = True  # Allow the thread to exit when the main process exits
        thread.start()

But I get -

raise AppRegistryNotReady("Apps aren't loaded yet.")
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
Вернуться на верх