Запуск задачи celery при тестировании (pytest) в Django
У меня есть три задачи Celery:
@celery_app.task
def load_rawdata_on_monday():
if not load_rawdata(): # run synchronously
notify_report_was_not_updated.delay()
@celery_app.task
def load_rawdata():
# load and process file from FTP
return False # some error happened
@celery_app.task
def notify_rawdata_was_not_updated():
pass # send email by Django
Мне нужно проверить, что письмо было отправлено, если load_rawdata
задача (функция) возвращает False
. Для этого я написал несколько тестов, которые не работают:
@override_settings(EMAIL_BACKEND='django.core.mail.backends.memcache.EmailBackend')
@override_settings(CELERY_ALWAYS_EAGER=False)
@patch('load_rawdata', MagicMock(return_value=False))
def test_load_rawdata_on_monday():
load_rawdata_on_monday()
assert len(mail.outbox) == 1, "Inbox is not empty"
assert mail.outbox[0].subject == 'Subject here'
assert mail.outbox[0].body == 'Here is the message.'
assert mail.outbox[0].from_email == 'from@example.com'
assert mail.outbox[0].to == ['to@example.com']
Кажется, что notify_rawdata_was_not_updated
все еще выполняется асинхронно.
Как написать правильный тест?
Похоже, что могут происходить две вещи:
- Вы должны вызвать свою задачу с помощью
apply()
метода для ее синхронного выполнения. -
CELERY_ALWAYS_EAGER
setting должен быть активным, чтобы последующие вызовы задачи также выполнялись.
@override_settings(EMAIL_BACKEND='django.core.mail.backends.memcache.EmailBackend')
@override_settings(CELERY_ALWAYS_EAGER=True)
@patch('load_rawdata', MagicMock(return_value=False))
def test_load_rawdata_on_monday():
load_rawdata_on_monday.apply()
assert len(mail.outbox) == 1, "Inbox is not empty"
assert mail.outbox[0].subject == 'Subject here'
assert mail.outbox[0].body == 'Here is the message.'
assert mail.outbox[0].from_email == 'from@example.com'
assert mail.outbox[0].to == ['to@example.com']
Хотя @tinom9 прав насчет использования метода apply()
, проблема того, что notify_rawdata_was_not_updated
все еще выполняется асинхронно, связана с определением вашей задачи:
@celery_app.task
def load_rawdata_on_monday():
if not load_rawdata():
notify_report_was_not_updated.delay() # delay is an async invocation
попробуйте это:
@celery_app.task
def load_rawdata_on_monday():
if not load_rawdata():
notify_report_was_not_updated.apply() # run on local thread
И для теста, вызов load_rawdata_on_monday()
без .delay()
или .apply()
должен по-прежнему выполнять задачу локально и блокировать ее до тех пор, пока не вернется результат задачи. Просто убедитесь, что вы правильно обрабатываете возвращаемые значения, некоторые методы вызова celery, такие как apply()
, возвращают экземпляр celery.result.EagerResult по сравнению с delay()
или apply_async()
, которые возвращают экземпляр celery.result.AsyncResult, что может не дать вам желаемого результата, если вы ожидаете False
, когда вы проверяете if not load_rawdata()
.
или в любом другом месте, где вы пытаетесь получить возвращаемое значение функции, а не самой задачи.
Я должен был использовать CELERY_TASK_ALWAYS_EAGER
вместо CELERY_ALWAYS_EAGER