Celery worker stops consuming tasks a few minutes after startup, following a failed sync
In my Django project I'm experiencing an issue with Celery using Redis as the message broker.
Current versions:
celery==5.3.6
redis==5.0.8
The problem can be described with this log:
2024-09-15 14:31:05 -------------- celery@5d4164b0472c v5.3.6 (emerald-rush)
2024-09-15 14:31:05 --- ***** -----
2024-09-15 14:31:05 -- ******* ---- Linux-6.10.0-linuxkit-aarch64-with-glibc2.31 2024-09-15 12:31:05
2024-09-15 14:31:05 - *** --- * ---
2024-09-15 14:31:05 - ** ---------- [config]
2024-09-15 14:31:05 - ** ---------- .> app: core:0xffff8a65b550
2024-09-15 14:31:05 - ** ---------- .> transport: redis://:**@redis-12345.c123.eu-west-3-1.ec2.redns.redis-cloud.com:00000//
2024-09-15 14:31:05 - ** ---------- .> results: redis://:**@redis-12345.c123.eu-west-3-1.ec2.redns.redis-cloud.com:00000/
2024-09-15 14:31:05 - *** --- * --- .> concurrency: 11 (solo)
2024-09-15 14:31:05 -- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
2024-09-15 14:31:05 --- ***** -----
2024-09-15 14:31:05 -------------- [queues]
2024-09-15 14:31:05 .> celery exchange=celery(direct) key=celery
2024-09-15 14:31:05 .> celery:1 exchange=celery:1(direct) key=celery:1
2024-09-15 14:31:05 .> celery:2 exchange=celery:2(direct) key=celery:2
2024-09-15 14:31:05 .> celery:3 exchange=celery:3(direct) key=celery:3
2024-09-15 14:31:05
2024-09-15 14:31:05 [tasks]
2024-09-15 14:31:05 . apps.data_integration.tasks.celery_dataddo_process
2024-09-15 14:31:05 . apps.incrementality_experiment.tasks.geolift_market_match_task
2024-09-15 14:31:05
2024-09-15 14:31:05 [2024-09-15 12:31:05,926: INFO/MainProcess] Connected to redis://:**@redis-12345.c123.eu-west-3-1.ec2.redns.redis-cloud.com:00000//
2024-09-15 14:31:06 [2024-09-15 12:31:06,739: INFO/MainProcess] celery@5d4164b0472c ready.
2024-09-15 14:38:16 [2024-09-15 12:38:16,078: INFO/MainProcess] sync with celery@03ffef68e38c
2024-09-15 14:38:16 [2024-09-15 12:38:16,621: ERROR/MainProcess] Control command error: ValueError('not enough values to unpack (expected 3, got 1)')
2024-09-15 14:38:16 Traceback (most recent call last):
2024-09-15 14:38:16 File "/home/pippo/.local/lib/python3.11/site-packages/celery/worker/pidbox.py", line 44, in on_message
2024-09-15 14:38:16 self.node.handle_message(body, message)
2024-09-15 14:38:16 File "/home/pippo/.local/lib/python3.11/site-packages/kombu/pidbox.py", line 143, in handle_message
2024-09-15 14:38:16 return self.dispatch(**body)
2024-09-15 14:38:16 ^^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16 File "/home/pippo/.local/lib/python3.11/site-packages/kombu/pidbox.py", line 110, in dispatch
2024-09-15 14:38:16 self.reply({self.hostname: reply},
2024-09-15 14:38:16 File "/home/pippo/.local/lib/python3.11/site-packages/kombu/pidbox.py", line 147, in reply
2024-09-15 14:38:16 self.mailbox._publish_reply(data, exchange, routing_key, ticket,
2024-09-15 14:38:16 File "/home/pippo/.local/lib/python3.11/site-packages/kombu/pidbox.py", line 277, in _publish_reply
2024-09-15 14:38:16 producer.publish(
2024-09-15 14:38:16 File "/home/pippo/.local/lib/python3.11/site-packages/kombu/messaging.py", line 186, in publish
2024-09-15 14:38:16 return _publish(
2024-09-15 14:38:16 ^^^^^^^^^
2024-09-15 14:38:16 File "/home/pippo/.local/lib/python3.11/site-packages/kombu/connection.py", line 556, in _ensured
2024-09-15 14:38:16 return fun(*args, **kwargs)
2024-09-15 14:38:16 ^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16 File "/home/pippo/.local/lib/python3.11/site-packages/kombu/messaging.py", line 208, in _publish
2024-09-15 14:38:16 return channel.basic_publish(
2024-09-15 14:38:16 ^^^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16 File "/home/pippo/.local/lib/python3.11/site-packages/kombu/transport/virtual/base.py", line 610, in basic_publish
2024-09-15 14:38:16 return self.typeof(exchange).deliver(
2024-09-15 14:38:16 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16 File "/home/pippo/.local/lib/python3.11/site-packages/kombu/transport/virtual/exchange.py", line 74, in deliver
2024-09-15 14:38:16 for queue in _lookup(exchange, routing_key):
2024-09-15 14:38:16 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16 File "/home/pippo/.local/lib/python3.11/site-packages/kombu/transport/virtual/base.py", line 721, in _lookup
2024-09-15 14:38:16 R = self.typeof(exchange).lookup(
2024-09-15 14:38:16 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2024-09-15 14:38:16 File "/home/pippo/.local/lib/python3.11/site-packages/kombu/transport/virtual/exchange.py", line 66, in lookup
2024-09-15 14:38:16 return {
2024-09-15 14:38:16 ^
2024-09-15 14:38:16 File "/home/pippo/.local/lib/python3.11/site-packages/kombu/transport/virtual/exchange.py", line 67, in <setcomp>
2024-09-15 14:38:16 queue for rkey, _, queue in table
2024-09-15 14:38:16 ^^^^^^^^^^^^^^
2024-09-15 14:38:16 ValueError: not enough values to unpack (expected 3, got 1)
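To narrow this down, here is a minimal sketch that reproduces the same exception outside Celery. It only mirrors the unpacking shape shown in the last traceback frame (`queue for rkey, _, queue in table`). The helper `split_binding`, the sample binding string, and the separator values are hypothetical; I am assuming, without having verified it in kombu's source, that each row of the table comes from splitting a stored string on a separator.

```python
def lookup(table):
    """Collect queue names using the same 3-way unpacking as in the traceback."""
    return {queue for rkey, _, queue in table}


def split_binding(raw, sep):
    """Split a stored binding string into its fields (hypothetical helper)."""
    return tuple(raw.split(sep))


# Hypothetical binding written with one separator and read back with another.
written_sep = "\x06\x16"  # arbitrary value, chosen only for illustration
raw = written_sep.join(["celery", "", "celery"])

ok_row = split_binding(raw, written_sep)  # three fields
bad_row = split_binding(raw, ":")         # one field: ":" never occurs in raw

print(lookup([ok_row]))
try:
    lookup([bad_row])
except ValueError as exc:
    # Same kind of failure as in the worker log above.
    print(exc)
```

If this is what happens in my setup, a row with a single field would mean the stored entry does not contain the separator the worker is splitting on, but that is only a guess on my part.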
This happens both in Docker and on AWS when I deploy. Right after a deployment or after bringing the container up with Compose, the worker processes any tasks I send it without problems, but if I leave it idle for a few minutes (about seven in the log above) it breaks as shown.
I am currently using this configuration in docker-compose:
celery:
  <<: *app
  command: celery -A core worker --without-mingle --without-gossip --without-heartbeat --pool=solo -l info -Q celery,celery:1,celery:2,celery:3
  ports:
    - 5554:5554
  depends_on:
    - db
    - redis
and this configuration in celery.py:
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'core.settings')
# Initialize Celery application.
app = Celery('core')
# Load task-related settings from the Django project settings using the 'CELERY' namespace.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Ensure broker connection retries on startup for Celery 6.0 and above.
app.conf.broker_connection_retry_on_startup = True
# Broker transport options for message priority and task routing strategy.
app.conf.broker_transport_options = {
    'priority_steps': list(range(10)),
    'sep': ':',
    'queue_order_strategy': 'priority',
}
# Limit task prefetching to one task at a time to balance worker load.
app.conf.worker_prefetch_multiplier = 1
app.conf.task_soft_time_limit = 1200 # 20 minutes
app.conf.task_time_limit = 1500 # 25 minutes, hard limit
app.conf.broker_heartbeat = 30 # Increase Redis heartbeat check interval
app.conf.broker_connection_timeout = 10 # Time in seconds to wait for connection
app.conf.broker_connection_retry_on_startup = True # Ensure retries
app.conf.task_acks_late = True # Acknowledge task only after completion
app.conf.task_reject_on_worker_lost = True # Reject task if worker is lost
app.conf.task_default_retry_delay = 300 # Retry after 5 minutes
app.conf.task_max_retries = 5 # Max retries for failed tasks
# Automatically discover tasks from installed apps (tasks.py).
app.autodiscover_tasks()
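For context on the `-Q celery,celery:1,celery:2,celery:3` argument: as I understand the Redis priority section of the Celery docs, the `sep` and `priority_steps` options above produce one Redis list per priority step, named by appending the separator and the step to the queue name. The sketch below is my own illustration of that naming scheme; `priority_queue_names` is a hypothetical helper, not a Celery or kombu function.

```python
def priority_queue_names(queue, steps, sep):
    """Return the per-priority queue names for one logical queue.

    Assumption: step 0 maps to the bare queue name and every other step
    maps to "<queue><sep><step>".
    """
    return [queue if step == 0 else f"{queue}{sep}{step}" for step in steps]


names = priority_queue_names("celery", list(range(10)), ":")
print(names[:4])
```

The first four names match the entries listed under `[queues]` in the worker banner.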