Использование RabbitMQ с Celery в Django post_save

Я применяю асинхронную обработку задач, используя Celery в моем проекте Django.

Логика моего проекта:,

  • со стороны фронтенда есть таблица со строками, в каждой из которых есть upload button. Пользователь нажимает на нее, и в бэкенд отправляется payload, содержащий url, который содержит файл.
  • Файл принимается в django views. И сохраняется в базе данных, в таблице Run. Сразу после сохранения объекта запускается post_save signal для выполнения задачи celery.
  • .
  • Задача должна быть выполнена следующим образом: получить список прогонов с определенным статусом. Для каждого прогона выполнить задачу загрузки файла.

Я хотел бы выполнить это асинхронно в случае, если есть более одного запуска. Помня о том, что пользователь может нажать кнопку загрузки для более чем одной строки из фронтенда.

Я устанавливаю RabbitMQ в качестве брокера. У меня установлен и работает rabbitMQ. Я установил CELERY_BROKER_URL='amqp://localhost' тоже в settings.py. Я немного запутался в том, что мне делать дальше в моих конфигурациях, не могли бы вы мне подсказать. Я думаю, что мне нужно настроить celery worker для моих задач.

Ниже приведен мой код на данный момент :

views.py #view, сохраняющий в базу данных

class RunsUploadView(APIView):
    serializer_class = RunsURLSerializer

    def post(self, request,  *args, **kwargs):
        crawler_name = self.request.data.get('crawler')
        run_id = self.kwargs.get("run_id")
        run_url = self.request.data.get("run_url")

        run = Run()
        run.name = f"{crawler_name}_{run_id}"
        run.run = run_id
        run.url = run_url
        run.save()

        return Response(model_to_dict(run))

models.py # выполнение сохраняется в таблице Run, затем срабатывает сигнал post_save.

from django.db import models


class Run(models.Model):
    UPLOAD_STATUS = (
        ("Pending", "pending"),
        ("Running", "running"),
        ("Success", "success"),
        ("Failed", "failed"),
    )
    name = models.CharField(max_length=100)
    run = models.CharField(max_length=100, unique=True)
    url = models.URLField(max_length=1000)
    status = models.CharField(
        max_length=50, choices=UPLOAD_STATUS, default="Pending")
    started_at = models.DateTimeField(null=True)
    done_at = models.DateTimeField(null=True)

signals.py # обработка логики post_save после save()

from django.db.models.signals import post_save
from django.dispatch import receiver
from main.models import Run
from main.tasks import DownloadRun


@receiver(post_save, sender=Run)
def download_file(sender, **kwargs):
    pending_runs = Run.objects.filter(status='Pending') #all pending runs collected, I would need to handle the runs asynchronously. 
    for run in pending_runs:
        run.status = "Started"
        run.save()
        DownloadRun(run)

Tasks.py #использование класса, потому что я собираюсь обновить его, добавив больше функций.

class DownloadRun:
    def __init__(self, run):
        run_object = model_to_dict(run)
        self.run_url = run_object["url"]

        self.download_run()


    def download_run(self, dest_folder="runs"):
        ""Run file is downloaded from url""

Я понял, как действовать дальше. Я плохо настроил сельдерей.

tasks.py

from celery import shared_task #import shared_task decorator from celery
class DownloadRun:
    def __init__(self, run):
        run_object = model_to_dict(run)
        self.run_url = run_object["url"]

        self.download_run()


    def download_run(self, dest_folder="runs"):
        ""Run file is downloaded from url""

@shared_task 
def celery_task(run_id):
    DownloadRun(run_id)

signals.py

from django.db.models.signals import post_save
from django.dispatch import receiver
from main.models import Run
from main.tasks import celery_task #import celery_task from tasks.py


@receiver(post_save, sender=Run)
def download_file(sender, **kwargs):
    pending_runs = Run.objects.filter(status='Pending')
    for run in pending_runs:
        run.status = "Started"
        run.save()
        celery_task.delay(run.run) #call celery delay() to invoke the task (pass the unique key as parameter, could be id, in my case I chose the run)

Вернуться на верх