Проблема с выполнением Celery задач в Django проекте: первая задача выполняется, вторая — нет
Концепция работы проекта: Есть django-project, он поднят локально, в нем есть 2 приложения, но виновник события - приложение с базой данных. В рамках нее, а точнее в рамках таблиц прописаны методы по доступу к этим таблицам, они же являются задачами Celery. Celery связан с брокером сообщений RabbitMQ, вместе с запуском Django-проекта запускается RabbitMQConsumer, который прослушивает приходящие на очередь bot_queue сообщения и ставит на исполнение задачи Celery. Эти сообщения содержат имя задачи Celery (именно так как она зарегистрирована в самом Celery) и аргументы для вызываемого задачей метода. Запросы к базе данных формирует и отправляет в брокер телеграм-бот.
Я столкнулся со следующей проблемой: запуск бота сопровождается отправкой запроса в БД на получение id всех пользователей этого бота, данный запрос принимается и завершается успешно. Второй же запрос связан с процессом регистрации пользователя, и вот с ним начинаются проблемы. Сам запрос отправляется и доходит до Conumer-а, более того, имя задачи и аргументы поступают правильные, но вот когда дело доходит до исполнения задачи Celery, вот тут все и ломается. По каким-то причинам задача не исполняется (логи в самом методе не выводятся). Пока пытался сам разобраться с проблемой, столкнулся с интересными особенностями, если передавать в этот проблемный второй метод неправильное количество аргументов, то задача попытается выполнить данный метод, но выведет ошибку, а если передавать нужное количество аргументов, то задача почему-то вообще не будет выполняться)) А если повторить запрос от бота, то выполнится задача с аргументами, которые были отправлены ранее, а новый запрос так же останется висеть хрен пойми где.
Проблемный метод:
@app.task(name='bot.tasks.get_group_id', bind = True)
def get_group_id(self, group_number, reply_to, correlation_id):
logger.info(
f"Задача get_group_id запущена с параметрами: group_number={group_number}, reply_to={reply_to}, correlation_id={correlation_id}")
try:
logger.info(f"Пытаемся найти группу с номером: {group_number}")
group = Group.objects.get(group_number=group_number)
group_id = group.id
logger.info(f"Группа найдена: {group_id}")
except ObjectDoesNotExist:
group_id = None
logger.error(f"Группа с номером {group_number} не найдена")
raise ValueError(f"Group number {group_number} not found")
finally:
result = {'result': group_id}
logger.info(f"Отправляем результат: {result}")
asyncio.run(send_response(result, reply_to, correlation_id))
Функция send_response отправляет обратно в брокер результат выполнения метода, reply_to и correlation_id - это те аргументы, которые поступили в джанго вместе с запросом от телеграм бота. На стороне самого Teleragm-бота реализована фильтрация по correlation_id, чтобы ответы поступали туда же, откуда они вызывались.
Сама функция send_response:
import json
import logging
import os
import sys
import aio_pika
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
# Устанавливаем переменную окружения для настроек Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djcore.settings')
# Инициализация Django (если нужно)
from djcore.settings import RABBITMQ_USER, RABBITMQ_PASSWORD, RABBITMQ_HOST, RABBITMQ_PORT
import json
import logging
import aio_pika
import asyncio
async def send_response(result, reply_to, correlation_id):
connection = await aio_pika.connect_robust(
host=RABBITMQ_HOST,
port=RABBITMQ_PORT,
login=RABBITMQ_USER,
password=RABBITMQ_PASSWORD
)
async with connection:
channel = await connection.channel()
# Объявляем очередь для reply_to
queue = await channel.declare_queue(reply_to, durable=True)
exchange = await channel.declare_exchange('direct_exchange', aio_pika.ExchangeType.DIRECT, durable=True)
await queue.bind(exchange, routing_key=reply_to)
# Формируем и отправляем сообщение с результатом
message = aio_pika.Message(
body=json.dumps(result).encode(),
correlation_id=correlation_id
)
logging.info(f'Очередь ответов {reply_to}\nСам ответ - {result}')
# Публикуем сообщение с использованием reply_to в качестве routing_key
await exchange.publish(
message,
routing_key=reply_to
)
Класс RabbitMQConsumer
import os
import sys
import aio_pika
import json
import logging
# Добавляем путь к корневой директории вашего проекта в sys.path
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.append(BASE_DIR)
# Устанавливаем переменную окружения для настроек Django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djcore.settings')
# Инициализация Django (если нужно)
from djcore.settings import RABBITMQ_USER, RABBITMQ_PASSWORD, RABBITMQ_HOST, RABBITMQ_PORT
from djcore.celery_app import app
logger = logging.getLogger(__name__)
class RabbitMQConsumer:
def __init__(self):
self.connection = None
self.channel = None
self.exchange = None
self.queue = None
async def connect(self):
self.connection = await aio_pika.connect_robust(
host=RABBITMQ_HOST,
port=RABBITMQ_PORT,
login=RABBITMQ_USER,
password=RABBITMQ_PASSWORD
)
self.channel = await self.connection.channel()
# Объявляем exchange
self.exchange = await self.channel.declare_exchange('direct_exchange', aio_pika.ExchangeType.DIRECT, durable=True)
# Объявляем очередь
self.queue = await self.channel.declare_queue('bot_queue', durable=True)
# Связываем очередь с exchange и routing_key
await self.queue.bind(self.exchange, routing_key='bot_queue')
async def process_message(self, message: aio_pika.IncomingMessage):
async with message.process():
try:
body = message.body.decode()
data = json.loads(body)
if isinstance(data, dict) and 'task' in data and 'data' in data:
task_name = data['task']
task_data = data['data']
reply_to = message.reply_to
correlation_id = message.correlation_id
#logger.exception(f"Получена задача: {task_name} с данными: {reply_to, correlation_id}")
if task_name not in app.tasks.keys():
logger.error(f"Задача {task_name} не найдена в Celery.")
raise ValueError("Задача не найдена в Celery")
# Отправляем задачу в Celery с аргументами reply_to и correlation_id
task = app.tasks[task_name]
# logger.exception(f'{task_data, message.reply_to}')
task_data.append(reply_to)
task_data.append(correlation_id)
task.apply_async(args=task_data, queue='bot_queue')
except Exception as e:
logger.exception("Ошибка при обработке сообщения")
async def start_consuming(self):
await self.connect()
await self.queue.consume(self.process_message)
logger.info("Запуск асинхронного RabbitMQ Consumer для очереди bot_queue")
Конфиг файл с настройкой Celery:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from djcore import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djcore.settings')
app = Celery('djcore')
app.conf.task_routes = {
'bot.tasks.get_group_id': {'queue': 'bot_queue'},
'bot.tasks.get_all_users_ids':{'queue': 'bot_queue'}
}
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
Запускаю я сам воркер следующей командой:
celery -A djcore worker --loglevel=debug --pool=solo -Q bot_queue
А это логи Celery-воркера при получении проблемного запроса:
[2024-10-15 19:09:27,161: DEBUG/MainProcess] Received frame <Basic.Deliver object at 0x26fc1338e90> in channel #1 weight=71 on <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc1440be0>
[2024-10-15 19:09:27,162: DEBUG/MainProcess] Received frame <pamqp.header.ContentHeader object at 0x0000026FC145AB50> in channel #1 weight=556 on <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc1440be0>
[2024-10-15 19:09:27,162: DEBUG/MainProcess] Received frame <pamqp.body.ContentBody object at 0x0000026FC1458890> in channel #1 weight=154 on <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc1440be0>
[2024-10-15 19:09:27,162: DEBUG/MainProcess] Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x0
0<\x00P\x00\x00\x00\x00\x00\x00\x00\x01\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStor
e.__on_task_done.<locals>.remover() at C:\Users\homos\PycharmProjects\project\venv\Lib\site-packages\aiormq\base.py:33, <1 more>, Task.task_wakeup()]>)
Логи воркера при выполнении непроблемной задачи:
[2024-10-15 19:09:20,379: DEBUG/MainProcess] Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00-\x00
<\x00(\x00\x00\x0fdirect_exchange\x15django_response_queue\x01\xce\x02\x00\x01\x00\x00\x00Z\x00<\x00\x00\x00\x00\
x00\x00\x00\x00\x00\x0e<\x80\x00\x00\x00\x00\x01\x00$077052c4-74d3-41b9-9996-a13cafe3a0df 3bb10fc8e93a478e841493b
50d4ee60e\xce\x03\x00\x01\x00\x00\x00\x0e{"result": []}\xce', should_close=False, drain_future=<Future pending cb
=[FutureStore.__on_task_done.<locals>.remover() at C:\Users\homos\PycharmProjects\project\venv\Lib\site-packages\aiormq\base.py:33, <1 more>, Task.task_wakeup()]>)
[2024-10-15 19:09:20,381: DEBUG/MainProcess] Received frame <Basic.Ack object at 0x26fc14cdbd0> in channel #1 weight=21 on <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc14cc0f0>
[2024-10-15 19:09:20,382: DEBUG/MainProcess] Closing connection <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc14cc0f0> cause: <class 'asyncio.exceptions.CancelledError'>
[2024-10-15 19:09:20,383: DEBUG/MainProcess] Sending <Connection.Close object at 0x26fc14966f0> to <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc14cc0f0>
[2024-10-15 19:09:20,383: DEBUG/MainProcess] Reader exited for <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc14cc0f0>
[2024-10-15 19:09:20,384: DEBUG/MainProcess] Writer on connection amqp://rabbitmq:******@localhost:5672// closed
[2024-10-15 19:09:20,384: DEBUG/MainProcess] Writer exited for <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc14cc0f0>
[2024-10-15 19:09:20,387: DEBUG/MainProcess] Closing connection <Connection: "amqp://rabbitmq:******@localhost:5672//" at 0x26fc14cc0f0> cause: CancelledError()
[2024-10-15 19:09:20,389: INFO/MainProcess] Task bot.tasks.get_all_users_ids[dcfef7c6-ceb8-413f-8bda-1ebf832c389f] succeeded in 0.07799999999406282s: None
Если кто-нибудь знает, в чем проблема, помоги, пожалуйста... Заранее спасибо за все ответы