Запуск задачи celery при тестировании (pytest) в Django

У меня есть три задачи Celery:

@celery_app.task
def load_rawdata_on_monday():
    if not load_rawdata():  # run synchronously
        notify_report_was_not_updated.delay()


@celery_app.task
def load_rawdata():
    # load and process file from FTP
    return False  # some error happened


@celery_app.task
def notify_rawdata_was_not_updated():
    pass  # send email by Django

Мне нужно проверить, что письмо было отправлено, если load_rawdata задача (функция) возвращает False. Для этого я написал несколько тестов, которые не работают:

@override_settings(EMAIL_BACKEND='django.core.mail.backends.memcache.EmailBackend')
@override_settings(CELERY_ALWAYS_EAGER=False)
@patch('load_rawdata', MagicMock(return_value=False))
def test_load_rawdata_on_monday():
    load_rawdata_on_monday()
    assert len(mail.outbox) == 1, "Inbox is not empty"
    assert mail.outbox[0].subject == 'Subject here'
    assert mail.outbox[0].body == 'Here is the message.'
    assert mail.outbox[0].from_email == 'from@example.com'
    assert mail.outbox[0].to == ['to@example.com']

Кажется, что notify_rawdata_was_not_updated все еще выполняется асинхронно. Как написать правильный тест?

Похоже, что могут происходить две вещи:

  • Вы должны вызвать свою задачу с помощью apply() метода для ее синхронного выполнения.
  • CELERY_ALWAYS_EAGER setting должен быть активным, чтобы последующие вызовы задачи также выполнялись.
@override_settings(EMAIL_BACKEND='django.core.mail.backends.memcache.EmailBackend')
@override_settings(CELERY_ALWAYS_EAGER=True)
@patch('load_rawdata', MagicMock(return_value=False))
def test_load_rawdata_on_monday():
    load_rawdata_on_monday.apply()
    assert len(mail.outbox) == 1, "Inbox is not empty"
    assert mail.outbox[0].subject == 'Subject here'
    assert mail.outbox[0].body == 'Here is the message.'
    assert mail.outbox[0].from_email == 'from@example.com'
    assert mail.outbox[0].to == ['to@example.com']

Хотя @tinom9 прав насчет использования метода apply(), проблема того, что notify_rawdata_was_not_updated все еще выполняется асинхронно, связана с определением вашей задачи:

@celery_app.task
def load_rawdata_on_monday():
    if not load_rawdata():  
        notify_report_was_not_updated.delay() # delay is an async invocation

попробуйте это:

@celery_app.task
def load_rawdata_on_monday():
    if not load_rawdata():  
        notify_report_was_not_updated.apply() # run on local thread

И для теста, вызов load_rawdata_on_monday() без .delay() или .apply() должен по-прежнему выполнять задачу локально и блокировать ее до тех пор, пока не вернется результат задачи. Просто убедитесь, что вы правильно обрабатываете возвращаемые значения, некоторые методы вызова celery, такие как apply(), возвращают экземпляр celery.result.EagerResult по сравнению с delay() или apply_async(), которые возвращают экземпляр celery.result.AsyncResult, что может не дать вам желаемого результата, если вы ожидаете False, когда вы проверяете if not load_rawdata(). или в любом другом месте, где вы пытаетесь получить возвращаемое значение функции, а не самой задачи.

Я должен был использовать CELERY_TASK_ALWAYS_EAGER вместо CELERY_ALWAYS_EAGER

Вернуться на верх