Невозможно установить соединение с помощью celery и django
Я научился отправлять электронную почту с помощью Django и Celery. Я написал весь код довольно хорошо. Затем, когда я запустил его, я столкнулся с необычной ошибкой, называемой
ConnectionRefusedError: [WinError 10061] No connection could be made because the target machine actively refused it.
- Я не использую брандмауэр или систему защиты в своей системе
- Я новичок в этом, поэтому я могу даже сделать ошибку в командной строке, поэтому если нет ошибки, пожалуйста, скажите мне, что правильно ввести в командной строке
- Я запустил сервер redis и мне кажется, что он работает довольно хорошо
Вот вся ошибка, с которой я столкнулся
settings.py
CELERY_BROKER_URL = 'redis://127.0.0.1:6379'
accept_content = ['application/json']
result_serializer = 'json'
task_serializer = 'json'
timezone = 'Asia/Kolkata'
result_backend = 'django-db'
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
EMAIL_BACKEND = "django.core.mail.backends.smtp.EmailBackend"
EMAIL_USE_TLS = True
EMAIL_USE = "smtp.gmail.com"
EMAIL_PORT = 587
EMAIL_HOST_USER = 'cxxxl.com'
EMAIL_HOST_PASSWORD = 'xxx'
DEFAULT_FROM_EMAIL = 'Celery <xxx.com>'
tasks.py
from django.contrib.auth import get_user_model
from celery import shared_task
from django.core.mail import send_mail
from djangocelery import settings
@shared_task(bind=True)
def user_send_mail(self):
users = get_user_model().objects.all()
for user in users:
mail_subject = "Hi! Celery Testing."
message = "12345"
print(user.email)
to_email = user.email
send_mail(
subject=mail_subject,
message=message,
from_email=settings.EMAIL_HOST_USER,
recipient_list=[to_email],
fail_silently=False,
)
return "Done"
views.py
from django.shortcuts import render
from django.http import HttpResponse
from .tasks import test_func
from send_mail.tasks import user_send_mail
# Create your views here.
def test(request):
test_func.delay()
return HttpResponse("<h1>Done!</h1>")
def send_mail_to_all(request):
user_send_mail.delay()
return HttpResponse("Sent")
celery.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'djangocelery.settings')
app = Celery('djangocelery')
app.conf.enable_utf = False
# This is used to disable UTC timezone and enable our own one
app.conf.update(timezone="Asia/Kolkata")
app.config_from_object(settings, namespace="CELERY")
# CELERY BEAT SETTINGS
app.conf.beat_schedule = {
}
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f"Request: {self.request!r}")