Django Celery: Kombu connection refused error
It seems a bit odd, maybe I'm missing something, but whenever I send tasks to celery queue it suddenly gives error:
AttributeError: 'ChannelPromise' object has no attribute 'value'
It works at first but if tasks are sent to the queue at a slightly higher frequency, it suddenly starts to give the above error. Looks like some kind of blocked process or something like that. Broker is aws sqs
Celery settings:
CELERY_RESULT_BACKEND = 'django-db'
CELERY_BROKER_URL = 'sqs://'
CELERY_BROKER_TRANSPORT_OPTIONS = {
'region' : 'us-south-1', #temp name
'visibility-timeout' : 3600,
'polling-interval' : 10
}
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Kolkata'
Packages:
celery = ">=5.0.5"
django = "==4.2.16"
kombu = "==5.4.2"
django-celery-beat = ">=2.0.0"
django-celery-results = ">=2.2.0"
Files: project/init.py
from .celery import app as celery_app
__all__ = ['celery_app']
project/celery.py
import os
from celery import Celery
from project.settings import settings
# set the default Django settings module for the 'celery' program.
#settings are kept inside a separate folder for multiple envs [project/settings/settings_prod.py,project/settings/settings_stag.py,project/settings/settings.py]
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'project.settings.settings')
app = Celery('project')
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
@app.task(bind=True)
def debug_task(self):
pass
Complete error trace:
kombu.exceptions.OperationalError: [Errno 111] Connection refused on next try in production. complete error trace: Traceback (most recent call last):
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/core/handlers/exception.py", line 55, in inner
response = get_response(request)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/core/handlers/base.py", line 197, in _get_response
response = wrapped_callback(request, *callback_args, **callback_kwargs)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/django/views.py", line 94, in sentry_wrapped_callback
return callback(request, *args, **kwargs)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/views/decorators/csrf.py", line 56, in wrapper_view
return view_func(*args, **kwargs)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/django/views/generic/base.py", line 104, in view
return self.dispatch(request, *args, **kwargs)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 509, in dispatch
response = self.handle_exception(exc)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 469, in handle_exception
self.raise_uncaught_exception(exc)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 480, in raise_uncaught_exception
raise exc
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/rest_framework/views.py", line 506, in dispatch
response = handler(request, *args, **kwargs)
File "/var/app/current/appname/views/views_user.py", line 230, in patch
my_task.delay()
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/task.py", line 444, in delay
return self.apply_async(args, kwargs)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 290, in apply_async
return f(*args, **kwargs)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/task.py", line 594, in apply_async
return app.send_task(
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 290, in apply_async
return f(*args, **kwargs)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/base.py", line 801, in send_task
amqp.send_task_message(P, name, message, **options)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/celery/app/amqp.py", line 518, in send_task_message
ret = producer.publish(
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/utils.py", line 1788, in runner
return sentry_patched_function(*args, **kwargs)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/sentry_sdk/integrations/celery/__init__.py", line 527, in sentry_publish
return original_publish(self, *args, **kwargs)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 186, in publish
return _publish(
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 556, in _ensured
return fun(*args, **kwargs)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 195, in _publish
channel = self.channel
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 218, in _get_channel
channel = self._channel = channel()
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/utils/functional.py", line 34, in __call__
value = self.__value__ = self.__contract__()
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/messaging.py", line 234, in <lambda>
channel = ChannelPromise(lambda: connection.default_channel)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 953, in default_channel
self._ensure_connection(**conn_opts)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 459, in _ensure_connection
return retry_over_time(
File "/usr/lib64/python3.8/contextlib.py", line 131, in __exit__
self.gen.throw(type, value, traceback)
File "/var/app/venv/staging-LQM1lest/lib64/python3.8/site-packages/kombu/connection.py", line 476, in _reraise_as_library_errors
raise ConnectionError(str(exc)) from exc
kombu.exceptions.OperationalError: [Errno 111] Connection refused