Проблема в приложении django для использования rabbitmqp в качестве брокера и celery для выполнения простой задачи

Здравствуйте, я пытаюсь узнать, как настроить приложение django для работы с сельдереем. Я запускаю rabbitmq на docker desktop на машине windows с помощью этой команды :

docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq

затем я запускаю celery с помощью этой команды и контролирую его с помощью flower :

celery -A a_shop worker -l Info   
celery -A a_shop flower

в главном файле init приложения django у меня есть следующее :

from .celery import app as celery_app

__all__ = ('celery_app',)

и для настройки сельдерея в файле setting.py у меня есть такие настройки :

CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_TASK_SOFT_TIME_LIMIT = 600  # 10 minutes
CELERY_TASK_TIME_LIMIT = 1200
CELERY_IMPORTS = ('orders.task',)
CELERY_TASK_DEFAULT_QUEUE = 'celery'

я создал файл celery.py в основном приложении с такими кодами :

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'a_shop.settings')

app = Celery('a_shop')

app.config_from_object('django.conf:settings', namespace='CELERY')


app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    print('Request: {self.request!r}')

я создаю простую задачу для тестирования :

import logging
from celery import shared_task
from .models import Order
logger = logging.getLogger(__name__)


@shared_task
def order_created(order_id):
    logger.info(f"Task order_created started with order_id: {order_id}")
    try:
        order = Order.objects.get(id=order_id)
        logger.info(f"Processing order: {order.name}")
        print(order_id)
    except Exception as e:
        logger.error(f"Error processing order {order_id}: {e}")
        raise
    logger.info(f"Task order_created completed with order_id: {order_id}")
    return order_id

я решаю задачу в одном из своих представлений с помощью :

order_created.delay(new_order.id)

я вижу задачу resived в консоли celery :

[2024-06-20 15:59:43,220: INFO/MainProcess] Task orders.task.order_created[dbbcafd1-55f8-4e86-ae48-1c68fda5c75f] received
[2024-06-20 15:59:44,289: INFO/SpawnPoolWorker-9] child process 10608 calling self.run()
[2024-06-20 15:59:44,291: INFO/SpawnPoolWorker-10] child process 744 calling self.run()

но через некоторое время я получаю эту ошибку на консоли :

[2024-06-20 05:01:48,262: CRITICAL/MainProcess] Unrecoverable error: PreconditionFailed(406, 'PRECONDITION_FAILED - delivery
acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers d
oc guide to learn more', (0, 0), '')
Traceback (most recent call last):
File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\worker.py", line 203, in start
self.blueprint.start(self)
File "E:\Projects\app\a_shop\Lib\site-packages\celery\bootsteps.py", line 116, in start
step.start(parent)
File "E:\Projects\app\a_shop\Lib\site-packages\celery\bootsteps.py", line 365, in start
return self.obj.start()
^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\consumer\consumer.py", line 332, in start
blueprint.start(self)
File "E:\Projects\app\a_shop\Lib\site-packages\celery\bootsteps.py", line 116, in start
step.start(parent)
File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\consumer\consumer.py", line 628, in start
c.loop(*c.loop_args())
File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\loops.py", line 130, in synloop
connection.drain_events(timeout=2.0)
File "E:\Projects\app\a_shop\Lib\site-packages\kombu\connection.py", line 341, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\kombu\transport\pyamqp.py", line 171, in drain_events
return connection.drain_events(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\connection.py", line 526, in drain_events
while not self.blocking_read(timeout):
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\connection.py", line 532, in blocking_read
return self.on_inbound_frame(frame)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\method_framing.py", line 53, in on_frame
callback(channel, method_sig, buf, None)
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\connection.py", line 538, in on_inbound_method
return self.channels[channel_id].dispatch_method(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\abstract_channel.py", line 156, in dispatch_method
listener(*args)
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\channel.py", line 293, in _on_close
raise error_for_code(
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Time
out value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more

где проблема и как я могу это исправить спасибо большое

Мое предположение заключается в том, что с вызовом базы данных и фиксацией в вашем случае ничего не происходит.

попробуйте использовать delay_on_commit

- order_created.delay(new_order.id)
+ order_created.delay_on_commit(new_order.id)

Для получения дополнительной информации об этом вы можете обратиться к документации

https://docs.celeryq.dev/en/latest/django/first-steps-with-django.html#using-the-shared-task-decorator

Вернуться на верх