Background daemon in Celery without task explosions

I keep running into trouble getting Celery jobs to scan the database for work without processing things twice or causing submission explosions. The problems are:

  • periodic celery-beat jobs keep submitting even when many are already in the queue, and there's no way to cap this, e.g. to allow at most one of them queued or active at a time.
  • You can use celery inspect to test whether a job already exists, but those calls can easily block for too long, particularly (it seems) when the workers are busy.
  • That in turn can cause a pile-up of inspect calls.

This can cause job explosions, e.g. when beat keeps submitting while the worker is busy or down for a while; once it comes back up, a gazillion of them push through at once.

What I'm looking for is a daemon-like process or thread that simply checks the database for things that need processing, and that waits if there are none (or polls every 30 seconds or so). If the workload is high, it makes no sense to schedule the same thing a gazillion times.

Keeping track in the database of what has and hasn't been submitted is also tricky, because a job might get killed (e.g. by the OOM killer) before it gets a chance to clear its flag.

One could easily program something like that by hand, but it feels dirty to move away from a framework like Celery for background processing. On the other hand, Celery doesn't seem to offer a sane way to limit how often something gets submitted.

What is a robust way, using celery or not, to have something like that running in the background without explosions or spurious inspect blocks?

from celery import Celery
app = Celery(...)
app.autodiscover_tasks()


def not_already_enqueued(pdf_id):
    # celery inspect magic on registered/active/reserved;
    # unfortunately this spuriously blocks a lot
    return True  # or False, depending on what inspect reports

@app.task
def create_pages_for_pdf(pdf_id):
    # ...
    pass


@app.task
def create_pages_for_pdfs():
    # Queries the database for unprocessed jobs,
    # in my case these are PDFs not converted to
    # images for each page
    unprocessed_pdfs = [...]
    for pdf in unprocessed_pdfs:
        if not_already_enqueued(pdf.id):
            create_pages_for_pdf.apply_async([pdf.id])

    
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(30.0, create_pages_for_pdfs.s(), name='creates pages for each unprocessed PDF')

Wrap the task in a Redis lock and skip the task if the lock already exists. I have made a Celery handler you can use for that:

import logging

from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
from django.conf import settings


class CeleryTaskHandler:
    """
    -   Utility class for managing the execution and exception handling of Celery tasks.
    -   Contains a locking mechanism to prevent multiple instances of the same task from running concurrently.

    Args:
        task_name (str): The name of the task.
        lock_duration (int, optional): The duration of the lock in seconds. Defaults to 300.
    """

    def __init__(self, task_name, lock_duration=300):
        self.task_name = task_name
        self.lock_key = f"{self.task_name}_lock"
        self.lock_duration = lock_duration
        self.REDIS_CLIENT = settings.REDIS_CLIENT

    def acquire_lock(self):
        return self.REDIS_CLIENT.set(self.lock_key, "True", nx=True, ex=self.lock_duration)

    def release_lock(self):
        self.REDIS_CLIENT.delete(self.lock_key)

    def execute_task(self, task_func, *args, **kwargs):
        """
        Execute a task function while acquiring and releasing a lock to prevent concurrent execution.
        """
        if self.acquire_lock():
            try:
                task_func(*args, **kwargs)
            except SoftTimeLimitExceeded as e:
                logging.error(f"Notification Limit exceeded: {str(e)}")
            except TimeLimitExceeded as e:
                logging.error(f"Task shutdown hard time limit exceeded: {str(e)}")
            except Exception as e:
                logging.error(f"Unexpected error in {self.task_name}: {str(e)}", exc_info=True)
            finally:
                self.release_lock()
        else:
            logging.info(f"New task is ignored since a previous task is already in progress for {self.task_name}")

Inspired by and similar to @shaf-shafiq's answer, I created a decorator.

I prefer this, because

  • it's simpler to wrap a function with run_once_or_cancel('bla {0["id"]}', format=True), tying the lock to the id of the first argument
  • like their solution, this is independent of what Celery is doing: all tasks are still started, but they bail out immediately if the lock is already held. Written as a decorator, that orthogonality is clearer.

But I might have misunderstood.

import logging
from functools import wraps

from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
from django.conf import settings

from redis import Redis

logger = logging.getLogger("tasklocks")

def get_redis_client():
    logger.info('Setting up client')
    return Redis(**settings.TASK_LOCK_REDIS_CLIENT)


def run_once_or_cancel(lock_name, format=False, redis_client=None, lock_duration=5):
    """Annotation for a function, connects to reddis and only executes if not already executing somethign with lock_name.
    `format` uses `lockname.format(*args, **kwargs)` as a lock_name, e.g. allowing for lock names with the ID of the
    object(s) the function processes in it. Still figuring out what `lock_duration` does, guess it allows re-execution
    after that time in any case"""
    if redis_client is None:
        redis_client = get_redis_client()

    def acquire_lock(lock_name_):
        logger.debug(f"Acquire task lock {lock_name_}")
        return redis_client.set(lock_name_, "True", nx=True, ex=lock_duration)

    def release_lock(lock_name_):
        logger.debug(f"Release task lock {lock_name_}")
        redis_client.delete(lock_name_)

    def f(task_func):
        @wraps(task_func)
        def g(*args, **kwargs):
            lock_name_ = lock_name.format(*args, **kwargs) if format else lock_name
            if acquire_lock(lock_name_):
                try:
                    task_func(*args, **kwargs)
                except SoftTimeLimitExceeded as e:
                    logger.error(f"Notification Limit exceeded: {str(e)}")
                except TimeLimitExceeded as e:
                    logger.error(f"Task shutdown hard time limit exceeded: {str(e)}")
                except Exception as e:
                    logger.error(f"Unexpected error in `{lock_name}`: {str(e)}", exc_info=True)
                finally:
                    release_lock(lock_name_)
            else:
                logger.info(f"New task `{lock_name}` is ignored since a previous task is still in progress")        
        return g
    return f

I've also configured a separate Redis database for these locks, to be sure not to mingle with the ones Celery uses (settings.py). In my case there are three: database 0 (websockets), 1 (Celery) and 2 (the locks).

CELERY_BROKER_URL = (
    'rediss://localhost:16379/1?'
    'ssl_cert_reqs=CERT_REQUIRED'
    '&ssl_ca_certs=docker/secrets/certificates/redis/ca.crt'
    '&ssl_keyfile=docker/secrets/certificates/redis/client.key'
    '&ssl_certfile=docker/secrets/certificates/redis/client.crt'
)

TASK_LOCK_REDIS_CLIENT = dict(
    host="localhost", port=16379, ssl=True, db=2,
    ssl_certfile='docker/secrets/certificates/redis/client.crt',
    ssl_keyfile='docker/secrets/certificates/redis/client.key',
    ssl_cert_reqs="required",
    ssl_ca_certs='docker/secrets/certificates/redis/ca.crt',
)


CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [{
                'address': 'rediss://localhost:16379/0',
                'ssl_certfile': 'docker/secrets/certificates/redis/client.crt',
                'ssl_keyfile': 'docker/secrets/certificates/redis/client.key',
                'ssl_cert_reqs': "required",
                'ssl_ca_certs': 'docker/secrets/certificates/redis/ca.crt',
            }],
            'capacity': 1500,
        },
    },
}

Then I have two functions, one of them a Celery task and the other just a function that might be called from several places.

This way, the search for unprocessed items runs only once at a time, and each individual item is only processed once at a time.

@app.task
@run_once_or_cancel('process_items', lock_duration=300)
def process_items():
    # ....
    for item in unprocessed:
        process_item(item)

@app.task
@run_once_or_cancel('process_item({0.id})', format=True, lock_duration=300)
def process_item(item):
    # ....


@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(
        10.0, process_items.s(),
        name='process unprocessed items',
    )  # fires every 10 seconds; extra runs are skipped while a previous one is still busy

In both answers, you're effectively delegating the "inspection" logic to Redis locks. Such a lock might not be reliable, especially in a prefork context.
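One concrete failure mode of the plain set/delete lock used in both answers: if the task runs longer than lock_duration, the key expires, a second worker acquires it, and the first worker's finally block then deletes the second worker's lock. A common mitigation (my sketch, not part of either answer) is to store a random token and only delete the key if it still holds that token:

import uuid

def acquire_lock(redis_client, lock_name, lock_duration):
    # Store a unique token so we can tell our lock apart from one
    # acquired later (after expiry) by another worker.
    token = str(uuid.uuid4())
    if redis_client.set(lock_name, token, nx=True, ex=lock_duration):
        return token
    return None

# Compare-and-delete in a Lua script, so the check and the delete are atomic.
RELEASE_SCRIPT = """
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""

def release_lock(redis_client, lock_name, token):
    redis_client.eval(RELEASE_SCRIPT, 1, lock_name, token)

As far as I know, redis-py also ships a Lock helper (client.lock(...)) that does this token bookkeeping for you.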

From what I understand, your task is not that "periodic" since the context of execution is heavily dependent on system state.

What I'm looking for is a daemon-like process or thread that simply checks the database for things that need processing, and that waits if there are none (or polls every 30 seconds or so). If the workload is high, it makes no sense to schedule the same thing a gazillion times.

You have described the real solution here. You need a sidecar that does this work outside of Celery; that way you control everything. With a sidecar you can even keep the "check" interval low so that little work builds up between polls, something that isn't effective with Celery, since each check would spawn another task and take up queue space.
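As a rough sketch of such a sidecar (find_unprocessed_pdfs is a placeholder for whatever DB query finds pending work; create_pages_for_pdf is the task from the question):

import logging
import time

logger = logging.getLogger("pdf_sidecar")

def run_sidecar(poll_interval=30):
    # A single standalone process (systemd unit, Docker service, ...),
    # not a Celery worker. One loop means the check itself never piles up.
    while True:
        try:
            for pdf in find_unprocessed_pdfs():  # placeholder DB query
                # Hand each item to Celery (or process it inline);
                # only items that actually exist get submitted.
                create_pages_for_pdf.apply_async([pdf.id])
        except Exception:
            logger.exception("sidecar iteration failed")
        time.sleep(poll_interval)

if __name__ == "__main__":
    run_sidecar()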

but it feels dirty to move away from a framework like Celery for background processing

Celery is not supposed to run background processes indefinitely. In fact, the more atomic, idempotent and small the task, the better.
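For instance, making the per-PDF task from the question idempotent (a hypothetical sketch; Pdf and render_pages are placeholders) means a duplicate submission only wastes a queue slot instead of redoing work:

@app.task(acks_late=True)
def create_pages_for_pdf(pdf_id):
    # acks_late: if the worker dies mid-task (e.g. OOM), the message is
    # redelivered, and the idempotence check below makes the retry harmless.
    pdf = Pdf.objects.get(pk=pdf_id)      # placeholder Django model
    if pdf.pages.exists():                # pages already created: nothing to do
        return
    render_pages(pdf)                     # placeholder for the actual conversion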

I might be able to give more concrete recommendations if you could share what type of tasks you are running and how many of them there are.
