Асинхронные задачи с Django и Celery
Если долго выполняющийся процесс является частью рабочего процесса вашего приложения, вместо того чтобы блокировать ответ, вы должны обрабатывать его в фоновом режиме, вне обычного потока запросов/ответов.
Возможно, ваше веб-приложение требует от пользователей отправки миниатюры (которую, вероятно, придется изменить в размере) и подтверждения электронной почты при регистрации. Если бы ваше приложение обрабатывало изображение и отправляло подтверждение по электронной почте непосредственно в обработчике запроса, то конечному пользователю пришлось бы без необходимости ждать, пока они оба закончат обработку, прежде чем страница загрузится или обновится. Вместо этого вы захотите передать эти процессы в очередь задач и позволить отдельному рабочему процессу заняться ими, чтобы вы могли немедленно отправить ответ клиенту. Конечный пользователь может заниматься другими делами на стороне клиента, пока идет обработка. Ваше приложение также может свободно отвечать на запросы других пользователей и клиентов.
Для достижения этой цели мы проведем вас через процесс установки и настройки Celery и Redis для обработки долгоиграющих процессов в приложении Django. Мы также будем использовать Docker и Docker Compose, чтобы связать все вместе. Наконец, мы рассмотрим, как тестировать задачи Celery с помощью модульных и интеграционных тестов.
Серия Django + Celery:
Цели
После завершения этого урока вы сможете:
- Интегрировать Celery в приложение Django и создавать задачи
- Создавать контейнеры для Django, Celery и Redis с помощью Docker
- Запускать процессы в фоновом режиме с отдельным рабочим процессом
- Сохранять журналы Celery в файл
- Настраивать Flower для мониторинга и управления рабочими местами и рабочими Celery.
- Тестировать задачи Celery с помощью модульных и интеграционных тестов
Фоновые задачи
Кроме того, для повышения удобства пользователей длительные процессы следует запускать вне обычного потока HTTP-запросов/ответов, в фоновом процессе.
Примеры:
- Запуск моделей машинного обучения
- Отправка писем с подтверждением
- Скрапинг и просмотр сайтов
- Анализ данных
- Обработка изображений
- Создание отчетов
При создании приложения постарайтесь разграничить задачи, которые должны выполняться во время жизненного цикла запроса/ответа, например CRUD-операции, и задачи, которые должны выполняться в фоновом режиме.
Рабочий процесс
Нашей целью является разработка приложения Django, которое работает в связке с Celery для обработки длительных процессов вне обычного цикла запрос/ответ.
- Конечный пользователь запускает новую задачу через POST-запрос на стороне сервера.
- В рамках представления задача добавляется в очередь, а идентификатор задачи отправляется обратно на сторону клиента.
- Используя AJAX, клиент продолжает опрашивать сервер для проверки статуса задачи, в то время как сама задача выполняется в фоновом режиме.
Настройка проекта
Склонируйте базовый проект из django-celery, а затем перейдите по тегу v1 в ветку master:
$ git clone https://github.com/testdrivenio/django-celery --branch v1 --single-branch
$ cd django-celery
$ git checkout v1 -b master
Поскольку нам понадобится управлять тремя процессами (Django, Redis, worker), мы используем Docker для упрощения рабочего процесса, соединив их так, чтобы все они могли запускаться из одного окна терминала с помощью одной команды.
В корне проекта создайте образы и запустите контейнеры Docker:
$ docker-compose up -d --build
После завершения сборки перейдите по адресу http://localhost:1337
:
Убедитесь, что тесты также пройдены:
$ docker-compose exec web python -m pytest
=============================== test session starts ===============================
platform linux -- Python 3.9.5, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, configfile: pytest.ini
plugins: django-4.4.0
collected 1 item
tests/test_tasks.py . [100%]
================================ 1 passed in 0.63s ================================
Посмотрите на структуру проекта, прежде чем двигаться дальше:
├── .gitignore
├── LICENSE
├── README.md
├── docker-compose.yml
└── project
├── Dockerfile
├── core
│ ├── __init__.py
│ ├── asgi.py
│ ├── settings.py
│ ├── urls.py
│ └── wsgi.py
├── entrypoint.sh
├── manage.py
├── pytest.ini
├── requirements.txt
├── static
│ ├── bulma.min.css
│ ├── jquery-3.4.1.min.js
│ ├── main.css
│ └── main.js
├── tasks
│ ├── __init__.py
│ ├── apps.py
│ ├── migrations
│ │ └── __init__.py
│ ├── templates
│ │ └── home.html
│ └── views.py
└── tests
├── __init__.py
└── test_tasks.py
Хотите узнать, как создать этот проект? Посмотрите статью Dockerizing Django with Postgres, Gunicorn, and Nginx.
Запуск задачи
Настроен обработчик событий в project/static/main.js, который прослушивает нажатие кнопки. При нажатии на сервер отправляется запрос AJAX POST с соответствующим типом задачи: 1, 2 или 3.
$('.button').on('click', function() {
$.ajax({
url: '/tasks/',
data: { type: $(this).data('type') }
method: 'POST',
})
.done((res) => {
getStatus(res.task_id);
})
.fail((err) => {
console.log(err);
});
});
На стороне сервера представление уже настроено для обработки запроса в project/tasks/views.py:
@csrf_exempt
def run_task(request):
if request.POST:
task_type = request.POST.get("type")
return JsonResponse({"task_type": task_type}, status=202)
Наступает самое интересное: подключение Celery!
Настройка Celery
Начните с добавления Celery и Redis в файл project/requirements.txt:
celery==4.4.7
Django==3.2.4
redis==3.5.3
pytest==6.2.4
pytest-django==4.4.0
Celery использует брокер сообщений - RabbitMQ, Redis, или AWS Simple Queue Service (SQS) - для облегчения связи между рабочим Celery и веб-приложением. Сообщения добавляются в брокер, которые затем обрабатываются рабочим(и). После этого результаты добавляются в бэкэнд.
Redis будет использоваться и как брокер, и как бэкенд. Добавьте Redis и Celery worker в файл docker-compose.yml следующим образом:
version: '3.8'
services:
web:
build: ./project
command: python manage.py runserver 0.0.0.0:8000
volumes:
- ./project:/usr/src/app/
ports:
- 1337:8000
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- redis
celery:
build: ./project
command: celery worker --app=core --loglevel=info
volumes:
- ./project:/usr/src/app
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
redis:
image: redis:6-alpine
Обратите внимание на celery worker --app=core --loglevel=info
:
celery worker
используется для запуска сельдерея--app=core
runs thecore
Celery приложение (который мы вскоре определим)--loglevel=info
устанавливает уровень ведения журнала какinfo
В модуле настроек проекта добавьте следующее, чтобы указать Celery использовать Redis в качестве брокера и бэкенда:
CELERY_BROKER_URL = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
CELERY_RESULT_BACKEND = os.environ.get("CELERY_BROKER", "redis://redis:6379/0")
Затем создайте новый файл с именем sample_tasks.py в «project/tasks»:
# project/tasks/sample_tasks.py
import time
from celery import shared_task
@shared_task
def create_task(task_type):
time.sleep(int(task_type) * 10)
return True
Здесь, используя декоратор shared_task, мы определили новую функцию задачи Celery под названием create_task
.
Имейте в виду, что сама задача не будет выполняться из процесса Django; она будет выполнена Celery.
Теперь добавьте файл celery.py в «project/core»:
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "core.settings")
app = Celery("core")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
Что здесь происходит?
- Сначала мы устанавливаем значение по умолчанию для переменной среды
DJANGO_SETTINGS_MODULE
, чтобы Celery знал, как найти проект Django. - Затем мы создали новый экземпляр Celery с именем core и присвоили значение переменной app.
- Затем мы загрузили значения конфигурации сельдерея из объекта настроек из
django.conf
. Мы использовалиnamespace = "CELERY"
, чтобы предотвратить конфликты с другими настройками Django. Другими словами, все настройки конфигурации для сельдерея должны иметь префиксCELERY_
. - Наконец,
app.autodiscover_tasks()
сообщает Celery искать задачи Celery из приложений, определенных вsettings.INSTALLED_APPS
.
Обновите project/core/__init__.py, чтобы приложение Celery автоматически импортировалось при запуске Django:
from .celery import app as celery_app
__all__ = ("celery_app",)
Запуск задачи
Обновите представление, чтобы запустить задачу и ответить на нее с идентификатором:
@csrf_exempt
def run_task(request):
if request.POST:
task_type = request.POST.get("type")
task = create_task.delay(int(task_type))
return JsonResponse({"task_id": task.id}, status=202)
Не забудьте импортировать задание:
from tasks.sample_tasks import create_task
Создайте образы и запустите новые контейнеры:
$ docker-compose up -d --build
Чтобы запустить новую задачу, выполните:
$ curl -F type=0 http://localhost:1337/tasks/
Вы должны увидеть что-то вроде:
{
"task_id": "6f025ed9-09be-4cbb-be10-1dce919797de"
}
Статус задачи
Вернемся к обработчику событий на стороне клиента:
$('.button').on('click', function() {
$.ajax({
url: '/tasks/',
data: { type: $(this).data('type') }
method: 'POST',
})
.done((res) => {
getStatus(res.task_id);
})
.fail((err) => {
console.log(err);
});
});
Когда ответ возвращается от первоначального AJAX-запроса, мы продолжаем вызывать getStatus()
с идентификатором задачи каждую секунду:
function getStatus(taskID) {
$.ajax({
url: `/tasks/${taskID}/`,
method: 'GET'
})
.done((res) => {
const html = `
<tr>
<td>${res.task_id}</td>
<td>${res.task_status}</td>
<td>${res.task_result}</td>
</tr>`
$('#tasks').prepend(html);
const taskStatus = res.task_status;
if (taskStatus === 'SUCCESS' || taskStatus === 'FAILURE') return false;
setTimeout(function() {
getStatus(res.task_id);
}, 1000);
})
.fail((err) => {
console.log(err)
});
}
Если ответ успешный, в таблицу на DOM добавляется новая строка.
Обновите представление get_status
, чтобы вернуть статус:
@csrf_exempt
def get_status(request, task_id):
task_result = AsyncResult(task_id)
результат = {
"task_id": task_id,
"task_status": task_result.status,
"task_result": task_result.result
}
return JsonResponse(result, status=200)
Импортируйте AsyncResult:
from celery.result import AsyncResult
Обновите контейнеры:
$ docker-compose up -d --build
Выполнение нового задания:
$ curl -F type=1 http://localhost:1337/tasks/
Затем возьмите task_id
из ответа и вызовите обновленную конечную точку, чтобы просмотреть статус:
$ curl http://localhost:1337/tasks/25278457-0957-4b0b-b1da-2600525f812f/
{
"task_id": "25278457-0957-4b0b-b1da-2600525f812f",
"task_status": "SUCCESS",
"task_result": "true
}
Протестируйте его также в браузере:
Журналы Celery
Обновите службу celery
в docker-compose.yml, чтобы журналы Celery сбрасывались в файл журнала:
celery:
build: ./project
command: celery worker --app=core --loglevel=info --logfile=logs/celery.log
volumes:
- ./project:/usr/src/app
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
Добавьте новый каталог в «project» под названием «logs». Затем добавьте новый файл с именем celery.log в этот вновь созданный каталог.
Обновление:
$ docker-compose up -d --build
Вы должны увидеть, что файл журнала заполняется локально, поскольку мы установили том:
[2021-06-20 00:00:31,333: INFO/MainProcess] Connected to redis://redis:6379/0
[2021-06-20 00:00:31,343: INFO/MainProcess] mingle: searching for neighbors
[2021-06-20 00:00:32,375: INFO/MainProcess] mingle: all alone
[2021-06-20 00:00:32,392: WARNING/MainProcess]
/usr/local/lib/python3.9/site-packages/celery/fixups/django.py:205:
UserWarning: Using settings.DEBUG leads to a memory
leak, never use this setting in production environments!
warnings.warn('''Using settings.DEBUG leads to a memory
[2021-06-20 00:00:32,394: INFO/MainProcess] celery@eb0f99f2fad9 ready.
[2021-06-20 00:00:44,488: INFO/MainProcess]
Received task: tasks.sample_tasks.create_task[570bb712-bf2c-4406-b205-07f2e83e113d]
[2021-06-20 00:00:54,479: INFO/ForkPoolWorker-7]
Task tasks.sample_tasks.create_task[570bb712-bf2c-4406-b205-07f2e83e113d]
succeeded in 10.023200300012832s: True
Панель Flower
Flower - это легкий веб-инструмент мониторинга Celery в реальном времени. Вы можете отслеживать текущие задачи, увеличивать или уменьшать пул рабочих, просматривать графики и ряд статистических данных.
Добавьте его в requirements.txt:
celery==4.4.7
Django==3.2.4
flower==0.9.7
redis==3.5.3
pytest==6.2.4
pytest-django==4.4.0
Затем добавьте новую службу в docker-compose.yml:
dashboard:
build: ./project
command: flower -A core --port=5555 --broker=redis://redis:6379/0
ports:
- 5555:5555
environment:
- DEBUG=1
- SECRET_KEY=dbaa1_i7%*3r9-=z-+_mz4r-!qeed@(-a_r(g@k8jo8y3r27%m
- DJANGO_ALLOWED_HOSTS=localhost 127.0.0.1 [::1]
- CELERY_BROKER=redis://redis:6379/0
- CELERY_BACKEND=redis://redis:6379/0
depends_on:
- web
- redis
- celery
Протестируйте это:
$ docker-compose up -d --build
Перейдите по адресу http://localhost:5555 для просмотра панели. Вы должны увидеть одного работника, готового к работе:
Выполните еще несколько задач, чтобы полностью протестировать панель:
Попробуйте добавить еще несколько работников и посмотреть, как это повлияет на ситуацию:
$ docker-compose up -d --build --scale celery=3
Тесты
Начнем с самого базового теста:
def test_task():
assert sample_tasks.create_task.run(1)
assert sample_tasks.create_task.run(2)
assert sample_tasks.create_task.run(3)
Добавьте приведенный выше тестовый пример в project/tests/test_tasks.py, а затем добавьте следующий импорт:
from tasks import sample_tasks
Выполните этот тест индивидуально:
$ docker-compose exec web python -m pytest -k "test_task and not test_home"
Запуск должен занять около одной минуты:
=============================== test session starts ===============================
platform linux -- Python 3.9.5, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, configfile: pytest.ini
plugins: django-4.4.0, celery-4.4.7
collected 2 items / 1 deselected / 1 selected
tests/test_tasks.py . [100%]
=================== 1 passed, 1 deselected in 60.69s (0:01:00) ====================
Стоит отметить, что в приведенных выше утверждениях мы использовали метод .run
(а не .delay
) для запуска задачи напрямую без рабочего Celery.
Хотите добавить mock в метод .run, чтобы ускорить процесс?
@patch("tasks.sample_tasks.create_task.run")
def test_mock_task(mock_run):
assert sample_tasks.create_task.run(1)
sample_tasks.create_task.run.assert_called_once_with(1)
assert sample_tasks.create_task.run(2)
assert sample_tasks.create_task.run.call_count == 2
assert sample_tasks.create_task.run(3)
assert sample_tasks.create_task.run.call_count == 3
Импорт:
from unittest.mock import patch
Тест:
$ docker-compose exec web python -m pytest -k "test_mock_task"
$ docker-compose exec web python -m pytest -k "test_mock_task"
=============================== test session starts ===============================
platform linux -- Python 3.9.5, pytest-6.2.4, py-1.10.0, pluggy-0.13.1
django: settings: core.settings (from ini)
rootdir: /usr/src/app, configfile: pytest.ini
plugins: django-4.4.0, celery-4.4.7
collected 3 items / 2 deselected / 1 selected
tests/test_tasks.py . [100%]
========================= 1 passed, 2 deselected in 0.64s =========================
Более быстро!
Как насчет полного интеграционного теста?
def test_task_status(client):
response = client.post(reverse("run_task"), {"type": 0})
content = json.loads(response.content)
task_id = content["task_id"]
assert response.status_code == 202
assert task_id
response = client.get(reverse("get_status", args=[task_id]))
content = json.loads(response.content)
assert content == {"task_id": task_id, "task_status": "PENDING", "task_result": Нет}
assert response.status_code == 200
while content["task_status"] == "PENDING":
response = client.get(reverse("get_status", args=[task_id]))
content = json.loads(response.content)
assert content == {"task_id": task_id, "task_status": "SUCCESS", "task_result": True}
Не забывайте, что в этом тесте используется тот же брокер и бэкенд, что и при разработке. Возможно, вы захотите создать новое приложение Celery для тестирования:
app = celery.Celery('tests', broker=CELERY_TEST_BROKER, backend=CELERY_TEST_BACKEND)
Добавьте импорт:
import json
Убедитесь, что тест пройден.
Заключение
Это было базовое руководство по настройке Celery для выполнения длительных задач в приложении Django. Вы должны позволить очереди обрабатывать любые процессы, которые могут заблокировать или замедлить работу кода, обращенного к пользователю.
Celery также можно использовать для выполнения повторяющихся задач и разбиения сложных, ресурсоемких задач, чтобы распределить вычислительную нагрузку между несколькими машинами для сокращения времени выполнения и нагрузки на машину, обрабатывающую клиентские запросы.
И наконец, если вам интересно, как использовать Websockets (через Django Channels) для проверки статуса задачи Celery вместо использования AJAX-опроса, ознакомьтесь с другими руководствами по этой теме.
Серия Django + Celery:
Возьмите код из репозитория.
Вернуться на верх