Как запустить задачу celery-beat под декоратором?

У меня есть декоратор "шкафчик":

def lock_task(func):
    def wrapper(*args, **kwargs):
        if redis.set(func.__name__, 'lock', nx=True):
            try:
                result = func(*args, **kwargs)
            finally:
                redis.delete(func.__name__)
            return result or True
        else:
            return 'Skipped'
    return wrapper

Также у меня есть celery-task с моим декоратором:

@celery_app.task
@lock_task
def test():
    call_command('test')

И у меня есть мои настройки биения сельдерея:

celery_app.conf.beat_schedule = {
    'test': {
        'task': 'project.celery.test',
        'schedule': crontab(minute='*/1')
    }
}

После запуска я получил KeyError Received unregistered task of type 'project.celery.test'.

Как назвать эту конструкцию правильной?

Похоже, что функция wrapper будет зарегистрирована в celery вместо фактической функции test. Вы можете убедиться в этом, если увидите этот лог при запуске celery worker:

$ celery --app=tasks worker --loglevel=INFO
...
[tasks]
  . project.celery.wrapper

Для регистрации имени фактической задачи используйте functools.wraps(), который документирован следующим образом:

Без использования этой фабрики декораторов имя функции примера было бы 'wrapper'

.
from functools import wraps

def lock_task(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
    ...

Если ошибка сохраняется, убедитесь, что вы правильно настроили либо:

  • Celery импортирует, например celery_app.conf.update(imports=['project.celery']) или celery_app.conf.imports = ['project.celery']
  • Or Celery include (example) e.g. celery_app = Celery(..., include=['project.celery'])

Для проверки вы должны увидеть свою задачу с именем project.celery.test при запуске celery worker (подчеркиваю, worker, а не scheduler):

$ celery --app=tasks worker --loglevel=INFO
...
[tasks]
  . project.celery.test
  • Обратите внимание на последнюю строку. Она должна быть видна, если вы вызвали рабочего с флагом --loglevel=INFO. Если вы не видите там задания test или видите вместо него wrapper, то вам могут помочь описанные выше шаги.

Проблема была решена путем добавления имени в декоратор, как

@celery_app.task(name='TASKNAME')
@lock_task
def test():
    call_command('test')

и затем установите это имя на scheduler:

celery_app.conf.beat_schedule = {
    'test': {
        'task': 'TASKNAME',
        'schedule': crontab(minute='*/1')
    }
}

Это работает для меня.

Вернуться на верх