Django + Celery - Как отправить письмо сброса пароля Django с помощью Celery?

На этот вопрос уже отвечали ранее здесь, но это решение мне не помогло, мой email не отправляется Celery. Я использую Celery 5.1.2

Мой код: forms.py

from django.contrib.auth import forms as admin_forms
from django.utils.translation import gettext_lazy as _
from users.tasks import reset_password


class PwdResetForm(admin_forms.PasswordResetForm):
    email = forms.EmailField(max_length=150, widget=forms.TextInput())

    def clean_email(self):
        email = self.cleaned_data['email']
        test = User.objects.filter(email=email)
        if not test:
            raise forms.ValidationError(
                _('Unfortunately we can not find that email address'))
        return email

    def send_mail(
            self,
            subject_template_name,
            email_template_name,
            context,
            from_email,
            to_email,
            html_email_template_name=None,
    ):
        context['user'] = context['user'].id
        reset_password.delay(subject_template_name=subject_template_name, email_template_name=email_template_name,
                             context=context, from_email=from_email,
                             to_email=to_email, html_email_template_name=html_email_template_name)

tasks.py

from django.contrib.auth import get_user_model
from django.contrib.auth.forms import PasswordResetForm
from celery import shared_task
from config.celery_app import app

User = get_user_model()


@shared_task  # I also tried using @app.task
def reset_password(subject_template_name, email_template_name, context,
                   from_email, to_email, html_email_template_name):
    context['user'] = User.objects.get(pk=context['user'])

    PasswordResetForm.send_mail(
        None,
        subject_template_name,
        email_template_name,
        context,
        from_email,
        to_email,
        html_email_template_name
    )

celery_app.py

import os
from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "auto_store.settings")

app = Celery("auto_store")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

Возможно, это связано с тем, что у меня также есть tasks.py в другом приложении?

Если это возможно, я хочу решить эту проблему без стороннего пакета. Спасибо за помощь.

Вернуться на верх