Celery не повторяет попытку при возникновении исключения при тестировании в Django

У меня есть основанная на классе задача celery, которую я хотел бы протестировать, вызвав ошибку и затем подтвердив, что повторная попытка была вызвана.

Во время тестирования я хочу вызвать ошибку ConnectionError при вызове msg.send() и затем подтвердить, что она "повторно" выполняется.

Задача по существу конструирует электронное письмо, используя django.core.mail.EmailMultiAlternatives (который наследует EmailMessage), а затем пытается отправить это письмо.

Задача хорошо работает на практике, с повторным вызовом, если соединение не существует, моя единственная проблема - как ее протестировать.

На данный момент мне удалось поднять ConnectionError путем исправления EmailMultiAlternatives, однако тест не вызывает повторной попытки в терминале celery

tasks.py

(упрощенно)

class SendMail(Task):
    max_retries = 4
    base = 'BaseTask' 

    def run(self, **kwargs):
        msg = EmailMultiAlternatives(kwargs.get('some_value'), etc.)
        msg.attach_alternative(rendered_html)
        try:
            msg.send(fail_silently=False)
        except ConnectionError as exc:
            self.retry(countdown=backoff(self.request.retries), exc=exc)

test_task.py

(упрощенно)

class SendMailTest(TestCase):

    def setUp(self):
        self.message = {some data}


    def test_error(self):
        with patch.object(EmailMultiAlternatives, 'send', return_value=None) as mock_method:
        mock_method.side_effect = Exception(ConnectionError)
        SendMail(kwargs=self.message)

Мой вопрос заключается в том, почему, когда выполняется тестовый сценарий и явно возникает исключение, Celery не пытается повторить попытку?

Вернуться на верх