Тестирование Django с помощью Celery и как правильно поднять исключение в задаче celery и утвердить повторную попытку

У меня есть задача Celery в проекте Django, которая отправляет электронное письмо, используя EmailMultiAlternatives от Django.

Я хочу поднять ConnectionError, чтобы вызвать повторную попытку.

Задача хорошо работает на практике, с повторными попытками, как и ожидалось, когда существует ConnectionError, но теперь я хочу протестировать ее.

my_app.mailer.tasks.py

from django.core.mail import EmailMultiAlternatives

@app.task(bind=True,
          max_retries=4,
          base=BaseTaskEmail,
          )
def send_mail(self, *args, **kwargs):
    msg = EmailMultiAlternatives({some_data})
    msg.attach_alternative({some_data})
    try:
        msg.send(fail_silently=False)
    except ConnectionError as exc:
        self.retry(countdown=backoff(self.request.retries), exc=exc)

Я предпринял следующую попытку тестирования, которая приводит к AssertionError:

raise AssertionError(_error_message()) from cause
AssertionError: expected call not found.
Expected: send(exc=ConnectionError())
Actual: send(fail_silently=False)

test.py

class SendMailTest(TestCase):

    def setUp(self):
        self.message = {}

    @patch('my_app.mailer.tasks.EmailMultiAlternatives.send')
    @patch('my_app.mailer.tasks.send_mail.retry')
    def test_retry(self, mock_send, mock_retry):

        mock_send.side_effect = Retry()
        mock_retry.side_effect = error = ConnectionError()

        with raises(Retry):
           send_mail(kwargs=self.message)
        mock_retry.assert_called_with(exc=error)

Мой вопрос в том, как я могу правильно поднять исключение в этой задаче и утверждать, что повторная попытка была вызвана правильным исключением, в данном случае ConnectionError. Кроме того, если возможно, утверждать, что было предпринято правильное количество попыток повторного выполнения?

Вернуться на верх