Использование RabbitMQ с Celery в Django post_save
Я применяю асинхронную обработку задач, используя Celery в моем проекте Django.
Логика моего проекта:,
- со стороны фронтенда есть таблица со строками, в каждой из которых есть
upload button. Пользователь нажимает на нее, и в бэкенд отправляетсяpayload, содержащий url, который содержит файл. - Файл принимается в django views. И сохраняется в базе данных, в таблице
Run. Сразу после сохранения объекта запускаетсяpost_save signalдля выполнения задачи celery. .
- Задача должна быть выполнена следующим образом: получить список прогонов с определенным статусом. Для каждого прогона выполнить задачу загрузки файла.
Я хотел бы выполнить это асинхронно в случае, если есть более одного запуска. Помня о том, что пользователь может нажать кнопку загрузки для более чем одной строки из фронтенда.
Я устанавливаю RabbitMQ в качестве брокера. У меня установлен и работает rabbitMQ. Я установил CELERY_BROKER_URL='amqp://localhost' тоже в settings.py. Я немного запутался в том, что мне делать дальше в моих конфигурациях, не могли бы вы мне подсказать. Я думаю, что мне нужно настроить celery worker для моих задач.
Ниже приведен мой код на данный момент :
views.py #view, сохраняющий в базу данных
class RunsUploadView(APIView):
serializer_class = RunsURLSerializer
def post(self, request, *args, **kwargs):
crawler_name = self.request.data.get('crawler')
run_id = self.kwargs.get("run_id")
run_url = self.request.data.get("run_url")
run = Run()
run.name = f"{crawler_name}_{run_id}"
run.run = run_id
run.url = run_url
run.save()
return Response(model_to_dict(run))
models.py # выполнение сохраняется в таблице Run, затем срабатывает сигнал post_save.
from django.db import models
class Run(models.Model):
UPLOAD_STATUS = (
("Pending", "pending"),
("Running", "running"),
("Success", "success"),
("Failed", "failed"),
)
name = models.CharField(max_length=100)
run = models.CharField(max_length=100, unique=True)
url = models.URLField(max_length=1000)
status = models.CharField(
max_length=50, choices=UPLOAD_STATUS, default="Pending")
started_at = models.DateTimeField(null=True)
done_at = models.DateTimeField(null=True)
signals.py # обработка логики post_save после save()
from django.db.models.signals import post_save
from django.dispatch import receiver
from main.models import Run
from main.tasks import DownloadRun
@receiver(post_save, sender=Run)
def download_file(sender, **kwargs):
pending_runs = Run.objects.filter(status='Pending') #all pending runs collected, I would need to handle the runs asynchronously.
for run in pending_runs:
run.status = "Started"
run.save()
DownloadRun(run)
Tasks.py #использование класса, потому что я собираюсь обновить его, добавив больше функций.
class DownloadRun:
def __init__(self, run):
run_object = model_to_dict(run)
self.run_url = run_object["url"]
self.download_run()
def download_run(self, dest_folder="runs"):
""Run file is downloaded from url""
Я понял, как действовать дальше. Я плохо настроил сельдерей.
tasks.py
from celery import shared_task #import shared_task decorator from celery
class DownloadRun:
def __init__(self, run):
run_object = model_to_dict(run)
self.run_url = run_object["url"]
self.download_run()
def download_run(self, dest_folder="runs"):
""Run file is downloaded from url""
@shared_task
def celery_task(run_id):
DownloadRun(run_id)
signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
from main.models import Run
from main.tasks import celery_task #import celery_task from tasks.py
@receiver(post_save, sender=Run)
def download_file(sender, **kwargs):
pending_runs = Run.objects.filter(status='Pending')
for run in pending_runs:
run.status = "Started"
run.save()
celery_task.delay(run.run) #call celery delay() to invoke the task (pass the unique key as parameter, could be id, in my case I chose the run)