Обработка периодических задач в Django с помощью Celery и Docker
По мере создания и масштабирования приложения Django вам неизбежно потребуется периодически и автоматически выполнять определенные задачи в фоновом режиме.
Некоторые примеры:
- Генерирование периодических отчетов
- Очистка кэша
- Отправка пакетных уведомлений по электронной почте
- Запуск ночных заданий по обслуживанию
Это одна из немногих функциональных частей, необходимых для создания и масштабирования веб-приложений, которая не является частью ядра Django. К счастью, Celery предлагает мощное решение, которое довольно легко реализовать под названием Celery Beat.
В следующей статье мы покажем вам, как настроить Django, Celery и Redis с помощью Docker, чтобы периодически запускать пользовательскую команду Django Admin с помощью Celery Beat.
Django + Celery Series:
Цели
К концу этого урока вы должны уметь:
- Контейнеризируйте Django, Celery и Redis с помощью Docker
- Интегрируйте Celery в приложение Django и создавайте задачи
- Напишите пользовательскую команду администратора Django
- Запланируйте периодическое выполнение пользовательской команды администратора Django через Celery Beat.
Настройка проекта
Склонируйте базовый проект из репо django-celery-beat, а затем проверьте ветку base:
$ git clone https://github.com/testdrivenio/django-celery-beat --branch base --single-branch
$ cd django-celery-beat
Поскольку нам потребуется управлять четырьмя процессами (Django, Redis, worker и scheduler), мы используем Docker для упрощения рабочего процесса, соединив их так, чтобы все они могли запускаться из одного окна терминала одной командой.
Из корня проекта создайте образы и запустите контейнеры Docker:
$ docker-compose up -d --build
Далее, примените миграции:
$ docker-compose exec web python manage.py migrate
После завершения сборки перейдите по адресу http://localhost:1337, чтобы убедиться, что приложение работает так, как ожидалось. Вы должны увидеть следующий текст:
Orders
No orders found!
Ознакомьтесь со структурой проекта, прежде чем двигаться дальше:
├── .gitignore
├── docker-compose.yml
└── project
├── Dockerfile
├── core
│ ├── __init__.py
│ ├── asgi.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── entrypoint.sh
├── manage.py
├── orders
│ ├── __init__.py
│ ├── admin.py
│ ├── apps.py
│ ├── migrations
│ │ ├── 0001_initial.py
│ │ └── __init__.py
│ ├── models.py
│ ├── tests.py
│ ├── urls.py
│ └── views.py
├── products.json
├── requirements.txt
└── templates
└── orders
└── order_list.html
Хотите узнать, как построить этот проект? Просмотрите статью Dockerizing Django with Postgres, Gunicorn, and Nginx в блоге
Celery и Redis
Теперь нам нужно добавить контейнеры для Celery, Celery Beat и Redis.
Начнем с добавления зависимостей в файл requirements.txt:
Django==3.2.4
celery==5.1.2
redis==3.5.3
Далее, добавьте следующее в конец файла docker-compose.yml:
redis:
image: redis:alpine
celery:
build: ./project
command: celery -A core worker -l info
volumes:
- ./project/:/usr/src/app/
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
depends_on:
- redis
celery-beat:
build: ./project
command: celery -A core beat -l info
volumes:
- ./project/:/usr/src/app/
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
depends_on:
- redis
Нам также необходимо обновить раздел depends_on
веб-сервиса:
web:
build: ./project
command: python manage.py runserver 0.0.0.0:8000
volumes:
- ./project/:/usr/src/app/
ports:
- 1337:8000
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
depends_on:
- redis # NEW
Полный файл docker-compose.yml теперь должен выглядеть следующим образом:
version: '3.8'
services:
web:
build: ./project
command: python manage.py runserver 0.0.0.0:8000
volumes:
- ./project/:/usr/src/app/
ports:
- 1337:8000
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
depends_on:
- redis
redis:
image: redis:alpine
celery:
build: ./project
command: celery -A core worker -l info
volumes:
- ./project/:/usr/src/app/
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
depends_on:
- redis
celery-beat:
build: ./project
command: celery -A core beat -l info
volumes:
- ./project/:/usr/src/app/
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
depends_on:
- redis
Перед созданием новых контейнеров нам необходимо настроить Celery в нашем приложении Django.
Конфигурация Celery
Настройка
В каталоге "core" создайте файл celery.py и добавьте следующий код:
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
Что здесь происходит?
- First, we set a default value for the
DJANGO_SETTINGS_MODULE
environment variable so that the Celery will know how to find the Django project. - Next, we created a new Celery instance, with the name
core
, and assigned the value to a variable calledapp
. - We then loaded the celery configuration values from the settings object from
django.conf
. We usednamespace="CELERY"
to prevent clashes with other Django settings. All config settings for Celery must be prefixed withCELERY_
, in other words. - Finally,
app.autodiscover_tasks()
tells Celery to look for Celery tasks from applications defined insettings.INSTALLED_APPS
.
Добавьте следующий код в core/__init__.py:
from .celery import app as celery_app
__all__ = ("celery_app",)
В заключение, обновите файл core/settings.py со следующими настройками Celery, чтобы он мог подключаться к Redis:
CELERY_BROKER_URL = "redis://redis:6379"
CELERY_RESULT_BACKEND = "redis://redis:6379"
Соберите новые контейнеры, чтобы убедиться, что все работает:
$ docker-compose up -d --build
Просмотрите журналы для каждой службы, чтобы убедиться, что они готовы, без ошибок:
$ docker-compose logs 'web'
$ docker-compose logs 'celery'
$ docker-compose logs 'celery-beat'
$ docker-compose logs 'redis'
Если все прошло хорошо, у нас теперь есть четыре контейнера, каждый с различными службами.
Теперь мы готовы создать пример задания, чтобы убедиться, что оно работает так, как нужно.
Создать задачу
Создайте новый файл под названием core/tasks.py и добавьте следующий код для примера задачи, которая просто ведет журнал в консоль:
from celery import shared_task
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@shared_task
def sample_task():
logger.info("The sample task just ran.")
Запланировать задачу
В конце файла settings.py добавьте следующий код, чтобы запланировать выполнение sample_task
раз в минуту, используя Celery Beat:
CELERY_BEAT_SCHEDULE = {
"sample_task": {
"task": "core.tasks.sample_task",
"schedule": crontab(minute="*/1"),
},
}
Здесь мы определили периодическую задачу, используя настройку CELERY_BEAT_SCHEDULE. Мы дали задаче имя, sample_task
, а затем объявили две настройки:
task
объявляет, какую задачу выполнять.Schedule
устанавливает интервал, в течение которого задача должна выполняться. Это может быть целое число, timedelta или crontab. Мы использовали шаблон crontab для нашей задачи, чтобы заставить ее запускаться каждую минуту. Вы можете найти больше информации о расписании Celery здесь.
Убедитесь, что добавили импорты:
from celery.schedules import crontab
import core.tasks
Перезапустите контейнер, чтобы применить новые настройки:
$ docker-compose up -d --build
После этого посмотрите на поленья сельдерея в контейнере:
$ docker-compose logs -f 'celery'
Вы должны увидеть что-то похожее на:
celery_1 | -------------- [queues]
celery_1 | .> celery exchange=celery(direct) key=celery
celery_1 |
celery_1 |
celery_1 | [tasks]
celery_1 | . core.tasks.sample_task
Мы видим, что Celery подхватил наш образец задачи, core.tasks.sample_task
.
Каждую минуту вы должны видеть строку в журнале, которая заканчивается словами "Только что была выполнена задача-образец.":
celery_1 | [2021-07-01 03:06:00,003: INFO/MainProcess]
Task core.tasks.sample_task[b8041b6c-bf9b-47ce-ab00-c37c1e837bc7] received
celery_1 | [2021-07-01 03:06:00,004: INFO/ForkPoolWorker-8]
core.tasks.sample_task[b8041b6c-bf9b-47ce-ab00-c37c1e837bc7]:
The sample task just ran.
Пользовательская команда администратора Django
Django предоставляет множество встроенных django-admin
команд, например:
migrate
startproject
startapp
dumpdata
makemigrations
Наряду со встроенными командами, Django также предоставляет нам возможность создавать собственные пользовательские команды:
Пользовательские команды управления особенно полезны для запуска автономных сценариев или сценариев, которые периодически выполняются из кронтаба UNIX или из панели управления запланированными задачами Windows.
Итак, сначала мы настроим новую команду, а затем используем Celery Beat для ее автоматического запуска.
Начните с создания нового файла orders/management/commands/my_custom_command.py. Затем добавьте минимально необходимый код для его запуска:
from django.core.management.base import BaseCommand, CommandError
class Command(BaseCommand):
help = "A description of the command"
def handle(self, *args, **options):
pass
У BaseCommand
есть несколько методов, которые могут быть переопределены, но единственный метод, который необходим, это handle
. handle
является точкой входа для пользовательских команд. Другими словами, когда мы запускаем команду, вызывается этот метод.
Для тестирования мы обычно просто добавляем быстрый оператор print. Однако, согласно документации Django, рекомендуется использовать stdout.write
:
Когда вы используете команды управления и хотите обеспечить консольный вывод, вам следует писать в self.stdout и self.stderr, вместо того чтобы печатать в stdout и stderr напрямую. Использование этих прокси-серверов значительно упрощает тестирование вашей пользовательской команды. Обратите внимание, что вам не нужно заканчивать сообщения символом новой строки, он будет добавлен автоматически, если вы не укажете параметр окончания.
Итак, добавьте команду self.stdout.write
:
from django.core.management.base import BaseCommand, CommandError
class Command(BaseCommand):
help = "A description of the command"
def handle(self, *args, **options):
self.stdout.write("My sample command just ran.") # NEW
Для проверки, из командной строки, выполните:
$ docker-compose exec web python manage.py my_custom_command
Вы должны увидеть:
My sample command just ran.
С этим давайте свяжем все вместе!
Запланировать пользовательскую команду с Celery Beat
Теперь, когда мы запустили контейнеры, проверили, что можем запланировать периодическое выполнение задачи, и написали пользовательскую команду Django Admin, пришло время настроить Celery Beat на периодическое выполнение пользовательской команды.
Настройка
В проекте у нас есть очень базовое приложение под названием orders. Оно содержит две модели, Product
и Order
. Давайте создадим пользовательскую команду, которая отправляет по электронной почте отчет о подтвержденных заказах за день.
Для начала мы добавим несколько товаров и заказов в базу данных с помощью приспособления, включенного в этот проект:
$ docker-compose exec web python manage.py loaddata products.json
Далее добавьте несколько образцов заказов через интерфейс Django Admin. Для этого сначала создайте суперпользователя:
$ docker-compose exec web python manage.py createsuperuser
Введите имя пользователя, электронную почту и пароль, когда появится запрос. Затем перейдите по адресу http://127.0.0.1:1337/admin в своем веб-браузере. Войдите в систему под только что созданным суперпользователем и создайте несколько заказов. Убедитесь, что хотя бы один из них имеет дату confirmed_date
сегодня.
Давайте создадим новую пользовательскую команду для нашего отчета по электронной почте.
Создайте файл с именем orders/management/commands/email_report.py:
from datetime import timedelta, time, datetime
from django.core.mail import mail_admins
from django.core.management import BaseCommand
from django.utils import timezone
from django.utils.timezone import make_aware
from orders.models import Order
today = timezone.now()
tomorrow = today + timedelta(1)
today_start = make_aware(datetime.combine(today, time()))
today_end = make_aware(datetime.combine(tomorrow, time()))
class Command(BaseCommand):
help = "Send Today's Orders Report to Admins"
def handle(self, *args, **options):
orders = Order.objects.filter(confirmed_date__range=(today_start, today_end))
if orders:
message = ""
for order in orders:
message += f"{order} \n"
subject = (
f"Order Report for {today_start.strftime('%Y-%m-%d')} "
f"to {today_end.strftime('%Y-%m-%d')}"
)
mail_admins(subject=subject, message=message, html_message=None)
self.stdout.write("E-mail Report was sent.")
else:
self.stdout.write("No orders confirmed today.")
В коде мы запрашивали базу данных на предмет заказов с датой confirmed_date
сегодня, объединяли заказы в одно сообщение для тела письма и использовали встроенную в Django команду mail_admins
для отправки писем администраторам.
Добавьте фиктивный email администратора и настройте EMAIL_BACKEND
на использование бэкенда Console backend, чтобы email отправлялся в stdout, в файле настроек:
EMAIL_BACKEND = "django.core.mail.backends.console.EmailBackend"
DEFAULT_FROM_EMAIL = "noreply@email.com"
ADMINS = [("testuser", "test.user@email.com"), ]
Теперь должно быть возможно запустить нашу новую команду из терминала.
$ docker-compose exec web python manage.py email_report
Вывод должен выглядеть примерно так:
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: [Django] Order Report for 2021-07-01 to 2021-07-02
From: root@localhost
To: test.user@email.com
Date: Thu, 01 Jul 2021 12:15:50 -0000
Message-ID: <162514175053.40.5705892371538583115@5140844ebb15>
Order: 3947963f-1860-44d1-9b9a-4648fed04581 - product: Coffee
Order: ff449e6e-3dfd-48a8-9d5c-79a145d08253 - product: Rice
-------------------------------------------------------------------------------
E-mail Report was sent.
Celery Beat
Теперь нам нужно создать периодическую задачу для ежедневного выполнения этой команды.
Добавьте новую задачу в core/tasks.py:
from celery import shared_task
from celery.utils.log import get_task_logger
from django.core.management import call_command # NEW
logger = get_task_logger(__name__)
@shared_task
def sample_task():
logger.info("The sample task just ran.")
# NEW
@shared_task
def send_email_report():
call_command("email_report", )
Итак, сначала мы добавили импорт call_command
, который используется для программного вызова команд django-admin. Затем в новой задаче мы использовали call_command
с именем нашей пользовательской команды в качестве аргумента.
Чтобы запланировать эту задачу, откройте файл core/settings.py и обновите настройку CELERY_BEAT_SCHEDULE
, чтобы включить новую задачу:
CELERY_BEAT_SCHEDULE = {
"sample_task": {
"task": "core.tasks.sample_task",
"schedule": crontab(minute="*/1"),
},
"send_email_report": {
"task": "core.tasks.send_email_report",
"schedule": crontab(hour="*/1"),
},
}
Здесь мы добавили новую запись в CELERY_BEAT_SCHEDULE
под названием send_email_report
. Как и для предыдущей задачи, мы объявили, какую задачу она должна выполнять - например, core.tasks.send_email_report
- и использовали шаблон crontab для установки повторяемости.
Перезапустите контейнеры, чтобы убедиться, что новые настройки стали активными:
$ docker-compose up -d --build
Откройте журналы, связанные со службой celery
:
$ docker-compose logs -f 'celery'
Вы должны увидеть send_email_report
перечисленное:
celery_1 | -------------- [queues]
celery_1 | .> celery exchange=celery(direct) key=celery
celery_1 |
celery_1 |
celery_1 | [tasks]
celery_1 | . core.tasks.sample_task
celery_1 | . core.tasks.send_email_report
Через минуту или около того вы увидите, что отчет по электронной почте отправлен:
celery_1 | [2021-07-01 12:22:00,071: WARNING/ForkPoolWorker-8] Content-Type: text/plain; charset="utf-8"
celery_1 | MIME-Version: 1.0
celery_1 | Content-Transfer-Encoding: 7bit
celery_1 | Subject: [Django] Order Report for 2021-07-01 to 2021-07-02
celery_1 | From: root@localhost
celery_1 | To: test.user@email.com
celery_1 | Date: Thu, 01 Jul 2021 12:22:00 -0000
celery_1 | Message-ID: <162514212006.17.6080459299558356876@55b9883c5414>
celery_1 |
celery_1 | Order: 3947963f-1860-44d1-9b9a-4648fed04581 - product: Coffee
celery_1 | Order: ff449e6e-3dfd-48a8-9d5c-79a145d08253 - product: Rice
celery_1 |
celery_1 |
celery_1 | [2021-07-01 12:22:00,071: WARNING/ForkPoolWorker-8] -------------------------------------------------------------------------------
celery_1 | [2021-07-01 12:22:00,071: WARNING/ForkPoolWorker-8]
celery_1 |
celery_1 | [2021-07-01 12:22:00,071: WARNING/ForkPoolWorker-8] E-mail Report was sent.
Заключение
В этой статье мы провели вас через настройку Docker-контейнеров для Celery, Celery Beat и Redis. Затем мы показали, как создать пользовательскую команду Django Admin и периодическую задачу с Celery Beat для автоматического выполнения этой команды.
Ищете больше?
- Настройте Flower для мониторинга и администрирования заданий и работников Celery
- Тестирование задачи Celery с помощью модульных и интеграционных тестов
Возьмите код из repo.
Вернуться на верхDjango + Celery Series: