Django Celery: Kombu connection refused error

This seems a bit odd, and maybe I'm missing something, but when I send tasks to the Celery queue, it suddenly raises this error:

AttributeError: 'ChannelPromise' object has no attribute 'value'

It works at first, but when tasks are sent to the queue at a slightly higher frequency, it suddenly starts raising the error above. It looks like some kind of blocked process. The broker is AWS SQS.

Celery settings:

CELERY_RESULT_BACKEND           = 'django-db'
CELERY_BROKER_URL               = 'sqs://'
CELERY_BROKER_TRANSPORT_OPTIONS = {
    'region' : 'us-south-1', #temp name
    'visibility-timeout' : 3600,
    'polling-interval' : 10
}
CELERY_ACCEPT_CONTENT           = ['application/json']
CELERY_TASK_SERIALIZER          = 'json'
CELERY_RESULT_SERIALIZER        = 'json'
CELERY_TIMEZONE                 = 'Asia/Kolkata'
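
A side note on the transport options above: as far as I can tell from the Celery SQS documentation, the option names are spelled with underscores (visibility_timeout, polling_interval), so the hyphenated keys may simply never be read. This may be unrelated to the connection error. A minimal sketch of the underscore spelling follows; the normalize_transport_options helper is hypothetical and only for illustration.

```python
def normalize_transport_options(options):
    """Return a copy of the options with hyphens in key names replaced by underscores.

    Hypothetical helper: it only renames keys and does not validate them.
    """
    return {key.replace("-", "_"): value for key, value in options.items()}


# Same values as in the settings above, with the key names normalized.
CELERY_BROKER_TRANSPORT_OPTIONS = normalize_transport_options({
    "region": "us-south-1",       # temp name, as in the original settings
    "visibility-timeout": 3600,   # becomes visibility_timeout
    "polling-interval": 10,       # becomes polling_interval
})
```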

Packages:

celery = ">=5.0.5" 
django = "==4.2.16" 
kombu = "==5.4.2"
django-celery-beat = ">=2.0.0" 
django-celery-results = ">=2.2.0"

Files:

project/__init__.py

from .celery import app as celery_app

__all__ = ['celery_app']

project/celery.py

import os

from celery import Celery

from project.settings import settings

# Set the default Django settings module for the 'celery' program.
# Settings are kept in a separate folder for multiple environments:
# project/settings/settings_prod.py, project/settings/settings_stag.py, project/settings/settings.py
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings.settings')

app = Celery('project')

app.config_from_object('django.conf:settings', namespace='CELERY')

# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)


@app.task(bind=True)
def debug_task(self):
    pass

On the next try in production, it raises kombu.exceptions.OperationalError: [Errno 111] Connection refused.

Complete error trace:

Traceback (most recent call last):
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/core/handlers/exception.py", line 55, in inner
    response = get_response(request)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/core/handlers/base.py", line 197, in _get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/django/views.py", line 94, in sentry_wrapped_callback
    return callback(request, *args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/views/decorators/csrf.py", line 56, in wrapper_view
    return view_func(*args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/views/generic/base.py", line 104, in view
    return self.dispatch(request, *args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 509, in dispatch
    response = self.handle_exception(exc)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 469, in handle_exception
    self.raise_uncaught_exception(exc)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 480, in raise_uncaught_exception
    raise exc
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 506, in dispatch
    response = handler(request, *args, **kwargs)
  File "/var/app/current/appname/views/views_user.py", line 230, in patch
    my_task.delay()
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/task.py", line 444, in delay
    return self.apply_async(args, kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 290, in apply_async
    return f(*args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/task.py", line 594, in apply_async
    return app.send_task(
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 290, in apply_async
    return f(*args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/base.py", line 801, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/amqp.py", line 518, in send_task_message
    ret = producer.publish(
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/utils.py", line 1788, in runner
    return sentry_patched_function(*args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 527, in sentry_publish
    return original_publish(self, *args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 186, in publish
    return _publish(
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 556, in _ensured
    return fun(*args, **kwargs)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 195, in _publish
    channel = self.channel
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 218, in _get_channel
    channel = self._channel = channel()
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/utils/functional.py", line 34, in __call__
    value = self.__value__ = self.__contract__()
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 234, in <lambda>
    channel = ChannelPromise(lambda: connection.default_channel)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 953, in default_channel
    self._ensure_connection(**conn_opts)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 459, in _ensure_connection
    return retry_over_time(
  File "/usr/lib64/python3.8/contextlib.py", line 131, in __exit__
    self.gen.throw(type, value, traceback)
  File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 476, in _reraise_as_library_errors
    raise ConnectionError(str(exc)) from exc
kombu.exceptions.OperationalError: [Errno 111] Connection refused
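
A refused TCP connection (errno 111 on Linux) means nothing was listening at the address the client dialed. One thing worth ruling out, as an assumption rather than a diagnosis, is that the web process is not picking up CELERY_BROKER_URL and is falling back to Celery's default amqp://guest@localhost// broker. The sketch below uses only the standard library to check whether a given host and port accept connections from the production machine; the can_connect helper and the hosts in the note after it are hypothetical.

```python
import socket


def can_connect(host, port, timeout=3.0):
    """Return True if a TCP connection to (host, port) succeeds within the timeout."""
    try:
        # create_connection resolves the host and performs the TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

For example, can_connect("localhost", 5672) checks the default AMQP port, while an SQS endpoint would be reached over HTTPS on port 443. If the local AMQP port is the one being refused, that would suggest the SQS broker URL is not in effect in the process that calls my_task.delay().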