Фоновые задачи с помощью Celery

Если в вашем приложении есть долго выполняющаяся задача, например, обработка загруженных данных или отправка электронной почты, вы не хотите ждать ее завершения во время запроса. Вместо этого используйте очередь задач для отправки необходимых данных другому процессу, который будет выполнять задачу в фоновом режиме, пока запрос возвращается немедленно.

Celery - это мощная очередь задач, которую можно использовать как для простых фоновых задач, так и для сложных многоэтапных программ и расписаний. Это руководство покажет вам, как настроить Celery с помощью Flask. Прочитайте руководство First Steps with Celery по Celery, чтобы узнать, как использовать сам Celery.

Репозиторий Flask содержит an example на основе информации на этой странице, где также показано, как использовать JavaScript для отправки заданий и опроса прогресса и результатов.

Установите

Установите Celery из PyPI, например, с помощью pip:

$ pip install celery

Интеграция Celery с Flask

Вы можете использовать Celery без какой-либо интеграции с Flask, но удобно настроить его через конфиг Flask и позволить задачам обращаться к приложению Flask.

Celery использует идеи, схожие с Flask, с объектом приложения Celery, который имеет задачи конфигурации и регистров. Создавая приложение Flask, используйте следующий код для создания и настройки приложения Celery.

from celery import Celery, Task

def celery_init_app(app: Flask) -> Celery:
    class FlaskTask(Task):
        def __call__(self, *args: object, **kwargs: object) -> object:
            with app.app_context():
                return self.run(*args, **kwargs)

    celery_app = Celery(app.name, task_cls=FlaskTask)
    celery_app.config_from_object(app.config["CELERY"])
    celery_app.set_default()
    app.extensions["celery"] = celery_app
    return celery_app

Это создает и возвращает объект приложения Celery. Celery configuration берется из ключа CELERY в конфигурации Flask. Приложение Celery устанавливается по умолчанию, чтобы оно отображалось при каждом запросе. Подкласс Task автоматически запускает функции задачи с активным контекстом приложения Flask, чтобы были доступны такие сервисы, как соединения с базой данных.

Вот базовый example.py, который настраивает Celery на использование Redis для связи. Мы включаем бэкенд результатов, но по умолчанию игнорируем результаты. Это позволяет нам хранить результаты только для задач, в которых нам важен результат.

from flask import Flask

app = Flask(__name__)
app.config.from_mapping(
    CELERY=dict(
        broker_url="redis://localhost",
        result_backend="redis://localhost",
        task_ignore_result=True,
    ),
)
celery_app = celery_init_app(app)

Наведите на него команду celery worker, и она найдет объект celery_app.

$ celery -A example worker --loglevel INFO

Вы также можете выполнить команду celery beat для запуска задач по расписанию. Более подробную информацию об определении расписаний см. в документации Celery.

$ celery -A example beat --loglevel INFO

Фабрика по производству аппликаций

При использовании шаблона фабрики приложений Flask вызовите функцию celery_init_app внутри фабрики. Она устанавливает app.extensions["celery"] в объект Celery app, который можно использовать для получения Celery app из Flask app, возвращаемого фабрикой.

def create_app() -> Flask:
    app = Flask(__name__)
    app.config.from_mapping(
        CELERY=dict(
            broker_url="redis://localhost",
            result_backend="redis://localhost",
            task_ignore_result=True,
        ),
    )
    app.config.from_prefixed_env()
    celery_init_app(app)
    return app

Чтобы использовать команды celery, Celery нужен объект app, но он больше не доступен напрямую. Создайте файл make_celery.py, который вызывает фабрику приложений Flask и получает приложение Celery из возвращаемого приложения Flask.

from example import create_app

flask_app = create_app()
celery_app = flask_app.extensions["celery"]

Направьте команду celery на этот файл.

$ celery -A make_celery worker --loglevel INFO
$ celery -A make_celery beat --loglevel INFO

Определение задач

Использование @celery_app.task для украшения функций задач требует доступа к объекту celery_app, который недоступен при использовании паттерна фабрики. Это также означает, что декорированные задачи привязаны к конкретным экземплярам приложений Flask и Celery, что может стать проблемой во время тестирования, если вы измените конфигурацию теста.

Вместо этого используйте декоратор Celery @shared_task. Он создает объекты задач, которые будут обращаться к любому «текущему приложению», что аналогично концепции чертежей и контекста приложения во Flask. Вот почему мы назвали celery_app.set_default() выше.

Вот пример задачи, которая складывает два числа и возвращает результат.

from celery import shared_task

@shared_task(ignore_result=False)
def add_together(a: int, b: int) -> int:
    return a + b

Ранее мы настроили Celery на игнорирование результатов задачи по умолчанию. Поскольку мы хотим знать возвращаемое значение этой задачи, мы установили ignore_result=False. С другой стороны, задача, которой не нужен результат, например, отправка электронного письма, не будет устанавливать это значение.

Вызов задач

Декорированная функция становится объектом задачи с методами для ее вызова в фоновом режиме. Самый простой способ - использовать метод delay(*args, **kwargs). Дополнительные методы см. в документации Celery.

Для выполнения задания должен быть запущен рабочий Celery. Запуск рабочего показан в предыдущих разделах.

from flask import request

@app.post("/add")
def start_add() -> dict[str, object]:
    a = request.form.get("a", type=int)
    b = request.form.get("b", type=int)
    result = add_together.delay(a, b)
    return {"result_id": result.id}

Маршрут не получает результат задачи немедленно. Это нарушило бы цель, заблокировав ответ. Вместо этого мы возвращаем идентификатор результата запущенной задачи, который мы можем использовать позже для получения результата.

Получение результатов

Чтобы получить результат задачи, которую мы запустили выше, мы добавим еще один маршрут, который примет идентификатор результата, который мы вернули ранее. Мы возвращаем, завершена ли задача (готова ли она), успешно ли она завершилась, и каково было возвращаемое значение (или ошибка), если она завершилась.

from celery.result import AsyncResult

@app.get("/result/<id>")
def task_result(id: str) -> dict[str, object]:
    result = AsyncResult(id)
    return {
        "ready": result.ready(),
        "successful": result.successful(),
        "value": result.result if result.ready() else None,
    }

Теперь вы можете запустить задачу с помощью первого маршрута, а затем опросить результат с помощью второго маршрута. Это позволяет не блокировать рабочие запросы Flask в ожидании завершения задач.

Репозиторий Flask содержит an example, использующий JavaScript для отправки заданий и опроса о ходе выполнения и результатах.

Передача данных в задачи

Приведенная выше задача «add» принимала в качестве аргументов два целых числа. Чтобы передать аргументы задачам, Celery должен сериализовать их в формат, который он может передать другим процессам. Поэтому передавать сложные объекты не рекомендуется. Например, невозможно передать объект модели SQLAlchemy, поскольку этот объект, вероятно, не сериализуется и привязан к сессии, которая его запрашивала.

Передайте минимальное количество данных, необходимых для получения или воссоздания любых сложных данных в рамках задачи. Рассмотрим задачу, которая запускается, когда вошедший в систему пользователь запрашивает архив своих данных. Запрос Flask знает зарегистрированного пользователя и запрашивает объект пользователя из базы данных. Он получил его путем запроса базы данных по заданному идентификатору, поэтому задача может сделать то же самое. Передайте идентификатор пользователя, а не объект user.

@shared_task
def generate_user_archive(user_id: str) -> None:
    user = db.session.get(User, user_id)
    ...

generate_user_archive.delay(current_user.id)
Вернуться на верх