Celery worker stops consuming tasks after a few minutes following a failed sync

In my Django project I'm experiencing an issue with Celery using Redis as the message broker.

Current versions: 
celery==5.3.6
redis==5.0.8

The problem is best illustrated by this log:

2024-09-15 14:31:05  -------------- celery@5d4164b0472c v5.3.6 (emerald-rush)
2024-09-15 14:31:05 --- ***** ----- 
2024-09-15 14:31:05 -- ******* ---- Linux-6.10.0-linuxkit-aarch64-with-glibc2.31 2024-09-15 12:31:05
2024-09-15 14:31:05 - *** --- * --- 
2024-09-15 14:31:05 - ** ---------- [config]
2024-09-15 14:31:05 - ** ---------- .> app:         core:0xffff8a65b550
2024-09-15 14:31:05 - ** ---------- .> transport:   redis://:**@redis-12345.c123.eu-west-3-1.ec2.redns.redis-cloud.com:00000//
2024-09-15 14:31:05 - ** ---------- .> results:     redis://:**@redis-12345.c123.eu-west-3-1.ec2.redns.redis-cloud.com:00000/
2024-09-15 14:31:05 - *** --- * --- .> concurrency: 11 (solo)
2024-09-15 14:31:05 -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
2024-09-15 14:31:05 --- ***** ----- 
2024-09-15 14:31:05  -------------- [queues]
2024-09-15 14:31:05                 .> celery           exchange=celery(direct) key=celery
2024-09-15 14:31:05                 .> celery:1         exchange=celery:1(direct) key=celery:1
2024-09-15 14:31:05                 .> celery:2         exchange=celery:2(direct) key=celery:2
2024-09-15 14:31:05                 .> celery:3         exchange=celery:3(direct) key=celery:3
2024-09-15 14:31:05 
2024-09-15 14:31:05 [tasks]
2024-09-15 14:31:05   . apps.data_integration.tasks.celery_dataddo_process
2024-09-15 14:31:05   . apps.incrementality_experiment.tasks.geolift_market_match_task
2024-09-15 14:31:05 
2024-09-15 14:31:05 [2024-09-15 12:31:05,926: INFO/MainProcess] Connected to redis://:**@redis-12345.c123.eu-west-3-1.ec2.redns.redis-cloud.com:00000//
2024-09-15 14:31:06 [2024-09-15 12:31:06,739: INFO/MainProcess] celery@5d4164b0472c ready.
2024-09-15 14:38:16 [2024-09-15 12:38:16,078: INFO/MainProcess] sync with celery@03ffef68e38c
2024-09-15 14:38:16 [2024-09-15 12:38:16,621: ERROR/MainProcess] Control command error: ValueError('not enough values to unpack (expected 3, got 1)')
2024-09-15 14:38:16 Traceback (most recent call last):
2024-09-15 14:38:16   File "/home/pippo/.local/lib/python3.11/site-packages/celery/worker/pidbox.py", line 44, in on_message
2024-09-15 14:38:16     self.node.handle_message(body, message)
2024-09-15 14:38:16   File "/home/pippo/.local/lib/python3.11/site-packages/kombu/pidbox.py", line 143, in handle_message
2024-09-15 14:38:16     return self.dispatch(**body)
2024-09-15 14:38:16            ^^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16   File "/home/pippo/.local/lib/python3.11/site-packages/kombu/pidbox.py", line 110, in dispatch
2024-09-15 14:38:16     self.reply({self.hostname: reply},
2024-09-15 14:38:16   File "/home/pippo/.local/lib/python3.11/site-packages/kombu/pidbox.py", line 147, in reply
2024-09-15 14:38:16     self.mailbox._publish_reply(data, exchange, routing_key, ticket,
2024-09-15 14:38:16   File "/home/pippo/.local/lib/python3.11/site-packages/kombu/pidbox.py", line 277, in _publish_reply
2024-09-15 14:38:16     producer.publish(
2024-09-15 14:38:16   File "/home/pippo/.local/lib/python3.11/site-packages/kombu/messaging.py", line 186, in publish
2024-09-15 14:38:16     return _publish(
2024-09-15 14:38:16            ^^^^^^^^^
2024-09-15 14:38:16   File "/home/pippo/.local/lib/python3.11/site-packages/kombu/connection.py", line 556, in _ensured
2024-09-15 14:38:16     return fun(*args, **kwargs)
2024-09-15 14:38:16            ^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16   File "/home/pippo/.local/lib/python3.11/site-packages/kombu/messaging.py", line 208, in _publish
2024-09-15 14:38:16     return channel.basic_publish(
2024-09-15 14:38:16            ^^^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16   File "/home/pippo/.local/lib/python3.11/site-packages/kombu/transport/virtual/base.py", line 610, in basic_publish
2024-09-15 14:38:16     return self.typeof(exchange).deliver(
2024-09-15 14:38:16            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16   File "/home/pippo/.local/lib/python3.11/site-packages/kombu/transport/virtual/exchange.py", line 74, in deliver
2024-09-15 14:38:16     for queue in _lookup(exchange, routing_key):
2024-09-15 14:38:16                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16   File "/home/pippo/.local/lib/python3.11/site-packages/kombu/transport/virtual/base.py", line 721, in _lookup
2024-09-15 14:38:16     R = self.typeof(exchange).lookup(
2024-09-15 14:38:16         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16   File "/home/pippo/.local/lib/python3.11/site-packages/kombu/transport/virtual/exchange.py", line 66, in lookup
2024-09-15 14:38:16     return {
2024-09-15 14:38:16            ^
2024-09-15 14:38:16   File "/home/pippo/.local/lib/python3.11/site-packages/kombu/transport/virtual/exchange.py", line 67, in <setcomp>
2024-09-15 14:38:16     queue for rkey, _, queue in table
2024-09-15 14:38:16               ^^^^^^^^^^^^^^
2024-09-15 14:38:16 ValueError: not enough values to unpack (expected 3, got 1)

This happens both in Docker and on AWS when I deploy. Right after deploying or bringing the container up with compose, the worker processes any tasks I give it perfectly, but if I leave it idle for a couple of minutes it breaks as the log shows.

I'm currently using this configuration in docker-compose:

celery:
    <<: *app
    command: celery -A core worker --without-mingle --without-gossip --without-heartbeat --pool=solo -l info -Q celery,celery:1,celery:2,celery:3
    ports:
        - 5554:5554
    depends_on:
        - db
        - redis
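
For context, tasks are queued with an explicit priority so they end up on one of the extra queues the worker subscribes to. A minimal sketch of how they are dispatched (the argument is a placeholder, not the real task signature, and the queue mapping is my understanding of how sep and queue_order_strategy behave with the Redis transport):

from apps.data_integration.tasks import celery_dataddo_process

some_id = 42  # placeholder argument, not the real signature

# Default priority should stay on the plain 'celery' queue
celery_dataddo_process.delay(some_id)

# An explicit priority should be routed to e.g. 'celery:3'
celery_dataddo_process.apply_async(args=[some_id], priority=3)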

and this configuration in celery.py:

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')

# Initialize Celery application.
app = Celery('core')

# Load task-related settings from the Django project settings using the 'CELERY' namespace.
app.config_from_object('django.conf:settings', namespace='CELERY')

# Keep retrying the broker connection on startup (must be set explicitly from Celery 6.0 onwards).
app.conf.broker_connection_retry_on_startup = True

# Broker transport options for message priority and task routing strategy.
app.conf.broker_transport_options = {
    'priority_steps': list(range(10)),
    'sep': ':',
    'queue_order_strategy': 'priority',
}

# Limit task prefetching to one task at a time to balance worker load.
app.conf.worker_prefetch_multiplier = 1

app.conf.task_soft_time_limit = 1200  # 20 minutes
app.conf.task_time_limit = 1500       # 25 minutes, hard limit

app.conf.broker_heartbeat = 30  # Increase Redis heartbeat check interval
app.conf.broker_connection_timeout = 10  # Time in seconds to wait for connection
app.conf.broker_connection_retry_on_startup = True  # Ensure retries

app.conf.task_acks_late = True  # Acknowledge task only after completion
app.conf.task_reject_on_worker_lost = True  # Reject task if worker is lost
app.conf.task_default_retry_delay = 300  # Retry after 5 minutes
app.conf.task_max_retries = 5  # Max retries for failed tasks

# Automatically discover tasks from installed apps (tasks.py).
app.autodiscover_tasks()
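
For completeness, the CELERY-namespaced settings picked up by config_from_object are essentially just the broker and result backend, both pointing to the Redis Cloud instance shown in the log. A rough sketch (REDIS_URL is a placeholder for the redacted URL, not necessarily the real variable name):

# settings.py (relevant excerpt)
import os

CELERY_BROKER_URL = os.environ.get("REDIS_URL")      # redacted, as in the log above
CELERY_RESULT_BACKEND = os.environ.get("REDIS_URL")  # same Redis Cloud instance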