Обработка периодических задач в Django с помощью Celery и Docker

По мере создания и масштабирования приложения Django вам неизбежно потребуется периодически и автоматически выполнять определенные задачи в фоновом режиме.

Некоторые примеры:

  • Генерирование периодических отчетов
  • Очистка кэша
  • Отправка пакетных уведомлений по электронной почте
  • Запуск ночных заданий по обслуживанию

Это одна из немногих функциональных частей, необходимых для создания и масштабирования веб-приложений, которая не является частью ядра Django. К счастью, Celery предлагает мощное решение, которое довольно легко реализовать под названием Celery Beat.

В следующей статье мы покажем вам, как настроить Django, Celery и Redis с помощью Docker, чтобы периодически запускать пользовательскую команду Django Admin с помощью Celery Beat.

Django + Celery Series:

  1. Асинхронные задачи с Django и Celery
  2. Обработка периодических задач в Django с помощью Celery и Docker (эта статья)
  3. Автоматический повтор невыполненных задач Celery
  4. Работа с транзакциями базы данных Celery и Django

Цели

К концу этого урока вы должны уметь:

  1. Контейнеризируйте Django, Celery и Redis с помощью Docker
  2. Интегрируйте Celery в приложение Django и создавайте задачи
  3. Напишите пользовательскую команду администратора Django
  4. Запланируйте периодическое выполнение пользовательской команды администратора Django через Celery Beat.

Настройка проекта

Склонируйте базовый проект из репо django-celery-beat, а затем проверьте ветку base:

$ git clone https://github.com/testdrivenio/django-celery-beat --branch base --single-branch
$ cd django-celery-beat

Поскольку нам потребуется управлять четырьмя процессами (Django, Redis, worker и scheduler), мы используем Docker для упрощения рабочего процесса, соединив их так, чтобы все они могли запускаться из одного окна терминала одной командой.

Из корня проекта создайте образы и запустите контейнеры Docker:

$ docker-compose up -d --build

Далее, примените миграции:

$ docker-compose exec web python manage.py migrate

После завершения сборки перейдите по адресу http://localhost:1337, чтобы убедиться, что приложение работает так, как ожидалось. Вы должны увидеть следующий текст:

Orders

No orders found!

Ознакомьтесь со структурой проекта, прежде чем двигаться дальше:

├── .gitignore
├── docker-compose.yml
└── project
    ├── Dockerfile
    ├── core
    │   ├── __init__.py
    │   ├── asgi.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── entrypoint.sh
    ├── manage.py
    ├── orders
    │   ├── __init__.py
    │   ├── admin.py
    │   ├── apps.py
    │   ├── migrations
    │   │   ├── 0001_initial.py
    │   │   └── __init__.py
    │   ├── models.py
    │   ├── tests.py
    │   ├── urls.py
    │   └── views.py
    ├── products.json
    ├── requirements.txt
    └── templates
        └── orders
            └── order_list.html

Хотите узнать, как построить этот проект? Просмотрите статью Dockerizing Django with Postgres, Gunicorn, and Nginx в блоге

Celery и Redis

Теперь нам нужно добавить контейнеры для Celery, Celery Beat и Redis.

Начнем с добавления зависимостей в файл requirements.txt:

Django==3.2.4
celery==5.1.2
redis==3.5.3

Далее, добавьте следующее в конец файла docker-compose.yml:

redis:
  image: redis:alpine
celery:
  build: ./project
  command: celery -A core worker -l info
  volumes:
    - ./project/:/usr/src/app/
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
  depends_on:
    - redis
celery-beat:
  build: ./project
  command: celery -A core beat -l info
  volumes:
    - ./project/:/usr/src/app/
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
  depends_on:
    - redis

Нам также необходимо обновить раздел depends_on веб-сервиса:

web:
  build: ./project
  command: python manage.py runserver 0.0.0.0:8000
  volumes:
    - ./project/:/usr/src/app/
  ports:
    - 1337:8000
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
  depends_on:
    - redis # NEW

Полный файл docker-compose.yml теперь должен выглядеть следующим образом:

version: '3.8'

services:
  web:
    build: ./project
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ./project/:/usr/src/app/
    ports:
      - 1337:8000
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    depends_on:
      - redis
  redis:
    image: redis:alpine
  celery:
    build: ./project
    command: celery -A core worker -l info
    volumes:
      - ./project/:/usr/src/app/
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    depends_on:
      - redis
  celery-beat:
    build: ./project
    command: celery -A core beat -l info
    volumes:
      - ./project/:/usr/src/app/
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    depends_on:
      - redis

Перед созданием новых контейнеров нам необходимо настроить Celery в нашем приложении Django.

Конфигурация Celery

Настройка

В каталоге "core" создайте файл celery.py и добавьте следующий код:

import os

from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")

app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

Что здесь происходит?

  1. First, we set a default value for the DJANGO_SETTINGS_MODULE environment variable so that the Celery will know how to find the Django project.
  2. Next, we created a new Celery instance, with the name core, and assigned the value to a variable called app.
  3. We then loaded the celery configuration values from the settings object from django.conf. We used namespace="CELERY" to prevent clashes with other Django settings. All config settings for Celery must be prefixed with CELERY_, in other words.
  4. Finally, app.autodiscover_tasks() tells Celery to look for Celery tasks from applications defined in settings.INSTALLED_APPS.

Добавьте следующий код в core/__init__.py:

from .celery import app as celery_app

__all__ = ("celery_app",)

В заключение, обновите файл core/settings.py со следующими настройками Celery, чтобы он мог подключаться к Redis:

CELERY_BROKER_URL = "redis://redis:6379"
CELERY_RESULT_BACKEND = "redis://redis:6379"

Соберите новые контейнеры, чтобы убедиться, что все работает:

$ docker-compose up -d --build

Просмотрите журналы для каждой службы, чтобы убедиться, что они готовы, без ошибок:

$ docker-compose logs 'web'
$ docker-compose logs 'celery'
$ docker-compose logs 'celery-beat'
$ docker-compose logs 'redis'

Если все прошло хорошо, у нас теперь есть четыре контейнера, каждый с различными службами.

Теперь мы готовы создать пример задания, чтобы убедиться, что оно работает так, как нужно.

Создать задачу

Создайте новый файл под названием core/tasks.py и добавьте следующий код для примера задачи, которая просто ведет журнал в консоль:

from celery import shared_task
from celery.utils.log import get_task_logger


logger = get_task_logger(__name__)


@shared_task
def sample_task():
    logger.info("The sample task just ran.")

Запланировать задачу

В конце файла settings.py добавьте следующий код, чтобы запланировать выполнение sample_task раз в минуту, используя Celery Beat:

CELERY_BEAT_SCHEDULE = {
    "sample_task": {
        "task": "core.tasks.sample_task",
        "schedule": crontab(minute="*/1"),
    },
}

Здесь мы определили периодическую задачу, используя настройку CELERY_BEAT_SCHEDULE. Мы дали задаче имя, sample_task, а затем объявили две настройки:

  1. task объявляет, какую задачу выполнять.
  2. Schedule устанавливает интервал, в течение которого задача должна выполняться. Это может быть целое число, timedelta или crontab. Мы использовали шаблон crontab для нашей задачи, чтобы заставить ее запускаться каждую минуту. Вы можете найти больше информации о расписании Celery здесь.

Убедитесь, что добавили импорты:

from celery.schedules import crontab

import core.tasks

Перезапустите контейнер, чтобы применить новые настройки:

$ docker-compose up -d --build

После этого посмотрите на поленья сельдерея в контейнере:

$ docker-compose logs -f 'celery'

Вы должны увидеть что-то похожее на:

celery_1  |  -------------- [queues]
celery_1  |                 .> celery           exchange=celery(direct) key=celery
celery_1  |
celery_1  |
celery_1  | [tasks]
celery_1  |   . core.tasks.sample_task

Мы видим, что Celery подхватил наш образец задачи, core.tasks.sample_task.

Каждую минуту вы должны видеть строку в журнале, которая заканчивается словами "Только что была выполнена задача-образец.":

celery_1  | [2021-07-01 03:06:00,003: INFO/MainProcess]
              Task core.tasks.sample_task[b8041b6c-bf9b-47ce-ab00-c37c1e837bc7] received
celery_1  | [2021-07-01 03:06:00,004: INFO/ForkPoolWorker-8]
              core.tasks.sample_task[b8041b6c-bf9b-47ce-ab00-c37c1e837bc7]:
              The sample task just ran.

Пользовательская команда администратора Django

Django предоставляет множество встроенных django-admin команд, например:

  • migrate
  • startproject
  • startapp
  • dumpdata
  • makemigrations

Наряду со встроенными командами, Django также предоставляет нам возможность создавать собственные пользовательские команды:

Пользовательские команды управления особенно полезны для запуска автономных сценариев или сценариев, которые периодически выполняются из кронтаба UNIX или из панели управления запланированными задачами Windows.

Итак, сначала мы настроим новую команду, а затем используем Celery Beat для ее автоматического запуска.

Начните с создания нового файла orders/management/commands/my_custom_command.py. Затем добавьте минимально необходимый код для его запуска:

from django.core.management.base import BaseCommand, CommandError


class Command(BaseCommand):
    help = "A description of the command"

    def handle(self, *args, **options):
        pass

У BaseCommand есть несколько методов, которые могут быть переопределены, но единственный метод, который необходим, это handle. handle является точкой входа для пользовательских команд. Другими словами, когда мы запускаем команду, вызывается этот метод.

Для тестирования мы обычно просто добавляем быстрый оператор print. Однако, согласно документации Django, рекомендуется использовать stdout.write:

Когда вы используете команды управления и хотите обеспечить консольный вывод, вам следует писать в self.stdout и self.stderr, вместо того чтобы печатать в stdout и stderr напрямую. Использование этих прокси-серверов значительно упрощает тестирование вашей пользовательской команды. Обратите внимание, что вам не нужно заканчивать сообщения символом новой строки, он будет добавлен автоматически, если вы не укажете параметр окончания.

Итак, добавьте команду self.stdout.write:

from django.core.management.base import BaseCommand, CommandError


class Command(BaseCommand):
    help = "A description of the command"

    def handle(self, *args, **options):
        self.stdout.write("My sample command just ran.") # NEW

Для проверки, из командной строки, выполните:

$ docker-compose exec web python manage.py my_custom_command

Вы должны увидеть:

My sample command just ran.

С этим давайте свяжем все вместе!

Запланировать пользовательскую команду с Celery Beat

Теперь, когда мы запустили контейнеры, проверили, что можем запланировать периодическое выполнение задачи, и написали пользовательскую команду Django Admin, пришло время настроить Celery Beat на периодическое выполнение пользовательской команды.

Настройка

В проекте у нас есть очень базовое приложение под названием orders. Оно содержит две модели, Product и Order. Давайте создадим пользовательскую команду, которая отправляет по электронной почте отчет о подтвержденных заказах за день.

Для начала мы добавим несколько товаров и заказов в базу данных с помощью приспособления, включенного в этот проект:

$ docker-compose exec web python manage.py loaddata products.json

Далее добавьте несколько образцов заказов через интерфейс Django Admin. Для этого сначала создайте суперпользователя:

$ docker-compose exec web python manage.py createsuperuser

Введите имя пользователя, электронную почту и пароль, когда появится запрос. Затем перейдите по адресу http://127.0.0.1:1337/admin в своем веб-браузере. Войдите в систему под только что созданным суперпользователем и создайте несколько заказов. Убедитесь, что хотя бы один из них имеет дату confirmed_date сегодня.

Давайте создадим новую пользовательскую команду для нашего отчета по электронной почте.

Создайте файл с именем orders/management/commands/email_report.py:

from datetime import timedelta, time, datetime

from django.core.mail import mail_admins
from django.core.management import BaseCommand
from django.utils import timezone
from django.utils.timezone import make_aware

from orders.models import Order

today = timezone.now()
tomorrow = today + timedelta(1)
today_start = make_aware(datetime.combine(today, time()))
today_end = make_aware(datetime.combine(tomorrow, time()))


class Command(BaseCommand):
    help = "Send Today's Orders Report to Admins"

    def handle(self, *args, **options):
        orders = Order.objects.filter(confirmed_date__range=(today_start, today_end))

        if orders:
            message = ""

            for order in orders:
                message += f"{order} \n"

            subject = (
                f"Order Report for {today_start.strftime('%Y-%m-%d')} "
                f"to {today_end.strftime('%Y-%m-%d')}"
            )

            mail_admins(subject=subject, message=message, html_message=None)

            self.stdout.write("E-mail Report was sent.")
        else:
            self.stdout.write("No orders confirmed today.")

В коде мы запрашивали базу данных на предмет заказов с датой confirmed_date сегодня, объединяли заказы в одно сообщение для тела письма и использовали встроенную в Django команду mail_admins для отправки писем администраторам.

Добавьте фиктивный email администратора и настройте EMAIL_BACKEND на использование бэкенда Console backend, чтобы email отправлялся в stdout, в файле настроек:

EMAIL_BACKEND = "django.core.mail.backends.console.EmailBackend"
DEFAULT_FROM_EMAIL = "noreply@email.com"
ADMINS = [("testuser", "test.user@email.com"), ]

Теперь должно быть возможно запустить нашу новую команду из терминала.

$ docker-compose exec web python manage.py email_report

Вывод должен выглядеть примерно так:

Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 7bit
Subject: [Django] Order Report for 2021-07-01 to 2021-07-02
From: root@localhost
To: test.user@email.com
Date: Thu, 01 Jul 2021 12:15:50 -0000
Message-ID: <162514175053.40.5705892371538583115@5140844ebb15>

Order: 3947963f-1860-44d1-9b9a-4648fed04581 - product: Coffee
Order: ff449e6e-3dfd-48a8-9d5c-79a145d08253 - product: Rice

-------------------------------------------------------------------------------
E-mail Report was sent.

Celery Beat

Теперь нам нужно создать периодическую задачу для ежедневного выполнения этой команды.

Добавьте новую задачу в core/tasks.py:

from celery import shared_task
from celery.utils.log import get_task_logger
from django.core.management import call_command # NEW


logger = get_task_logger(__name__)


@shared_task
def sample_task():
    logger.info("The sample task just ran.")


# NEW
@shared_task
def send_email_report():
    call_command("email_report", )

Итак, сначала мы добавили импорт call_command, который используется для программного вызова команд django-admin. Затем в новой задаче мы использовали call_command с именем нашей пользовательской команды в качестве аргумента.

Чтобы запланировать эту задачу, откройте файл core/settings.py и обновите настройку CELERY_BEAT_SCHEDULE, чтобы включить новую задачу:

CELERY_BEAT_SCHEDULE = {
    "sample_task": {
        "task": "core.tasks.sample_task",
        "schedule": crontab(minute="*/1"),
    },
    "send_email_report": {
        "task": "core.tasks.send_email_report",
        "schedule": crontab(hour="*/1"),
    },
}

Здесь мы добавили новую запись в CELERY_BEAT_SCHEDULE под названием send_email_report. Как и для предыдущей задачи, мы объявили, какую задачу она должна выполнять - например, core.tasks.send_email_report - и использовали шаблон crontab для установки повторяемости.

Перезапустите контейнеры, чтобы убедиться, что новые настройки стали активными:

$ docker-compose up -d --build

Откройте журналы, связанные со службой celery:

$ docker-compose logs -f 'celery'

Вы должны увидеть send_email_report перечисленное:

celery_1  |  -------------- [queues]
celery_1  |                 .> celery           exchange=celery(direct) key=celery
celery_1  |
celery_1  |
celery_1  | [tasks]
celery_1  |   . core.tasks.sample_task
celery_1  |   . core.tasks.send_email_report

Через минуту или около того вы увидите, что отчет по электронной почте отправлен:

celery_1  | [2021-07-01 12:22:00,071: WARNING/ForkPoolWorker-8] Content-Type: text/plain; charset="utf-8"
celery_1  | MIME-Version: 1.0
celery_1  | Content-Transfer-Encoding: 7bit
celery_1  | Subject: [Django] Order Report for 2021-07-01 to 2021-07-02
celery_1  | From: root@localhost
celery_1  | To: test.user@email.com
celery_1  | Date: Thu, 01 Jul 2021 12:22:00 -0000
celery_1  | Message-ID: <162514212006.17.6080459299558356876@55b9883c5414>
celery_1  |
celery_1  | Order: 3947963f-1860-44d1-9b9a-4648fed04581 - product: Coffee
celery_1  | Order: ff449e6e-3dfd-48a8-9d5c-79a145d08253 - product: Rice
celery_1  |
celery_1  |
celery_1  | [2021-07-01 12:22:00,071: WARNING/ForkPoolWorker-8] -------------------------------------------------------------------------------
celery_1  | [2021-07-01 12:22:00,071: WARNING/ForkPoolWorker-8]
celery_1  |
celery_1  | [2021-07-01 12:22:00,071: WARNING/ForkPoolWorker-8] E-mail Report was sent.

Заключение

В этой статье мы провели вас через настройку Docker-контейнеров для Celery, Celery Beat и Redis. Затем мы показали, как создать пользовательскую команду Django Admin и периодическую задачу с Celery Beat для автоматического выполнения этой команды.

Ищете больше?

  1. Настройте Flower для мониторинга и администрирования заданий и работников Celery
  2. Тестирование задачи Celery с помощью модульных и интеграционных тестов

Возьмите код из repo.

Django + Celery Series:

  1. Асинхронные задачи с Django и Celery
  2. Обработка периодических задач в Django с помощью Celery и Docker (эта статья)
  3. Автоматический повтор невыполненных задач Celery
  4. Работа с транзакциями базы данных Celery и Django
Вернуться на верх