Запуск фоновых задач из Django Admin с помощью Celery

Проекты на Django часто включают в себя длительные административные задачи, такие как создание отчетов, удаление неактивных пользователей, очистка токенов аутентификации и создание эскизов. Хотя эти задачи не так важны, как задачи, запускаемые пользователями, они все равно могут существенно повлиять на скорость и доступность вашего веб-приложения, если не будут выполнены надлежащим образом.

Вместо того, чтобы запускать административные задачи в главном потоке и блокировать ваше веб-приложение, вам всегда следует использовать очередь задач. Очередь задач позволяет выполнять задачи асинхронно, что означает, что ваше веб-приложение остается быстрым и отзывчивым.

В этом руководстве мы рассмотрим, как запускать фоновые задачи непосредственно из администратора Django. Мы будем использовать Celery, но аналогичные концепции применимы и к любым другим очередям задач, таким как Django-RQ, Django Q, или Хьюи.

Содержимое

Цели

К концу этого урока вы сможете:

  1. Объяснить основы приготовления сельдерея
  2. Интегрировать Celery с Django с помощью Docker Compose
  3. Определить пользовательские задачи Celery и запускайте их с помощью Django views и Django admin
  4. Использовать серверную часть Celery result для получения сведений о задаче и ее статусе

Настройка проекта

Чтобы упростить изучение руководства, я подготовил проект dockerized Django. Все, что делает проект, - это предоставляет представление API, которое имитирует длительную задачу создания отчета.

Сначала клонируем base ветку репозитория GitHub:

$ git clone https://github.com/duplxey/django-celery-admin.git \
    --single-branch --branch base && cd django-celery-admin

Затем создайте и разверните контейнеры с помощью Docker:

$ chmod +x web/entrypoint.sh
$ docker compose up --build -d

Затем создайте суперпользователя:

$ docker compose exec web python manage.py createsuperuser

Теперь вы сможете войти в систему по адресу http://localhost:8000/admin/, используя свои учетные данные.

Чтобы запустить генерацию отчета, перейдите по ссылке http://localhost:8000/generate-report/. Вы заметите, что веб-приложение фактически зависнет на 15 секунд. Это происходит потому, что генерация отчета происходит синхронно (блокируя основной поток).

Настройка сельдерея

Как упоминалось ранее, Celery - это асинхронная очередь задач, которая позволяет выполнять трудоемкие задачи в фоновом режиме. Без очереди задач вашему приложению Django приходится ждать завершения задачи, прежде чем возвращать ответ и переходить к следующему запросу.

По своей сути сельдерей состоит из четырех компонентов:

  1. Задачи (пользовательский код, который будет выполняться в рабочем процессе)
  2. Очередь задач (отвечает за доставку сообщений/задач работнику)
  3. Рабочий (дополнительный процесс, который выполняет задачи асинхронно с вашим приложением Django)
  4. Серверная часть результатов (хранилище для отслеживания статуса и результатов)

Упрощенная архитектура Celery выглядит примерно так:

Django + Celery Architecture

Для получения более подробной информации о Celery ознакомьтесь с официальной документацией по Celery, а также с Руководством по работе с Django и Celery.

Начните с добавления celery и redis в web/requirements.txt файл:

# web/requirements.txt

Django==5.1.7
celery==5.4.0
redis==5.2.1

Затем измените docker-compose.yml примерно так:

# docker-compose.yml

services:
  web:
    build: ./web
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ./web:/usr/src/app/
    ports:
      - "8000:8000"
    environment:
      - DEBUG=1
      - SECRET_KEY=niks4af)ge#ff42is0vpsk07qach(mrcool1_#wx8c6izoi3vi
      - ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  celery:
    build: ./web
    entrypoint: []
    command: celery --app=core worker --loglevel=info
    volumes:
      - ./web:/usr/src/app/
    environment:
      - DEBUG=1
      - SECRET_KEY=niks4af)ge#ff42is0vpsk07qach(mrcool1_#wx8c6izoi3vi
      - ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - redis
      - web

  redis:
    image: redis:7.4.2-alpine
    volumes:
      - redis_data:/data
    ports:
      - "6379:6379"

volumes:
  redis_data:

Что здесь происходит?

  1. Сначала мы добавили два сервиса Docker: celery и redis. Redis будет использоваться в качестве посредника сообщений и серверной части хранилища для Celery.
  2. Чтобы разрешить Celery и Django подключаться к Redis, мы определили две переменные окружения, CELERY_BROKER и CELERY_BACKEND.
  3. Наконец, чтобы сохранить данные Redis, мы создали том с именем redis_data.

Загрузите переменные окружения в web/core/settings.py вот так:

# web/core/settings.py

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")

Затем создайте celery.py файл в папке "web/core" со следующим содержимым:

# web/core/celery.py

import os

from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")

app = Celery("core", result_extended=True)
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

Что здесь происходит?

  1. Мы устанавливаем новую переменную окружения с именем DJANGO_SETTINGS_MODULE, которая сообщает Celery, где находится наш Django settings.py файл.
  2. Мы импортировали Celery и использовали его для инициализации экземпляра Celery app.
  3. Наконец, мы вызвали autodiscover_tasks(), который автоматически сканирует наш проект на Django, находит задачи Celery и регистрирует их.

И, наконец, обновите core/__init__.py , чтобы импортировать сельдерей при запуске Django:

# web/core/__init__.py

from .celery import app as celery_app

__all__ = [
    "celery_app",
]

Отлично, мы успешно установили Celery.

Чтобы применить изменения, перезапустите проект Docker Compose:

$ docker compose down
$ docker compose up --build -d

Определите задачу

Давайте преобразуем нашу синхронную задачу generate_report_view() в задачу Celery.

Создайте tasks.py файл в папке "web/reports" со следующим содержимым:

# web/reports/tasks.py

import time

from celery import shared_task

from reports.models import Report


@shared_task
def generate_report_task(report_id, **kwargs):
    report = Report.objects.get(id=report_id)

    # Simulate a long-running report generation
    time.sleep(15)
    report.content = "testdriven.io is cool!"
    report.is_ready = True
    report.save()

    return "The report has been successfully generated!"

В этом фрагменте кода мы определили новую задачу Celery, используя декоратор @shared_task. Вместо создания экземпляра отчета в задаче мы передаем его задаче по идентификатору. Делая это, Django узнает, каким будет идентификатор отчета до завершения задачи.

Кроме того, мы добавили **kwargs в сигнатуру функции, что позволило нам позже передавать пользовательские аргументы задаче.

Поскольку в нашем контейнере с сельдереем не включена функция горячей перезагрузки, перезапустите его, прежде чем продолжить:

$ docker compose restart celery

Задача запуска

Запуск задачи через просмотр

Чтобы запустить задачу с нашей точки зрения, измените ее следующим образом:

# web/reports/views.py

from django.http import JsonResponse

from reports.models import Report
from reports.tasks import generate_report_task


def generate_report_view(request):
    report = Report.objects.create()
    task = generate_report_task.delay(report.pk)

    return JsonResponse({
        "status": "The report is being generated...",
        "task_id": task.id,
    })

Celery delay() метод - это ярлык для асинхронного применения задачи путем отправки сообщения брокеру. Как упоминалось в предыдущем разделе, мы должны передать идентификатор отчета.

Протестируйте конечную точку, перейдя по ссылке http://localhost:8000/generate-report/ в вашем браузере:

{
  "status": "The report is being generated...",
  "task_id": "7b3b8132-41bc-4bca-a0bb-bd91e65e283a"
}

Вы заметите, что теперь ответ приходит мгновенно. Если вы зайдете в панель администратора отчетов, то увидите, что был создан новый отчет. Через 15 секунд отчет должен быть готов.

Запуск задачи через администратора

Запустить задачу с помощью администратора Django немного сложнее. Нам нужно:

  1. Создайте новое представление администратора Django.
  2. Переопределите метод ModelAdmin для get_urls() регистрации представления.
  3. Переопределите шаблон администратора и добавьте кнопку запуска.

Давайте сделаем это!

Сначала перейдите к web/reports/admin.py и вставьте следующий код:

# web/reports/admin.py

from django.contrib import admin
from django.db import transaction
from django.shortcuts import redirect
from django.urls import path, reverse

from reports.models import Report
from reports.tasks import generate_report_task


class ReportAdmin(admin.ModelAdmin):
    list_display = ["__str__", "created_at", "updated_at", "is_ready"]
    change_list_template = "admin/reports/report/change_list.html"
    readonly_fields = ["created_at", "updated_at"]

    def get_urls(self):
        urls = super().get_urls()
        custom_urls = [
            path(
                "generate/",
                self.admin_site.admin_view(self.admin_generate_report_view),
                name="reports_generate",
            ),
        ]
        return custom_urls + urls

    def admin_generate_report_view(self, request):
        with transaction.atomic():
            report = Report.objects.create()

        result = generate_report_task.delay(report_id=report.id)

        self.message_user(request, "Started generating a report...")
        return redirect(reverse(
            "admin:reports_report_change",
            kwargs={"object_id": report.id},
        ))


admin.site.register(Report, ReportAdmin)

Что здесь происходит?

  1. Мы определили представление под названием admin_generate_report_view(), которое запускает задачу.
  2. Мы зарегистрировали представление, переопределив метод get_urls(). Обратите внимание, что при переопределении URL-адресов администратора Django пользовательские URL-адреса всегда должны быть первыми.
  3. Мы изменили change_list_template, указав путь к пользовательскому шаблону.

Затем создайте следующую структуру каталогов в вашей папке "web/reports":

templates/
└── admin/
    └── reports/
        └── report/
            └── change_list.html

Затем поместите следующий код в HTML-файл:

<!-- web/reports/templates/admin/reports/report/change_list.html -->

{% extends "admin/change_list.html" %}

{% block object-tools-items %}
  <style>
    .generatelink {
      background: var(--link-fg) !important;
    }

    .generatelink:hover {
      background: var(--link-hover-color) !important;
    }
  </style>
  <li>
    <a href="{% url "admin:reports_generate" %}" class="generatelink">
      Generate report
    </a>
  </li>
  {{ block.super }}
{% endblock %}

В этом шаблоне мы добавили пользовательскую кнопку "Создать отчет" рядом с кнопкой "История".

Перейдите на свой сайт администратора, нажмите кнопку Создать, и вы будете перенаправлены на страницу сведений об отчете. Через 15 секунд обновите страницу, и отчет будет готов.

Django Admin Generate Report

Если вы хотите добавить кнопки запуска в другие места администрирования Django, ознакомьтесь с Исходным кодом Django, чтобы узнать, какие шаблоны вы должны переопределить.

Статус задачи

Помните серверную часть result, о которой я говорил ранее? Что ж, давайте используем ее для отображения статуса задачи в реальном времени и перенаправления пользователя только после завершения создания отчета.

Чтобы достичь этого, нам нужно:

  1. Включить расширенный сервер обработки результатов Celery.
  2. Определите представление администратора, которое возвращает статус задачи Celery.
  3. Создайте страницу администратора Django, на которой отображается счетчик и проводятся опросы о статусе.
  4. Передайте redirect_url задаче и перенаправьте на нее, как только задача будет выполнена.

Сначала добавьте следующие две настройки в свой web/core/settings.py файл:

# web/core/settings.py

CELERY_RESULT_EXTENDED = True
CELERY_TASK_TRACK_STARTED = True

Первая настройка указывает Celery сохранять не только статус задачи и результат, но и название задачи, аргументы, именованные аргументы и так далее. Вторая настройка предписывает Celery отслеживать все изменения статуса (не только PENDING и окончательные).

Перезапустите контейнер с сельдереем:

$ docker compose restart celery

Затем создайте CustomAdminConfig и CustomAdminSite в web/core/admin.py вот так:

# web/core/admin.py

from django.contrib import admin
from django.http import JsonResponse
from django.shortcuts import render
from django.urls import path
from django.contrib.admin.apps import AdminConfig
from celery.result import AsyncResult


class CustomAdminConfig(AdminConfig):
    default_site = "core.admin.CustomAdminSite"


class CustomAdminSite(admin.AdminSite):
    def get_urls(self):
        urls = super().get_urls()
        custom_urls = [
            path(
                "task-status/<str:task_id>/",
                self.admin_view(self.admin_task_status_view),
                name="task_status",
            )
        ]
        return custom_urls + urls

    def admin_task_status_view(self, request, task_id):
        task = AsyncResult(task_id)
        task_data = {
            "id": task.id,
            "name": task.name,
            "args": task.args,
            "kwargs": task.kwargs,
            "state": task.state,
        }

        # Return JSON response if requested
        if request.headers.get("Accept", "").startswith("application/json"):
            return JsonResponse(task_data)

        # Otherwise, render HTML response
        return render(
            request,
            "admin/task_status.html",
            {
                "title": "Task Status",
                "task": task_data,
            },
        )

Мы определили пользовательский сайт администратора Django с admin_task_status_view(), который выбирает задачи Celery по идентификатору и возвращает их статус. Позже мы будем использовать эту конечную точку для опроса статуса из нашего шаблона Django.

Далее переключаем администрацию сайта в режим web/core/settings.py вот так:

# web/core/settings.py

INSTALLED_APPS = [
    "core.admin.CustomAdminConfig",  # the default admin was replaced by this one
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "reports.apps.ReportsConfig",
]

Наконец, позаботьтесь о шаблонах, создав следующую структуру каталогов в "web":

templates/
└── admin/
    ├── components/
    │   └── spinner.html
    └── task_status.html

В файле spinner.html добавьте следующий код:

<!-- web/templates/admin/components/spinner.html -->

<div class="spinner"></div>
<style>
  .spinner {
    width: 32px;
    height: 32px;
    border: 3px solid var(--border-color);
    border-top: 3px solid var(--button-bg);
    border-radius: 50%;
    animation: spin 1s linear infinite;
    margin: 12px 0;
  }

  @keyframes spin {
    0% {
      transform: rotate(0deg);
    }
    100% {
      transform: rotate(360deg);
    }
  }
</style>

И добавьте к следующееtask_status.html:

<!-- web/templates/admin/task_status.html -->

{% extends "admin/base_site.html" %}

{% block content %}
  <ul style="padding-left: 24px;">
    <li><b>Task ID:</b> {{ task.id }}</li>
    <li><b>Task Name:</b> {{ task.name }}</li>
    <li><b>Task Args:</b> {{ task.args }}</li>
    <li><b>Task Kwargs:</b> {{ task.kwargs }}</li>
    <li><b>Task State:</b> {{ task.state }}</li>
  </ul>
  {% if task.state == "PENDING" or task.state == "STARTED" %}
    <hr style="margin: 12px 0;">
    <div>
      <p>Task is running...</p>
      {% include "admin/components/spinner.html" %}
      <p>(Last update: <span id="lastUpdate"></span>)</p>
    </div>
    <script>
      let intervalId = null;
      const lastUpdateSpan = document.getElementById("lastUpdate");

      async function fetchStatusAndUpdate() {
        try {
          console.log("Fetching task status...");

          // Fetch task status from the server
          const response = await fetch(`/admin/task-status/{{ task.id }}/`, {
            headers: { Accept: "application/json" }
          });
          const data = await response.json();

          if (data["state"] === "SUCCESS" || data["state"] === "FAILURE") {
            // Stop with the polling
            if (intervalId) {
              clearInterval(intervalId);
            }

            // If 'redirect_url' is present redirect to that URL
            if (data["kwargs"] && data["kwargs"]["redirect_url"]) {
              window.location.href = data["kwargs"]["redirect_url"];
            }
          }

          // Update the 'lastUpdate' span
          lastUpdateSpan.innerHTML = new Date().toLocaleTimeString();
        } catch (error) {
          console.error("Error fetching task status:", error);
        }
      }

      // Instantly check the status and then poll every 3 seconds
      intervalId = setInterval(fetchStatusAndUpdate, 3000);
      fetchStatusAndUpdate();
    </script>
  {% endif %}
{% endblock %}

Что здесь происходит?

  1. Шаблон извлекает статус задачи и отображает его.
  2. После этого он опрашивает обновления статуса задачи каждые 3 секунды, используя setInterval().
  3. Как только задача выполняется успешно или с ошибкой, она перенаправляет пользователя на redirect_url.

Наконец, измените ReportAdmin в admin_generate_report_view(), чтобы передать URL-адрес перенаправления на задачу вместо мгновенного перенаправления на страницу сведений об отчете:

# web/reports/admin.py

class ReportAdmin(admin.ModelAdmin):

    ...

    def admin_generate_report_view(self, request):
        with transaction.atomic():
            report = Report.objects.create()

        redirect_url = reverse("admin:reports_report_change", kwargs={
            "object_id": report.id,
        })
        result = generate_report_task.delay(
            report_id=report.id,
            redirect_url=redirect_url,
        )

        self.message_user(request, "Started generating a report...")
        return redirect("admin:task_status", result.id)

Хорошая работа!

Теперь у нас есть пользовательское представление администратора, которое запрашивает статус задачи и перенаправляет пользователя на redirect_url после завершения задачи. Чтобы протестировать это, перейдите к своему администратору Django и запустите генерацию отчета, как и раньше.

Django Admin Task Status

Заключение

В этом руководстве мы рассмотрели, как интегрировать Celery с Django, чтобы запускать длительные задачи из Django admin. Кроме того, мы узнали, как использовать серверную часть результатов для получения статуса задачи и отображения его практически в режиме реального времени.

Окончательный проект можно просмотреть в репозитории django-celery-admin на GitHub.

Чтобы получить максимальную отдачу от Django admin и Celery, ознакомьтесь со следующими ресурсами:

Вернуться на верх