Асинхронные задачи с Django и Celery

Если долго выполняющийся процесс является частью рабочего процесса вашего приложения, вместо того чтобы блокировать ответ, вы должны обрабатывать его в фоновом режиме, вне обычного потока запросов/ответов.

Возможно, ваше веб-приложение требует от пользователей отправки миниатюры (которую, вероятно, придется изменить в размере) и подтверждения электронной почты при регистрации. Если бы ваше приложение обрабатывало изображение и отправляло подтверждение по электронной почте непосредственно в обработчике запроса, то конечному пользователю пришлось бы без необходимости ждать, пока они оба закончат обработку, прежде чем страница загрузится или обновится. Вместо этого вы захотите передать эти процессы в очередь задач и позволить отдельному рабочему процессу заняться ими, чтобы вы могли немедленно отправить ответ клиенту. Конечный пользователь может заниматься другими делами на стороне клиента, пока идет обработка. Ваше приложение также может свободно отвечать на запросы других пользователей и клиентов.

Для достижения этой цели мы проведем вас через процесс установки и настройки Celery и Redis для обработки долгоиграющих процессов в приложении Django. Мы также будем использовать Docker и Docker Compose, чтобы связать все вместе. Наконец, мы рассмотрим, как тестировать задачи Celery с помощью модульных и интеграционных тестов.

Серия Django + Celery:

  1. Асинхронные задачи с Django и Celery (эта статья)
  2. Обработка периодических задач в Django с помощью Celery и Docker
  3. Автоматический повтор невыполненных задач Celery
  4. Работа с транзакциями базы данных Celery и Django

Цели

После завершения этого урока вы сможете:

  1. Интегрировать Celery в приложение Django и создавать задачи
  2. Создавать контейнеры для Django, Celery и Redis с помощью Docker
  3. Запускать процессы в фоновом режиме с отдельным рабочим процессом
  4. Сохранять журналы Celery в файл
  5. Настраивать Flower для мониторинга и управления рабочими местами и рабочими Celery.
  6. Тестировать задачи Celery с помощью модульных и интеграционных тестов

Фоновые задачи

Кроме того, для повышения удобства пользователей длительные процессы следует запускать вне обычного потока HTTP-запросов/ответов, в фоновом процессе.

Примеры:

  1. Запуск моделей машинного обучения
  2. Отправка писем с подтверждением
  3. Скрапинг и просмотр сайтов
  4. Анализ данных
  5. Обработка изображений
  6. Создание отчетов

При создании приложения постарайтесь разграничить задачи, которые должны выполняться во время жизненного цикла запроса/ответа, например CRUD-операции, и задачи, которые должны выполняться в фоновом режиме.

Рабочий процесс

Нашей целью является разработка приложения Django, которое работает в связке с Celery для обработки длительных процессов вне обычного цикла запрос/ответ.

  1. Конечный пользователь запускает новую задачу через POST-запрос на стороне сервера.
  2. В рамках представления задача добавляется в очередь, а идентификатор задачи отправляется обратно на сторону клиента.
  3. Используя AJAX, клиент продолжает опрашивать сервер для проверки статуса задачи, в то время как сама задача выполняется в фоновом режиме.

django and celery queue user flow

Настройка проекта

Склонируйте базовый проект из django-celery, а затем перейдите по тегу v1 в ветку master:

$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master

Поскольку нам понадобится управлять тремя процессами (Django, Redis, worker), мы используем Docker для упрощения рабочего процесса, соединив их так, чтобы все они могли запускаться из одного окна терминала с помощью одной команды.

В корне проекта создайте образы и запустите контейнеры Docker:

$ docker-compose up -d --build
 

После завершения сборки перейдите по адресу http://localhost:1337:

django project

Убедитесь, что тесты также пройдены:

$ docker-compose exec web python -m pytest

=============================== test session starts ===============================
platform linux -- Python 3.9.5, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, configfile: pytest.ini
plugins: django-4.4.0
collected 1 item

tests/test_tasks.py .                                                       [100%]

================================ 1 passed in 0.63s ================================

Посмотрите на структуру проекта, прежде чем двигаться дальше:

├── .gitignore
├── LICENSE
├── README.md
├── docker-compose.yml
└── project
    ├── Dockerfile
    ├── core
    │   ├── __init__.py
    │   ├── asgi.py
    │   ├── settings.py
    │   ├── urls.py
    │   └── wsgi.py
    ├── entrypoint.sh
    ├── manage.py
    ├── pytest.ini
    ├── requirements.txt
    ├── static
    │   ├── bulma.min.css
    │   ├── jquery-3.4.1.min.js
    │   ├── main.css
    │   └── main.js
    ├── tasks
    │   ├── __init__.py
    │   ├── apps.py
    │   ├── migrations
    │   │   └── __init__.py
    │   ├── templates
    │   │   └── home.html
    │   └── views.py
    └── tests
        ├── __init__.py
        └── test_tasks.py

Хотите узнать, как создать этот проект? Посмотрите статью Dockerizing Django with Postgres, Gunicorn, and Nginx.

Запуск задачи

Настроен обработчик событий в project/static/main.js, который прослушивает нажатие кнопки. При нажатии на сервер отправляется запрос AJAX POST с соответствующим типом задачи: 1, 2 или 3.

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') }
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
});

На стороне сервера представление уже настроено для обработки запроса в project/tasks/views.py:

@csrf_exempt
def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        return JsonResponse({"task_type": task_type}, status=202) 

Наступает самое интересное: подключение Celery!

Настройка Celery

Начните с добавления Celery и Redis в файл project/requirements.txt:

celery==4.4.7
Django==3.2.4
redis==3.5.3

pytest==6.2.4
pytest-django==4.4.0

Celery использует брокер сообщений - RabbitMQ, Redis, или AWS Simple Queue Service (SQS) - для облегчения связи между рабочим Celery и веб-приложением. Сообщения добавляются в брокер, которые затем обрабатываются рабочим(и). После этого результаты добавляются в бэкэнд.

Redis будет использоваться и как брокер, и как бэкенд. Добавьте Redis и Celery worker в файл docker-compose.yml следующим образом:

version: '3.8'

services:
  web:
    build: ./project
    command: python manage.py runserver 0.0.0.0:8000
    volumes:
      - ./project:/usr/src/app/
    ports:
      - 1337:8000
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - redis

  celery:
    build: ./project
    command: celery worker --app=core --loglevel=info
    volumes:
      - ./project:/usr/src/app
    environment:
      - DEBUG=1
      - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
      - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
      - CELERY_BROKER=redis://redis:6379/0
      - CELERY_BACKEND=redis://redis:6379/0
    depends_on:
      - web
      - redis

  redis:
    image: redis:6-alpine

Обратите внимание на celery worker --app=core --loglevel=info:

  1. celery workerиспользуется для запуска сельдерея
  2. --app=core runs the core Celery приложение (который мы вскоре определим)
  3. --loglevel=infoустанавливает уровень ведения журнала как info

В модуле настроек проекта добавьте следующее, чтобы указать Celery использовать Redis в качестве брокера и бэкенда:

CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
 

Затем создайте новый файл с именем sample_tasks.py в «project/tasks»:

# project/tasks/sample_tasks.py

import time

from celery import shared_task


@shared_task
def create_task(task_type):
    time.sleep(int(task_type) * 10)
    return True

Здесь, используя декоратор shared_task, мы определили новую функцию задачи Celery под названием create_task.

Имейте в виду, что сама задача не будет выполняться из процесса Django; она будет выполнена Celery.

Теперь добавьте файл celery.py в «project/core»:

import os

from celery import Celery


os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks() 

Что здесь происходит?

  1. Сначала мы устанавливаем значение по умолчанию для переменной среды DJANGO_SETTINGS_MODULE, чтобы Celery знал, как найти проект Django.
  2. Затем мы создали новый экземпляр Celery с именем core и присвоили значение переменной app.
  3. Затем мы загрузили значения конфигурации сельдерея из объекта настроек из django.conf. Мы использовали namespace = "CELERY", чтобы предотвратить конфликты с другими настройками Django. Другими словами, все настройки конфигурации для сельдерея должны иметь префикс CELERY_.
  4. Наконец, app.autodiscover_tasks() сообщает Celery искать задачи Celery из приложений, определенных в settings.INSTALLED_APPS.

Обновите project/core/__init__.py, чтобы приложение Celery автоматически импортировалось при запуске Django:

from .celery import app as celery_app


__all__ = ("celery_app",) 

Запуск задачи

Обновите представление, чтобы запустить задачу и ответить на нее с идентификатором:

@csrf_exempt
def run_task(request):
    if request.POST:
        task_type = request.POST.get("type")
        task = create_task.delay(int(task_type))
        return JsonResponse({"task_id": task.id}, status=202)

Не забудьте импортировать задание:

from tasks.sample_tasks import create_task 

Создайте образы и запустите новые контейнеры:

$ docker-compose up -d --build 

Чтобы запустить новую задачу, выполните:

$ curl -F type=0 http://localhost:1337/tasks/ 

Вы должны увидеть что-то вроде:

{
  "task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}

Статус задачи

Вернемся к обработчику событий на стороне клиента:

$('.button').on('click', function() {
  $.ajax({
    url: '/tasks/',
    data: { type: $(this).data('type') }
    method: 'POST',
  })
  .done((res) => {
    getStatus(res.task_id);
  })
  .fail((err) => {
    console.log(err);
  });
}); 

Когда ответ возвращается от первоначального AJAX-запроса, мы продолжаем вызывать getStatus() с идентификатором задачи каждую секунду:

function getStatus(taskID) {
  $.ajax({
    url: `/tasks/${taskID}/`,
    method: 'GET'
  })
  .done((res) => {
    const html = `
      <tr>
        <td>${res.task_id}</td>
        <td>${res.task_status}</td>
        <td>${res.task_result}</td>
      </tr>`
    $('#tasks').prepend(html);

    const taskStatus = res.task_status;

    if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
    setTimeout(function() {
      getStatus(res.task_id);
    }, 1000);
  })
  .fail((err) => {
    console.log(err)
  });
} 

Если ответ успешный, в таблицу на DOM добавляется новая строка.

Обновите представление get_status, чтобы вернуть статус:

@csrf_exempt
def get_status(request, task_id):
    task_result = AsyncResult(task_id)
    результат = {
        "task_id": task_id,
        "task_status": task_result.status,
        "task_result": task_result.result
    }
    return JsonResponse(result, status=200) 

Импортируйте AsyncResult:

from celery.result import AsyncResult 

Обновите контейнеры:

$ docker-compose up -d --build 

Выполнение нового задания:

$ curl -F type=1 http://localhost:1337/tasks/ 

Затем возьмите task_id из ответа и вызовите обновленную конечную точку, чтобы просмотреть статус:

$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/

{
    "task_id": "25278457-0957-4b0b-b1da-2600525f812f",
    "task_status": "SUCCESS",
    "task_result": "true
} 

Протестируйте его также в браузере:

django, celery, docker

Журналы Celery

Обновите службу celery в docker-compose.yml, чтобы журналы Celery сбрасывались в файл журнала:

celery:
  build: ./project
  command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
  volumes:
    - ./project:/usr/src/app
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis

Добавьте новый каталог в «project» под названием «logs». Затем добавьте новый файл с именем celery.log в этот вновь созданный каталог.

Обновление:

$ docker-compose up -d --build 

Вы должны увидеть, что файл журнала заполняется локально, поскольку мы установили том:

[2021-06-20 00:00:31,333: INFO/MainProcess] Connected to redis://redis:6379/0
[2021-06-20 00:00:31,343: INFO/MainProcess] mingle: searching for neighbors
[2021-06-20 00:00:32,375: INFO/MainProcess] mingle: all alone
[2021-06-20 00:00:32,392: WARNING/MainProcess]
    /usr/local/lib/python3.9/site-packages/celery/fixups/django.py:205:
    UserWarning: Using settings.DEBUG leads to a memory
    leak, never use this setting in production environments!
    warnings.warn('''Using settings.DEBUG leads to a memory
[2021-06-20 00:00:32,394: INFO/MainProcess] celery@eb0f99f2fad9 ready.

[2021-06-20 00:00:44,488: INFO/MainProcess]
    Received task: tasks.sample_tasks.create_task[570bb712-bf2c-4406-b205-07f2e83e113d]
[2021-06-20 00:00:54,479: INFO/ForkPoolWorker-7]
    Task tasks.sample_tasks.create_task[570bb712-bf2c-4406-b205-07f2e83e113d]
    succeeded in 10.023200300012832s: True

Панель Flower

Flower - это легкий веб-инструмент мониторинга Celery в реальном времени. Вы можете отслеживать текущие задачи, увеличивать или уменьшать пул рабочих, просматривать графики и ряд статистических данных.

Добавьте его в requirements.txt:

celery==4.4.7
Django==3.2.4
flower==0.9.7
redis==3.5.3

pytest==6.2.4
pytest-django==4.4.0 

Затем добавьте новую службу в docker-compose.yml:

dashboard:
  build: ./project
  command:  flower -A core --port=5555 --broker=redis://redis:6379/0
  ports:
    - 5555:5555
  environment:
    - DEBUG=1
    - SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
    - DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
    - CELERY_BROKER=redis://redis:6379/0
    - CELERY_BACKEND=redis://redis:6379/0
  depends_on:
    - web
    - redis
    - celery

Протестируйте это:

$ docker-compose up -d --build 

Перейдите по адресу http://localhost:5555 для просмотра панели. Вы должны увидеть одного работника, готового к работе:

flower dashboard

Выполните еще несколько задач, чтобы полностью протестировать панель:

flower dashboard

Попробуйте добавить еще несколько работников и посмотреть, как это повлияет на ситуацию:

$ docker-compose up -d --build --scale celery=3 

Тесты

Начнем с самого базового теста:

def test_task():
    assert sample_tasks.create_task.run(1)
    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run(3) 

Добавьте приведенный выше тестовый пример в project/tests/test_tasks.py, а затем добавьте следующий импорт:

from tasks import sample_tasks 

Выполните этот тест индивидуально:

$ docker-compose exec web python -m pytest -k "test_task and not test_home"
 

Запуск должен занять около одной минуты:

=============================== test session starts ===============================
platform linux -- Python 3.9.5, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, configfile: pytest.ini
plugins: django-4.4.0, celery-4.4.7
collected 2 items / 1 deselected / 1 selected

tests/test_tasks.py .                                                       [100%]

=================== 1 passed, 1 deselected in 60.69s (0:01:00) ====================

Стоит отметить, что в приведенных выше утверждениях мы использовали метод .run (а не .delay) для запуска задачи напрямую без рабочего Celery.

Хотите добавить mock в метод .run, чтобы ускорить процесс?

@patch("tasks.sample_tasks.create_task.run")
def test_mock_task(mock_run):
    assert sample_tasks.create_task.run(1)
    sample_tasks.create_task.run.assert_called_once_with(1)

    assert sample_tasks.create_task.run(2)
    assert sample_tasks.create_task.run.call_count == 2

    assert sample_tasks.create_task.run(3)
    assert sample_tasks.create_task.run.call_count == 3 

Импорт:

from unittest.mock import patch 

Тест:

$ docker-compose exec web python -m pytest -k "test_mock_task"
 
$ docker-compose exec web python -m pytest -k "test_mock_task"

=============================== test session starts ===============================
platform linux -- Python 3.9.5, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, configfile: pytest.ini
plugins: django-4.4.0, celery-4.4.7
collected 3 items / 2 deselected / 1 selected

tests/test_tasks.py .                                                       [100%]

========================= 1 passed, 2 deselected in 0.64s =========================

Более быстро!

Как насчет полного интеграционного теста?

def test_task_status(client):
    response = client.post(reverse("run_task"), {"type": 0})
    content = json.loads(response.content)
    task_id = content["task_id"]
    assert response.status_code == 202
    assert task_id
 
    response = client.get(reverse("get_status", args=[task_id]))
    content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": Нет}
    assert response.status_code == 200

    while content["task_status"] == "PENDING":
        response = client.get(reverse("get_status", args=[task_id]))
        content = json.loads(response.content)
    assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True} 

Не забывайте, что в этом тесте используется тот же брокер и бэкенд, что и при разработке. Возможно, вы захотите создать новое приложение Celery для тестирования:

app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND) 

Добавьте импорт:

import json 

Убедитесь, что тест пройден.

Заключение

Это было базовое руководство по настройке Celery для выполнения длительных задач в приложении Django. Вы должны позволить очереди обрабатывать любые процессы, которые могут заблокировать или замедлить работу кода, обращенного к пользователю.

Celery также можно использовать для выполнения повторяющихся задач и разбиения сложных, ресурсоемких задач, чтобы распределить вычислительную нагрузку между несколькими машинами для сокращения времени выполнения и нагрузки на машину, обрабатывающую клиентские запросы.

И наконец, если вам интересно, как использовать Websockets (через Django Channels) для проверки статуса задачи Celery вместо использования AJAX-опроса, ознакомьтесь с другими руководствами по этой теме.

 

Серия Django + Celery:

  1. Асинхронные задачи с Django и Celery (эта статья)
  2. Обработка периодических задач в Django с помощью Celery и Docker
  3. Автоматический повтор невыполненных задач Celery
  4. Работа с транзакциями базы данных Celery и Django

Возьмите код из репозитория.

Вернуться на верх