Как запустить задачу celery-beat под декоратором?
У меня есть декоратор "шкафчик":
def lock_task(func):
def wrapper(*args, **kwargs):
if redis.set(func.__name__, 'lock', nx=True):
try:
result = func(*args, **kwargs)
finally:
redis.delete(func.__name__)
return result or True
else:
return 'Skipped'
return wrapper
Также у меня есть celery-task с моим декоратором:
@celery_app.task
@lock_task
def test():
call_command('test')
И у меня есть мои настройки биения сельдерея:
celery_app.conf.beat_schedule = {
'test': {
'task': 'project.celery.test',
'schedule': crontab(minute='*/1')
}
}
После запуска я получил KeyError Received unregistered task of type 'project.celery.test'.
Как назвать эту конструкцию правильной?
Похоже, что функция wrapper
будет зарегистрирована в celery вместо фактической функции test
. Вы можете убедиться в этом, если увидите этот лог при запуске celery worker:
$ celery --app=tasks worker --loglevel=INFO
...
[tasks]
. project.celery.wrapper
Для регистрации имени фактической задачи используйте functools.wraps(), который документирован следующим образом:
Без использования этой фабрики декораторов имя функции примера было бы 'wrapper'
.
from functools import wraps
def lock_task(func):
@wraps(func)
def wrapper(*args, **kwargs):
...
Если ошибка сохраняется, убедитесь, что вы правильно настроили либо:
- Celery импортирует, например
celery_app.conf.update(imports=['project.celery'])
илиcelery_app.conf.imports = ['project.celery']
- Or Celery include (example) e.g.
celery_app = Celery(..., include=['project.celery'])
Для проверки вы должны увидеть свою задачу с именем project.celery.test
при запуске celery worker (подчеркиваю, worker, а не scheduler):
$ celery --app=tasks worker --loglevel=INFO
...
[tasks]
. project.celery.test
- Обратите внимание на последнюю строку. Она должна быть видна, если вы вызвали рабочего с флагом
--loglevel=INFO
. Если вы не видите там заданияtest
или видите вместо негоwrapper
, то вам могут помочь описанные выше шаги.
Проблема была решена путем добавления имени в декоратор, как
@celery_app.task(name='TASKNAME')
@lock_task
def test():
call_command('test')
и затем установите это имя на scheduler:
celery_app.conf.beat_schedule = {
'test': {
'task': 'TASKNAME',
'schedule': crontab(minute='*/1')
}
}
Это работает для меня.