Celery Ограничение количества задач, выполняемых одним пользователем

У меня есть задача в Celery, которая выглядит следующим образом:

@app.task(name='task_one')
def task_one(user_id, *args, **kwargs):
    # Long running task

Эта задача создается в views каждый раз, когда пользователь отправляет форму, задача требует много ресурсов и в среднем занимает около 10 минут на выполнение.

(views.py)
...
if request.method == 'POST':
    task_one.delay(user.id)
...

Я хочу ограничить количество task_one задач, созданных для одного пользователя, до одной (активной или зарезервированной)

До сих пор я проверяю, есть ли активная или зарезервированная задача для этого пользователя, прежде чем создавать задачу:

def user_created_task(active_tasks, reserved_tasks, user_id):
  for task in list(active_tasks.values())[0] + list(reserved_tasks.values())[0]:
    if task['name'] == 'task_one' and task['args'][0] == user_id:
      # Check if there is a `task_one` task created for the user
      return True
  
  return False

def user_tasks_already_running_or_reserved(user_id):
  inspect = app.control.inspect()

  active_tasks = inspect.active()
  reserved_tasks = inspect.reserved()

  if active_tasks is None and reserved_tasks is None:
    # Celery workers are disconnected 
    return False

  return user_created_task(active_tasks, reserved_tasks, user_id)


(views.py)
...
if request.method == 'POST':
    if not user_tasks_already_running_or_reserved(user.id):
        task_one.delay(user.id)
...

Мне интересно, есть ли более эффективный способ сделать это, вместо того, чтобы проверять всех рабочих при каждом запросе пользователя, может есть способ добавить это условие в Celery перед выполнением задачи, пока что я не нашел ничего в документации.

Описанная вами ситуация требует использования распределенной блокировки (поскольку n = 1), но в более общем виде может быть описана как распределенный семафор. Грубо говоря, эти блокировки и механизмы выходят за рамки того, что встроено в celery.

Как отмечают комментаторы (подсказка: @bernhard vallant), прямая реализация распределенной блокировки обычно использует что-то вроде таблицы в базе данных или redis rlock / redlocks.

Для того чтобы использовать одну общую реализацию, вы можете сделать следующее:

from redlock import MultipleRedlockException, Redlock
from django.conf import settings

@app.task(name='task_one', autoretry_for=(MultipleRedlockException, ),  retry_kwargs={'max_retries': 5})
def task_one(user_id, *args, **kwargs):
    # assumes you are using redis for django cache with location
    # set to the redis url
    lock_manager = Redlock([ settings.CACHES['default']['LOCATION'] ])
    lock_name = f'task_one:{user_id}'
    # if the lock fails, we'll get the MultipleRedlockException and trigger
    # celery auto retry
    lock_manager.lock(lock_name, 60 * 60 * 2)  # lock for 2 hours
    try:
        # the main body of what you want to do goes here
        pass
    finally:
        lock_manager.unlock(lock_name)

Вернуться на верх