Тестирование Django с помощью Celery и как правильно поднять исключение в задаче celery и утвердить повторную попытку
У меня есть задача Celery в проекте Django, которая отправляет электронное письмо, используя EmailMultiAlternatives от Django.
Я хочу поднять ConnectionError, чтобы вызвать повторную попытку.
Задача хорошо работает на практике, с повторными попытками, как и ожидалось, когда существует ConnectionError, но теперь я хочу протестировать ее.
my_app.mailer.tasks.py
from django.core.mail import EmailMultiAlternatives
@app.task(bind=True,
max_retries=4,
base=BaseTaskEmail,
)
def send_mail(self, *args, **kwargs):
msg = EmailMultiAlternatives({some_data})
msg.attach_alternative({some_data})
try:
msg.send(fail_silently=False)
except ConnectionError as exc:
self.retry(countdown=backoff(self.request.retries), exc=exc)
Я предпринял следующую попытку тестирования, которая приводит к AssertionError:
raise AssertionError(_error_message()) from cause
AssertionError: expected call not found.
Expected: send(exc=ConnectionError())
Actual: send(fail_silently=False)
test.py
class SendMailTest(TestCase):
def setUp(self):
self.message = {}
@patch('my_app.mailer.tasks.EmailMultiAlternatives.send')
@patch('my_app.mailer.tasks.send_mail.retry')
def test_retry(self, mock_send, mock_retry):
mock_send.side_effect = Retry()
mock_retry.side_effect = error = ConnectionError()
with raises(Retry):
send_mail(kwargs=self.message)
mock_retry.assert_called_with(exc=error)
Мой вопрос в том, как я могу правильно поднять исключение в этой задаче и утверждать, что повторная попытка была вызвана правильным исключением, в данном случае ConnectionError. Кроме того, если возможно, утверждать, что было предпринято правильное количество попыток повторного выполнения?