Background daemon in celery without task explosions

I always have a hassle getting celery jobs to scan the database for work without things being done twice or submissions exploding. The issue is that:

  • periodic celery-beat jobs just keep submitting, even if many of them are already sitting in the queue, and there's no way to cap this, e.g. to have at most one of them queued or active at a time.
  • You can use celery inspect to test whether a job already exists, but those calls can easily block for too long; this seems to happen particularly when the workers are busy.
  • That in turn can cause a ton of inspect calls to stack up.

This can cause job explosions: beat keeps submitting while the worker is busy or down for a while, and once it comes back up, a gazillion of them push through at once.

What I'm looking for is a daemon-like process or thread that simply checks the database for things that need processing and waits if there is nothing to do (or polls every 30 seconds or so). If the workload is too high, it doesn't make sense to schedule the same thing a gazillion more times.
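Roughly what I have in mind, as a sketch (fetch_unprocessed_pdfs and create_pages are placeholders for my actual database query and processing):

import time

POLL_INTERVAL = 30  # seconds to wait when there is nothing to do

def pdf_daemon():
    # One long-lived loop instead of a queue: at any point in time there is
    # at most one scan and at most one job running, so nothing can pile up.
    while True:
        pdfs = fetch_unprocessed_pdfs()   # placeholder: query the database
        for pdf in pdfs:
            create_pages(pdf)             # placeholder: do the actual work
        if not pdfs:
            time.sleep(POLL_INTERVAL)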

Keeping track in the database of what has and hasn't been submitted is also a bit tricky, because a job might get killed (e.g. OOM) before it gets around to clearing the flag again.
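The best workaround I can think of for that is a claim timestamp with an expiry instead of a plain boolean flag; just a sketch, assuming a Django-style Pdf model with a hypothetical submitted_at field:

from datetime import datetime, timedelta

RESUBMIT_AFTER = timedelta(minutes=30)  # assumed window after which a claim counts as dead

def should_submit(pdf):
    # Never submitted, or claimed so long ago that the worker presumably
    # died (e.g. OOM-killed) before it could clear the flag.
    return pdf.submitted_at is None or datetime.utcnow() - pdf.submitted_at > RESUBMIT_AFTER

def mark_submitted(pdf):
    # Set the claim right before apply_async so the next scan skips this row.
    pdf.submitted_at = datetime.utcnow()
    pdf.save()

Even then it's only approximate: pick the window too short and jobs get double-submitted, too long and a killed job sits unprocessed until the claim expires.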

One could easily program something like that by hand, but it feels dirty to move away from a framework like celery for background processing. On the other hand, celery doesn't seem to give me a sane way to limit how often something gets submitted.

What is a robust way, using celery or not, to have something like that running in the background without explosions or spurious inspect blocks?

from celery import Celery
app = Celery(...)
app.autodiscover_tasks()


def not_already_enqueued(pdf_id):
    # celery inspect magic on registered/active/reserved tasks;
    # unfortunately these broadcast calls spuriously block a lot
    return True  # or False when a matching task is already queued/active

@app.task
def create_pages_for_pdf(pdf_id):
    # ...
    pass


@app.task
def create_pages_for_pdfs():
    # Queries the database for unprocessed jobs,
    # in my case these are PDFs not converted to
    # images for each page
    unprocessed_pdfs = [...]
    for pdf in unprocessed_pdfs:
        if not_already_enqueued(pdf.id):
            create_pages_for_pdf.apply_async([pdf.id])

    
@app.on_after_configure.connect
def setup_periodic_tasks(sender, **kwargs):
    sender.add_periodic_task(30.0, create_pages_for_pdfs.s(), name='creates pages for each unprocessed PDF')

Wrap the task in a Redis lock and skip it if the lock already exists. I have made a Celery task handler you can use for that:

import logging

from celery.exceptions import SoftTimeLimitExceeded, TimeLimitExceeded
from django.conf import settings


class CeleryTaskHandler:
    """
    -   Utility class for managing the execution and exception handling of Celery tasks.
    -   Contains a locking mechanism to prevent multiple instances of the same task from running concurrently.

    Args:
        task_name (str): The name of the task.
        lock_duration (int, optional): The duration of the lock in seconds. Defaults to 300.
    """

    def __init__(self, task_name, lock_duration=300):
        self.task_name = task_name
        self.lock_key = f"{self.task_name}_lock"
        self.lock_duration = lock_duration
        self.REDIS_CLIENT = settings.REDIS_CLIENT

    def acquire_lock(self):
        return self.REDIS_CLIENT.set(self.lock_key, "True", nx=True, ex=self.lock_duration)

    def release_lock(self):
        self.REDIS_CLIENT.delete(self.lock_key)

    def execute_task(self, task_func, *args, **kwargs):
        """
        Execute a task function while acquiring and releasing a lock to prevent concurrent execution.
        """
        if self.acquire_lock():
            try:
                task_func(*args, **kwargs)
            except SoftTimeLimitExceeded as e:
                logging.error(f"Notification Limit exceeded: {str(e)}")
            except TimeLimitExceeded as e:
                logging.error(f"Task shutdown hard time limit exceeded: {str(e)}")
            except Exception as e:
                logging.error(f"Unexpected error in {self.task_name}: {str(e)}", exc_info=True)
            finally:
                self.release_lock()
        else:
            logging.info(f"New task is ignored since a previous task is already in progress for {self.task_name}")