Проблема в приложении django для использования rabbitmqp в качестве брокера и celery для выполнения простой задачи
Здравствуйте, я пытаюсь узнать, как настроить приложение django для работы с сельдереем. Я запускаю rabbitmq на docker desktop на машине windows с помощью этой команды :
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq
затем я запускаю celery с помощью этой команды и контролирую его с помощью flower :
celery -A a_shop worker -l Info
celery -A a_shop flower
в главном файле init приложения django у меня есть следующее :
from .celery import app as celery_app
__all__ = ('celery_app',)
и для настройки сельдерея в файле setting.py у меня есть такие настройки :
CELERY_BROKER_URL = 'amqp://guest:guest@localhost:5672//'
CELERY_RESULT_BACKEND = 'rpc://'
CELERY_TASK_ACKS_LATE = True
CELERY_WORKER_PREFETCH_MULTIPLIER = 1
CELERY_TASK_SOFT_TIME_LIMIT = 600 # 10 minutes
CELERY_TASK_TIME_LIMIT = 1200
CELERY_IMPORTS = ('orders.task',)
CELERY_TASK_DEFAULT_QUEUE = 'celery'
я создал файл celery.py в основном приложении с такими кодами :
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'a_shop.settings')
app = Celery('a_shop')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
print('Request: {self.request!r}')
я создаю простую задачу для тестирования :
import logging
from celery import shared_task
from .models import Order
logger = logging.getLogger(__name__)
@shared_task
def order_created(order_id):
logger.info(f"Task order_created started with order_id: {order_id}")
try:
order = Order.objects.get(id=order_id)
logger.info(f"Processing order: {order.name}")
print(order_id)
except Exception as e:
logger.error(f"Error processing order {order_id}: {e}")
raise
logger.info(f"Task order_created completed with order_id: {order_id}")
return order_id
я решаю задачу в одном из своих представлений с помощью :
order_created.delay(new_order.id)
я вижу задачу resived в консоли celery :
[2024-06-20 15:59:43,220: INFO/MainProcess] Task orders.task.order_created[dbbcafd1-55f8-4e86-ae48-1c68fda5c75f] received
[2024-06-20 15:59:44,289: INFO/SpawnPoolWorker-9] child process 10608 calling self.run()
[2024-06-20 15:59:44,291: INFO/SpawnPoolWorker-10] child process 744 calling self.run()
но через некоторое время я получаю эту ошибку на консоли :
[2024-06-20 05:01:48,262: CRITICAL/MainProcess] Unrecoverable error: PreconditionFailed(406, 'PRECONDITION_FAILED - delivery
acknowledgement on channel 1 timed out. Timeout value used: 1800000 ms. This timeout value can be configured, see consumers d
oc guide to learn more', (0, 0), '')
Traceback (most recent call last):
File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\worker.py", line 203, in start
self.blueprint.start(self)
File "E:\Projects\app\a_shop\Lib\site-packages\celery\bootsteps.py", line 116, in start
step.start(parent)
File "E:\Projects\app\a_shop\Lib\site-packages\celery\bootsteps.py", line 365, in start
return self.obj.start()
^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\consumer\consumer.py", line 332, in start
blueprint.start(self)
File "E:\Projects\app\a_shop\Lib\site-packages\celery\bootsteps.py", line 116, in start
step.start(parent)
File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\consumer\consumer.py", line 628, in start
c.loop(*c.loop_args())
File "E:\Projects\app\a_shop\Lib\site-packages\celery\worker\loops.py", line 130, in synloop
connection.drain_events(timeout=2.0)
File "E:\Projects\app\a_shop\Lib\site-packages\kombu\connection.py", line 341, in drain_events
return self.transport.drain_events(self.connection, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\kombu\transport\pyamqp.py", line 171, in drain_events
return connection.drain_events(**kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\connection.py", line 526, in drain_events
while not self.blocking_read(timeout):
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\connection.py", line 532, in blocking_read
return self.on_inbound_frame(frame)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\method_framing.py", line 53, in on_frame
callback(channel, method_sig, buf, None)
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\connection.py", line 538, in on_inbound_method
return self.channels[channel_id].dispatch_method(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\abstract_channel.py", line 156, in dispatch_method
listener(*args)
File "E:\Projects\app\a_shop\Lib\site-packages\amqp\channel.py", line 293, in _on_close
raise error_for_code(
amqp.exceptions.PreconditionFailed: (0, 0): (406) PRECONDITION_FAILED - delivery acknowledgement on channel 1 timed out. Time
out value used: 1800000 ms. This timeout value can be configured, see consumers doc guide to learn more
где проблема и как я могу это исправить спасибо большое
Мое предположение заключается в том, что с вызовом базы данных и фиксацией в вашем случае ничего не происходит.
попробуйте использовать delay_on_commit
- order_created.delay(new_order.id)
+ order_created.delay_on_commit(new_order.id)
Для получения дополнительной информации об этом вы можете обратиться к документации