Проблема с выполнением Celery задач в Django проекте: первая задача выполняется, вторая — нет

Концепция работы проекта: Есть django-project, он поднят локально, в нем есть 2 приложения, но виновник события - приложение с базой данных. В рамках нее, а точнее в рамках таблиц прописаны методы по доступу к этим таблицам, они же являются задачами Celery. Celery связан с брокером сообщений RabbitMQ, вместе с запуском Django-проекта запускается RabbitMQConsumer, который прослушивает приходящие на очередь bot_queue сообщения и ставит на исполнение задачи Celery. Эти сообщения содержат имя задачи Celery (именно так как она зарегистрирована в самом Celery) и аргументы для вызываемого задачей метода. Запросы к базе данных формирует и отправляет в брокер телеграм-бот.

Я столкнулся со следующей проблемой: запуск бота сопровождается отправкой запроса в БД на получение id всех пользователей этого бота, данный запрос принимается и завершается успешно. Второй же запрос связан с процессом регистрации пользователя, и вот с ним начинаются проблемы. Сам запрос отправляется и доходит до Conumer-а, более того, имя задачи и аргументы поступают правильные, но вот когда дело доходит до исполнения задачи Celery, вот тут все и ломается. По каким-то причинам задача не исполняется (логи в самом методе не выводятся). Пока пытался сам разобраться с проблемой, столкнулся с интересными особенностями, если передавать в этот проблемный второй метод неправильное количество аргументов, то задача попытается выполнить данный метод, но выведет ошибку, а если передавать нужное количество аргументов, то задача почему-то вообще не будет выполняться)) А если повторить запрос от бота, то выполнится задача с аргументами, которые были отправлены ранее, а новый запрос так же останется висеть хрен пойми где.

Проблемный метод:

@app.task(name='bot.tasks.get_group_id', bind = True)
    def get_group_id(self, group_number, reply_to, correlation_id):
        logger.info(
            f"Задача get_group_id запущена с параметрами: group_number={group_number}, reply_to={reply_to}, correlation_id={correlation_id}")


        try:
            logger.info(f"Пытаемся найти группу с номером: {group_number}")
            group = Group.objects.get(group_number=group_number)
            group_id = group.id
            logger.info(f"Группа найдена: {group_id}")
        except ObjectDoesNotExist:
            group_id = None
            logger.error(f"Группа с номером {group_number} не найдена")
            raise ValueError(f"Group number {group_number} not found")
        finally:
            result = {'result': group_id}
            logger.info(f"Отправляем результат: {result}")
            asyncio.run(send_response(result, reply_to, correlation_id))

Функция send_response отправляет обратно в брокер результат выполнения метода, reply_to и correlation_id - это те аргументы, которые поступили в джанго вместе с запросом от телеграм бота. На стороне самого Teleragm-бота реализована фильтрация по correlation_id, чтобы ответы поступали туда же, откуда они вызывались.

Сама функция send_response:

import json
import logging
import os
import sys

import aio_pika


BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)

# Устанавливаем переменную окружения для настроек Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djcore.settings')

# Инициализация Django (если нужно)
from djcore.settings import RABBITMQ_USER, RABBITMQ_PASSWORD, RABBITMQ_HOST, RABBITMQ_PORT

import json
import logging
import aio_pika
import asyncio

async def send_response(result, reply_to, correlation_id):

    connection = await aio_pika.connect_robust(
        host=RABBITMQ_HOST,
        port=RABBITMQ_PORT,
        login=RABBITMQ_USER,
        password=RABBITMQ_PASSWORD
    )

    async with connection:
        channel = await connection.channel()


        # Объявляем очередь для reply_to
        queue = await channel.declare_queue(reply_to, durable=True)
        exchange = await channel.declare_exchange('direct_exchange', aio_pika.ExchangeType.DIRECT, durable=True)
        await queue.bind(exchange, routing_key=reply_to)

        # Формируем и отправляем сообщение с результатом
        message = aio_pika.Message(
            body=json.dumps(result).encode(),
            correlation_id=correlation_id
        )
        logging.info(f'Очередь ответов {reply_to}\nСам ответ - {result}')
        # Публикуем сообщение с использованием reply_to в качестве routing_key
        await exchange.publish(
            message,
            routing_key=reply_to
        )

Класс RabbitMQConsumer

import os
import sys
import aio_pika
import json
import logging

# Добавляем путь к корневой директории вашего проекта в sys.path
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)

# Устанавливаем переменную окружения для настроек Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djcore.settings')

# Инициализация Django (если нужно)
from djcore.settings import RABBITMQ_USER, RABBITMQ_PASSWORD, RABBITMQ_HOST, RABBITMQ_PORT
from djcore.celery_app import app

logger = logging.getLogger(__name__)

class RabbitMQConsumer:
    def __init__(self):
        self.connection = None
        self.channel = None
        self.exchange = None
        self.queue = None

    async def connect(self):
        self.connection = await aio_pika.connect_robust(
            host=RABBITMQ_HOST,
            port=RABBITMQ_PORT,
            login=RABBITMQ_USER,
            password=RABBITMQ_PASSWORD
        )
        self.channel = await self.connection.channel()

        # Объявляем exchange
        self.exchange = await self.channel.declare_exchange('direct_exchange', aio_pika.ExchangeType.DIRECT, durable=True)
        # Объявляем очередь
        self.queue = await self.channel.declare_queue('bot_queue', durable=True)
        # Связываем очередь с exchange и routing_key
        await self.queue.bind(self.exchange, routing_key='bot_queue')

    async def process_message(self, message: aio_pika.IncomingMessage):
        async with message.process():
            try:
                body = message.body.decode()
                data = json.loads(body)
                if isinstance(data, dict) and 'task' in data and 'data' in data:
                    task_name = data['task']
                    task_data = data['data']
                    reply_to = message.reply_to
                    correlation_id = message.correlation_id
                    #logger.exception(f"Получена задача: {task_name} с данными: {reply_to, correlation_id}")
                    if task_name not in app.tasks.keys():
                        logger.error(f"Задача {task_name} не найдена в Celery.")
                        raise ValueError("Задача не найдена в Celery")
                    # Отправляем задачу в Celery с аргументами reply_to и correlation_id
                    task = app.tasks[task_name]
                    #     logger.exception(f'{task_data, message.reply_to}')
                    task_data.append(reply_to)
                    task_data.append(correlation_id)

                    task.apply_async(args=task_data, queue='bot_queue')

            except Exception as e:
                logger.exception("Ошибка при обработке сообщения")

    async def start_consuming(self):
        await self.connect()
        await self.queue.consume(self.process_message)
        logger.info("Запуск асинхронного RabbitMQ Consumer для очереди bot_queue")

Конфиг файл с настройкой Celery:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

from djcore import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djcore.settings')

app = Celery('djcore')

app.conf.task_routes = {

    'bot.tasks.get_group_id': {'queue': 'bot_queue'},
    'bot.tasks.get_all_users_ids':{'queue': 'bot_queue'}
}

app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

Запускаю я сам воркер следующей командой:

celery -A djcore worker --loglevel=debug --pool=solo -Q bot_queue

А это логи Celery-воркера при получении проблемного запроса:

[2024-10-15 19:09:27,161: DEBUG/MainProcess] Received frame <Basic.Deliver object at 0x26fc1338e90> in channel #1 weight=71 on <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc1440be0>
[2024-10-15 19:09:27,162: DEBUG/MainProcess] Received frame <pamqp.header.ContentHeader object at 0x0000026FC145AB50> in channel #1 weight=556 on <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc1440be0>        
[2024-10-15 19:09:27,162: DEBUG/MainProcess] Received frame <pamqp.body.ContentBody object at 0x0000026FC1458890> in channel #1 weight=154 on <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc1440be0>
[2024-10-15 19:09:27,162: DEBUG/MainProcess] Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x0
0<\x00P\x00\x00\x00\x00\x00\x00\x00\x01\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStor
e.__on_task_done.<locals>.remover() at C:\Users\homos\PycharmProjects\project\venv\Lib\site-packages\aiormq\base.py:33, <1 more>, Task.task_wakeup()]>)

Логи воркера при выполнении непроблемной задачи:

[2024-10-15 19:09:20,379: DEBUG/MainProcess] Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00-\x00
<\x00(\x00\x00\x0fdirect_exchange\x15django_response_queue\x01\xce\x02\x00\x01\x00\x00\x00Z\x00<\x00\x00\x00\x00\
x00\x00\x00\x00\x00\x0e<\x80\x00\x00\x00\x00\x01\x00$077052c4-74d3-41b9-9996-a13cafe3a0df 3bb10fc8e93a478e841493b
50d4ee60e\xce\x03\x00\x01\x00\x00\x00\x0e{"result": []}\xce', should_close=False, drain_future=<Future pending cb
=[FutureStore.__on_task_done.<locals>.remover() at C:\Users\homos\PycharmProjects\project\venv\Lib\site-packages\aiormq\base.py:33, <1 more>, Task.task_wakeup()]>)
[2024-10-15 19:09:20,381: DEBUG/MainProcess] Received frame <Basic.Ack object at 0x26fc14cdbd0> in channel #1 weight=21 on <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc14cc0f0>
[2024-10-15 19:09:20,382: DEBUG/MainProcess] Closing connection <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc14cc0f0> cause: <class 'asyncio.exceptions.CancelledError'>
[2024-10-15 19:09:20,383: DEBUG/MainProcess] Sending <Connection.Close object at 0x26fc14966f0> to <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc14cc0f0>
[2024-10-15 19:09:20,383: DEBUG/MainProcess] Reader exited for <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc14cc0f0>
[2024-10-15 19:09:20,384: DEBUG/MainProcess] Writer on connection amqp://rabbitmq:******@localhost:5672// closed 
[2024-10-15 19:09:20,384: DEBUG/MainProcess] Writer exited for <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc14cc0f0>
[2024-10-15 19:09:20,387: DEBUG/MainProcess] Closing connection <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc14cc0f0> cause: CancelledError()
[2024-10-15 19:09:20,389: INFO/MainProcess] Task bot.tasks.get_all_users_ids[dcfef7c6-ceb8-413f-8bda-1ebf832c389f] succeeded in 0.07799999999406282s: None

Если кто-нибудь знает, в чем проблема, помоги, пожалуйста... Заранее спасибо за все ответы

Вернуться на верх