Запуск задачи celery при тестировании (pytest) в Django
У меня есть три задачи Celery:
@celery_app.task
def load_rawdata_on_monday():
if not load_rawdata(): # run synchronously
notify_report_was_not_updated.delay()
@celery_app.task
def load_rawdata():
# load and process file from FTP
return False # some error happened
@celery_app.task
def notify_rawdata_was_not_updated():
pass # send email by Django
Мне нужно проверить, что письмо было отправлено, если load_rawdata задача (функция) возвращает False. Для этого я написал несколько тестов, которые не работают:
@override_settings(EMAIL_BACKEND='django.core.mail.backends.memcache.EmailBackend')
@override_settings(CELERY_ALWAYS_EAGER=False)
@patch('load_rawdata', MagicMock(return_value=False))
def test_load_rawdata_on_monday():
load_rawdata_on_monday()
assert len(mail.outbox) == 1, "Inbox is not empty"
assert mail.outbox[0].subject == 'Subject here'
assert mail.outbox[0].body == 'Here is the message.'
assert mail.outbox[0].from_email == 'from@example.com'
assert mail.outbox[0].to == ['to@example.com']
Кажется, что notify_rawdata_was_not_updated все еще выполняется асинхронно.
Как написать правильный тест?
Похоже, что могут происходить две вещи:
- Вы должны вызвать свою задачу с помощью
apply()метода для ее синхронного выполнения. -
CELERY_ALWAYS_EAGERsetting должен быть активным, чтобы последующие вызовы задачи также выполнялись.
@override_settings(EMAIL_BACKEND='django.core.mail.backends.memcache.EmailBackend')
@override_settings(CELERY_ALWAYS_EAGER=True)
@patch('load_rawdata', MagicMock(return_value=False))
def test_load_rawdata_on_monday():
load_rawdata_on_monday.apply()
assert len(mail.outbox) == 1, "Inbox is not empty"
assert mail.outbox[0].subject == 'Subject here'
assert mail.outbox[0].body == 'Here is the message.'
assert mail.outbox[0].from_email == 'from@example.com'
assert mail.outbox[0].to == ['to@example.com']
Хотя @tinom9 прав насчет использования метода apply(), проблема того, что notify_rawdata_was_not_updated все еще выполняется асинхронно, связана с определением вашей задачи:
@celery_app.task
def load_rawdata_on_monday():
if not load_rawdata():
notify_report_was_not_updated.delay() # delay is an async invocation
попробуйте это:
@celery_app.task
def load_rawdata_on_monday():
if not load_rawdata():
notify_report_was_not_updated.apply() # run on local thread
И для теста, вызов load_rawdata_on_monday() без .delay() или .apply() должен по-прежнему выполнять задачу локально и блокировать ее до тех пор, пока не вернется результат задачи. Просто убедитесь, что вы правильно обрабатываете возвращаемые значения, некоторые методы вызова celery, такие как apply(), возвращают экземпляр celery.result.EagerResult по сравнению с delay() или apply_async(), которые возвращают экземпляр celery.result.AsyncResult, что может не дать вам желаемого результата, если вы ожидаете False, когда вы проверяете if not load_rawdata().
или в любом другом месте, где вы пытаетесь получить возвращаемое значение функции, а не самой задачи.
Я должен был использовать CELERY_TASK_ALWAYS_EAGER вместо CELERY_ALWAYS_EAGER