Запуск фоновых задач из Django Admin с помощью Celery
Проекты на Django часто включают в себя длительные административные задачи, такие как создание отчетов, удаление неактивных пользователей, очистка токенов аутентификации и создание эскизов. Хотя эти задачи не так важны, как задачи, запускаемые пользователями, они все равно могут существенно повлиять на скорость и доступность вашего веб-приложения, если не будут выполнены надлежащим образом.
Вместо того, чтобы запускать административные задачи в главном потоке и блокировать ваше веб-приложение, вам всегда следует использовать очередь задач. Очередь задач позволяет выполнять задачи асинхронно, что означает, что ваше веб-приложение остается быстрым и отзывчивым.
В этом руководстве мы рассмотрим, как запускать фоновые задачи непосредственно из администратора Django. Мы будем использовать Celery, но аналогичные концепции применимы и к любым другим очередям задач, таким как Django-RQ, Django Q, или Хьюи.
Содержимое
- Цели
- Настройка проекта
- Настройка сельдерея
- Определить задачу
- Запустить задачу
- Статус задачи
- Завершение
Цели
К концу этого урока вы сможете:
- Объяснить основы приготовления сельдерея
- Интегрировать Celery с Django с помощью Docker Compose
- Определить пользовательские задачи Celery и запускайте их с помощью Django views и Django admin
- Использовать серверную часть Celery result для получения сведений о задаче и ее статусе
Настройка проекта
Чтобы упростить изучение руководства, я подготовил проект dockerized Django. Все, что делает проект, - это предоставляет представление API, которое имитирует длительную задачу создания отчета.
Сначала клонируем base
ветку репозитория GitHub:
$ git clone https://github.com/duplxey/django-celery-admin.git \
--single-branch --branch base && cd django-celery-admin
Затем создайте и разверните контейнеры с помощью Docker:
$ chmod +x web/entrypoint.sh
$ docker compose up --build -d
Затем создайте суперпользователя:
$ docker compose exec web python manage.py createsuperuser
Теперь вы сможете войти в систему по адресу http://localhost:8000/admin/, используя свои учетные данные.
Чтобы запустить генерацию отчета, перейдите по ссылке http://localhost:8000/generate-report/. Вы заметите, что веб-приложение фактически зависнет на 15 секунд. Это происходит потому, что генерация отчета происходит синхронно (блокируя основной поток).
Настройка сельдерея
Как упоминалось ранее, Celery - это асинхронная очередь задач, которая позволяет выполнять трудоемкие задачи в фоновом режиме. Без очереди задач вашему приложению Django приходится ждать завершения задачи, прежде чем возвращать ответ и переходить к следующему запросу.
По своей сути сельдерей состоит из четырех компонентов:
- Задачи (пользовательский код, который будет выполняться в рабочем процессе)
- Очередь задач (отвечает за доставку сообщений/задач работнику)
- Рабочий (дополнительный процесс, который выполняет задачи асинхронно с вашим приложением Django)
- Серверная часть результатов (хранилище для отслеживания статуса и результатов)
Упрощенная архитектура Celery выглядит примерно так:
Для получения более подробной информации о Celery ознакомьтесь с официальной документацией по Celery, а также с Руководством по работе с Django и Celery.
Начните с добавления celery
и redis
в web/requirements.txt файл:
# web/requirements.txt
Django==5.1.7
celery==5.4.0
redis==5.2.1
Затем измените docker-compose.yml примерно так:
# docker-compose.yml
services:
web:
build: ./web
command: python manage.py runserver 0.0.0.0:8000
volumes:
- ./web:/usr/src/app/
ports:
- "8000:8000"
environment:
- DEBUG=1
- SECRET_KEY=niks4af)ge#ff42is0vpsk07qach(mrcool1_#wx8c6izoi3vi
- ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- redis
celery:
build: ./web
entrypoint: []
command: celery --app=core worker --loglevel=info
volumes:
- ./web:/usr/src/app/
environment:
- DEBUG=1
- SECRET_KEY=niks4af)ge#ff42is0vpsk07qach(mrcool1_#wx8c6izoi3vi
- ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- redis
- web
redis:
image: redis:7.4.2-alpine
volumes:
- redis_data:/data
ports:
- "6379:6379"
volumes:
redis_data:
Что здесь происходит?
- Сначала мы добавили два сервиса Docker:
celery
иredis
. Redis будет использоваться в качестве посредника сообщений и серверной части хранилища для Celery. - Чтобы разрешить Celery и Django подключаться к Redis, мы определили две переменные окружения,
CELERY_BROKER
иCELERY_BACKEND
. - Наконец, чтобы сохранить данные Redis, мы создали том с именем
redis_data
.
Загрузите переменные окружения в web/core/settings.py вот так:
# web/core/settings.py
CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
Затем создайте celery.py файл в папке "web/core" со следующим содержимым:
# web/core/celery.py
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core", result_extended=True)
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
Что здесь происходит?
- Мы устанавливаем новую переменную окружения с именем
DJANGO_SETTINGS_MODULE
, которая сообщает Celery, где находится наш Django settings.py файл. - Мы импортировали
Celery
и использовали его для инициализации экземпляра Celeryapp
. - Наконец, мы вызвали
autodiscover_tasks()
, который автоматически сканирует наш проект на Django, находит задачи Celery и регистрирует их.
И, наконец, обновите core/__init__.py , чтобы импортировать сельдерей при запуске Django:
# web/core/__init__.py
from .celery import app as celery_app
__all__ = [
"celery_app",
]
Отлично, мы успешно установили Celery.
Чтобы применить изменения, перезапустите проект Docker Compose:
$ docker compose down
$ docker compose up --build -d
Определите задачу
Давайте преобразуем нашу синхронную задачу generate_report_view()
в задачу Celery.
Создайте tasks.py файл в папке "web/reports" со следующим содержимым:
# web/reports/tasks.py
import time
from celery import shared_task
from reports.models import Report
@shared_task
def generate_report_task(report_id, **kwargs):
report = Report.objects.get(id=report_id)
# Simulate a long-running report generation
time.sleep(15)
report.content = "testdriven.io is cool!"
report.is_ready = True
report.save()
return "The report has been successfully generated!"
В этом фрагменте кода мы определили новую задачу Celery, используя декоратор @shared_task
. Вместо создания экземпляра отчета в задаче мы передаем его задаче по идентификатору. Делая это, Django узнает, каким будет идентификатор отчета до завершения задачи.
Кроме того, мы добавили **kwargs
в сигнатуру функции, что позволило нам позже передавать пользовательские аргументы задаче.
Поскольку в нашем контейнере с сельдереем не включена функция горячей перезагрузки, перезапустите его, прежде чем продолжить:
$ docker compose restart celery
Задача запуска
Запуск задачи через просмотр
Чтобы запустить задачу с нашей точки зрения, измените ее следующим образом:
# web/reports/views.py
from django.http import JsonResponse
from reports.models import Report
from reports.tasks import generate_report_task
def generate_report_view(request):
report = Report.objects.create()
task = generate_report_task.delay(report.pk)
return JsonResponse({
"status": "The report is being generated...",
"task_id": task.id,
})
Celery delay()
метод - это ярлык для асинхронного применения задачи путем отправки сообщения брокеру. Как упоминалось в предыдущем разделе, мы должны передать идентификатор отчета.
Протестируйте конечную точку, перейдя по ссылке http://localhost:8000/generate-report/ в вашем браузере:
{
"status": "The report is being generated...",
"task_id": "7b3b8132-41bc-4bca-a0bb-bd91e65e283a"
}
Вы заметите, что теперь ответ приходит мгновенно. Если вы зайдете в панель администратора отчетов, то увидите, что был создан новый отчет. Через 15 секунд отчет должен быть готов.
Запуск задачи через администратора
Запустить задачу с помощью администратора Django немного сложнее. Нам нужно:
- Создайте новое представление администратора Django.
- Переопределите метод
ModelAdmin
дляget_urls()
регистрации представления. - Переопределите шаблон администратора и добавьте кнопку запуска.
Давайте сделаем это!
Сначала перейдите к web/reports/admin.py и вставьте следующий код:
# web/reports/admin.py
from django.contrib import admin
from django.db import transaction
from django.shortcuts import redirect
from django.urls import path, reverse
from reports.models import Report
from reports.tasks import generate_report_task
class ReportAdmin(admin.ModelAdmin):
list_display = ["__str__", "created_at", "updated_at", "is_ready"]
change_list_template = "admin/reports/report/change_list.html"
readonly_fields = ["created_at", "updated_at"]
def get_urls(self):
urls = super().get_urls()
custom_urls = [
path(
"generate/",
self.admin_site.admin_view(self.admin_generate_report_view),
name="reports_generate",
),
]
return custom_urls + urls
def admin_generate_report_view(self, request):
with transaction.atomic():
report = Report.objects.create()
result = generate_report_task.delay(report_id=report.id)
self.message_user(request, "Started generating a report...")
return redirect(reverse(
"admin:reports_report_change",
kwargs={"object_id": report.id},
))
admin.site.register(Report, ReportAdmin)
Что здесь происходит?
- Мы определили представление под названием
admin_generate_report_view()
, которое запускает задачу. - Мы зарегистрировали представление, переопределив метод
get_urls()
. Обратите внимание, что при переопределении URL-адресов администратора Django пользовательские URL-адреса всегда должны быть первыми. - Мы изменили
change_list_template
, указав путь к пользовательскому шаблону.
Затем создайте следующую структуру каталогов в вашей папке "web/reports":
templates/
└── admin/
└── reports/
└── report/
└── change_list.html
Затем поместите следующий код в HTML-файл:
<!-- web/reports/templates/admin/reports/report/change_list.html -->
{% extends "admin/change_list.html" %}
{% block object-tools-items %}
<style>
.generatelink {
background: var(--link-fg) !important;
}
.generatelink:hover {
background: var(--link-hover-color) !important;
}
</style>
<li>
<a href="{% url "admin:reports_generate" %}" class="generatelink">
Generate report
</a>
</li>
{{ block.super }}
{% endblock %}
В этом шаблоне мы добавили пользовательскую кнопку "Создать отчет" рядом с кнопкой "История".
Перейдите на свой сайт администратора, нажмите кнопку Создать, и вы будете перенаправлены на страницу сведений об отчете. Через 15 секунд обновите страницу, и отчет будет готов.
Если вы хотите добавить кнопки запуска в другие места администрирования Django, ознакомьтесь с Исходным кодом Django, чтобы узнать, какие шаблоны вы должны переопределить.
Статус задачи
Помните серверную часть result, о которой я говорил ранее? Что ж, давайте используем ее для отображения статуса задачи в реальном времени и перенаправления пользователя только после завершения создания отчета.
Чтобы достичь этого, нам нужно:
- Включить расширенный сервер обработки результатов Celery.
- Определите представление администратора, которое возвращает статус задачи Celery.
- Создайте страницу администратора Django, на которой отображается счетчик и проводятся опросы о статусе.
- Передайте
redirect_url
задаче и перенаправьте на нее, как только задача будет выполнена.
Сначала добавьте следующие две настройки в свой web/core/settings.py файл:
# web/core/settings.py
CELERY_RESULT_EXTENDED = True
CELERY_TASK_TRACK_STARTED = True
Первая настройка указывает Celery сохранять не только статус задачи и результат, но и название задачи, аргументы, именованные аргументы и так далее. Вторая настройка предписывает Celery отслеживать все изменения статуса (не только PENDING
и окончательные).
Перезапустите контейнер с сельдереем:
$ docker compose restart celery
Затем создайте CustomAdminConfig
и CustomAdminSite
в web/core/admin.py вот так:
# web/core/admin.py
from django.contrib import admin
from django.http import JsonResponse
from django.shortcuts import render
from django.urls import path
from django.contrib.admin.apps import AdminConfig
from celery.result import AsyncResult
class CustomAdminConfig(AdminConfig):
default_site = "core.admin.CustomAdminSite"
class CustomAdminSite(admin.AdminSite):
def get_urls(self):
urls = super().get_urls()
custom_urls = [
path(
"task-status/<str:task_id>/",
self.admin_view(self.admin_task_status_view),
name="task_status",
)
]
return custom_urls + urls
def admin_task_status_view(self, request, task_id):
task = AsyncResult(task_id)
task_data = {
"id": task.id,
"name": task.name,
"args": task.args,
"kwargs": task.kwargs,
"state": task.state,
}
# Return JSON response if requested
if request.headers.get("Accept", "").startswith("application/json"):
return JsonResponse(task_data)
# Otherwise, render HTML response
return render(
request,
"admin/task_status.html",
{
"title": "Task Status",
"task": task_data,
},
)
Мы определили пользовательский сайт администратора Django с admin_task_status_view()
, который выбирает задачи Celery по идентификатору и возвращает их статус. Позже мы будем использовать эту конечную точку для опроса статуса из нашего шаблона Django.
Далее переключаем администрацию сайта в режим web/core/settings.py вот так:
# web/core/settings.py
INSTALLED_APPS = [
"core.admin.CustomAdminConfig", # the default admin was replaced by this one
"django.contrib.auth",
"django.contrib.contenttypes",
"django.contrib.sessions",
"django.contrib.messages",
"django.contrib.staticfiles",
"reports.apps.ReportsConfig",
]
Наконец, позаботьтесь о шаблонах, создав следующую структуру каталогов в "web":
templates/
└── admin/
├── components/
│ └── spinner.html
└── task_status.html
В файле spinner.html добавьте следующий код:
<!-- web/templates/admin/components/spinner.html -->
<div class="spinner"></div>
<style>
.spinner {
width: 32px;
height: 32px;
border: 3px solid var(--border-color);
border-top: 3px solid var(--button-bg);
border-radius: 50%;
animation: spin 1s linear infinite;
margin: 12px 0;
}
@keyframes spin {
0% {
transform: rotate(0deg);
}
100% {
transform: rotate(360deg);
}
}
</style>
И добавьте к следующееtask_status.html:
<!-- web/templates/admin/task_status.html -->
{% extends "admin/base_site.html" %}
{% block content %}
<ul style="padding-left: 24px;">
<li><b>Task ID:</b> {{ task.id }}</li>
<li><b>Task Name:</b> {{ task.name }}</li>
<li><b>Task Args:</b> {{ task.args }}</li>
<li><b>Task Kwargs:</b> {{ task.kwargs }}</li>
<li><b>Task State:</b> {{ task.state }}</li>
</ul>
{% if task.state == "PENDING" or task.state == "STARTED" %}
<hr style="margin: 12px 0;">
<div>
<p>Task is running...</p>
{% include "admin/components/spinner.html" %}
<p>(Last update: <span id="lastUpdate"></span>)</p>
</div>
<script>
let intervalId = null;
const lastUpdateSpan = document.getElementById("lastUpdate");
async function fetchStatusAndUpdate() {
try {
console.log("Fetching task status...");
// Fetch task status from the server
const response = await fetch(`/admin/task-status/{{ task.id }}/`, {
headers: { Accept: "application/json" }
});
const data = await response.json();
if (data["state"] === "SUCCESS" || data["state"] === "FAILURE") {
// Stop with the polling
if (intervalId) {
clearInterval(intervalId);
}
// If 'redirect_url' is present redirect to that URL
if (data["kwargs"] && data["kwargs"]["redirect_url"]) {
window.location.href = data["kwargs"]["redirect_url"];
}
}
// Update the 'lastUpdate' span
lastUpdateSpan.innerHTML = new Date().toLocaleTimeString();
} catch (error) {
console.error("Error fetching task status:", error);
}
}
// Instantly check the status and then poll every 3 seconds
intervalId = setInterval(fetchStatusAndUpdate, 3000);
fetchStatusAndUpdate();
</script>
{% endif %}
{% endblock %}
Что здесь происходит?
- Шаблон извлекает статус задачи и отображает его.
- После этого он опрашивает обновления статуса задачи каждые 3 секунды, используя
setInterval()
. - Как только задача выполняется успешно или с ошибкой, она перенаправляет пользователя на
redirect_url
.
Наконец, измените ReportAdmin
в admin_generate_report_view()
, чтобы передать URL-адрес перенаправления на задачу вместо мгновенного перенаправления на страницу сведений об отчете:
# web/reports/admin.py
class ReportAdmin(admin.ModelAdmin):
...
def admin_generate_report_view(self, request):
with transaction.atomic():
report = Report.objects.create()
redirect_url = reverse("admin:reports_report_change", kwargs={
"object_id": report.id,
})
result = generate_report_task.delay(
report_id=report.id,
redirect_url=redirect_url,
)
self.message_user(request, "Started generating a report...")
return redirect("admin:task_status", result.id)
Хорошая работа!
Теперь у нас есть пользовательское представление администратора, которое запрашивает статус задачи и перенаправляет пользователя на redirect_url
после завершения задачи. Чтобы протестировать это, перейдите к своему администратору Django и запустите генерацию отчета, как и раньше.
Заключение
В этом руководстве мы рассмотрели, как интегрировать Celery с Django, чтобы запускать длительные задачи из Django admin. Кроме того, мы узнали, как использовать серверную часть результатов для получения статуса задачи и отображения его практически в режиме реального времени.
Окончательный проект можно просмотреть в репозитории django-celery-admin на GitHub.
Чтобы получить максимальную отдачу от Django admin и Celery, ознакомьтесь со следующими ресурсами:
- Настройка администратора Django
- Добавление диаграмм в Django с помощью Chart.js
- Работа с Django и Celery
- Полное руководство по Celery и Django