Celery не повторяет попытку при возникновении исключения при тестировании в Django
У меня есть основанная на классе задача celery, которую я хотел бы протестировать, вызвав ошибку и затем подтвердив, что повторная попытка была вызвана.
Во время тестирования я хочу вызвать ошибку ConnectionError при вызове msg.send() и затем подтвердить, что она "повторно" выполняется.
Задача по существу конструирует электронное письмо, используя django.core.mail.EmailMultiAlternatives (который наследует EmailMessage), а затем пытается отправить это письмо.
Задача хорошо работает на практике, с повторным вызовом, если соединение не существует, моя единственная проблема - как ее протестировать.
На данный момент мне удалось поднять ConnectionError путем исправления EmailMultiAlternatives, однако тест не вызывает повторной попытки в терминале celery
tasks.py
(упрощенно)
class SendMail(Task):
max_retries = 4
base = 'BaseTask'
def run(self, **kwargs):
msg = EmailMultiAlternatives(kwargs.get('some_value'), etc.)
msg.attach_alternative(rendered_html)
try:
msg.send(fail_silently=False)
except ConnectionError as exc:
self.retry(countdown=backoff(self.request.retries), exc=exc)
test_task.py
(упрощенно)
class SendMailTest(TestCase):
def setUp(self):
self.message = {some data}
def test_error(self):
with patch.object(EmailMultiAlternatives, 'send', return_value=None) as mock_method:
mock_method.side_effect = Exception(ConnectionError)
SendMail(kwargs=self.message)
Мой вопрос заключается в том, почему, когда выполняется тестовый сценарий и явно возникает исключение, Celery не пытается повторить попытку?