Первые шаги с сельдереем

Celery - это очередь задач с батарейками в комплекте. Она проста в использовании, поэтому вы можете начать работу без изучения всей сложности проблемы, которую она решает. Он разработан на основе лучших практик, чтобы ваш продукт мог масштабироваться и интегрироваться с другими языками, и поставляется с инструментами и поддержкой, необходимыми для запуска такой системы в производство.

В этом руководстве вы узнаете абсолютные основы использования Celery.

Узнайте о;

  • Выбор и установка транспорта сообщений (брокера).

  • Установка Celery и создание вашей первой задачи.

  • Запуск рабочего и вызов задач.

  • Отслеживание задач по мере их перехода через различные состояния и проверка возвращаемых значений.

Поначалу сельдерей может показаться пугающим, но не волнуйтесь - это руководство поможет вам быстро начать работу. Он намеренно упрощен, чтобы не запутать вас в расширенных возможностях. После того, как вы закончите это руководство, вам стоит просмотреть остальную документацию. Например, учебник Следующие шаги продемонстрирует возможности Celery.

Выбор брокера

Celery требует решения для отправки и получения сообщений; обычно это происходит в виде отдельной службы, называемой message broker.

На выбор предлагается несколько вариантов, в том числе:

RabbitMQ

RabbitMQ обладает полным набором функций, стабильностью, долговечностью и простотой установки. Это отличный выбор для производственной среды. Подробная информация об использовании RabbitMQ с Celery:

Если вы используете Ubuntu или Debian, установите RabbitMQ, выполнив эту команду:

$ sudo apt-get install rabbitmq-server

Или, если вы хотите запустить его на Docker, выполните следующее:

$ docker run -d -p 5672:5672 rabbitmq

Когда команда завершится, брокер уже будет работать в фоновом режиме, готовый перемещать сообщения для вас: Starting rabbitmq-server: SUCCESS.

Не волнуйтесь, если вы не используете Ubuntu или Debian, вы можете перейти на этот сайт, чтобы найти такие же простые инструкции по установке для других платформ, включая Microsoft Windows:

Redis

Redis также обладает полным набором функций, но более подвержен потере данных в случае резкого завершения работы или сбоев питания. Подробная информация об использовании Redis:

Использование Redis

Если вы хотите запустить его на Docker, выполните следующее:

$ docker run -d -p 6379:6379 redis

Другие брокеры

Помимо вышеперечисленных, существуют и другие экспериментальные реализации транспорта, включая Amazon SQS.

Полный список см. в Обзор брокера.

Установка сельдерея

Celery находится в индексе пакетов Python (PyPI), поэтому его можно установить с помощью стандартных инструментов Python, таких как pip или easy_install:

$ pip install celery

Приложение

Первое, что вам нужно, это экземпляр Celery. Мы называем его приложение Celery или просто app для краткости. Поскольку этот экземпляр используется как точка входа для всего, что вы хотите делать в Celery, например, создавать задачи и управлять рабочими, другие модули должны иметь возможность импортировать его.

В этом учебнике мы держим все в одном модуле, но для больших проектов вы захотите создать dedicated module.

Давайте создадим файл tasks.py:

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
    return x + y

Первым аргументом Celery является имя текущего модуля. Это необходимо только для того, чтобы имена могли быть автоматически сгенерированы, когда задачи определены в модуле __main__.

Второй аргумент - это аргумент ключевого слова broker, указывающий URL брокера сообщений, который вы хотите использовать. Здесь используется RabbitMQ (также вариант по умолчанию).

Смотрите Выбор брокера выше для большего выбора - для RabbitMQ вы можете использовать amqp://localhost, или для Redis вы можете использовать redis://localhost.

Вы определили единственную задачу с именем add, возвращающую сумму двух чисел.

Запуск рабочего сервера Celery

Теперь вы можете запустить рабочий, выполнив нашу программу с аргументом worker:

$ celery -A tasks worker --loglevel=INFO

Примечание

См. раздел Устранение неполадок, если рабочий не запускается.

На производстве вы захотите запустить рабочий в фоновом режиме в качестве демона. Для этого вам нужно использовать инструменты, предоставляемые вашей платформой, или что-то вроде supervisord (см. Демонизация для получения дополнительной информации).

Для получения полного списка доступных опций командной строки выполните следующие действия:

$  celery worker --help

Имеется также несколько других команд, а также доступна справка:

$ celery --help

Вызов задания

Для вызова нашей задачи можно использовать метод delay().

Это удобное сокращение метода apply_async(), которое дает больший контроль над выполнением задачи (см. << 1 >>>):

>>> from tasks import add
>>> add.delay(4, 4)

Теперь задание обработано рабочим, который вы запустили ранее. Вы можете убедиться в этом, посмотрев на консольный вывод рабочего.

Вызов задачи возвращает экземпляр AsyncResult. Его можно использовать для проверки состояния задачи, ожидания завершения задачи или получения ее возвращаемого значения (или, если задача не выполнилась, для получения исключения и трассировки).

Результаты не включены по умолчанию. Чтобы выполнять удаленные вызовы процедур или отслеживать результаты выполнения задач в базе данных, вам необходимо настроить Celery на использование бэкенда результатов. Это описано в следующем разделе.

Поддержание результатов

Если вы хотите отслеживать состояния задач, Celery необходимо где-то хранить или отправлять эти состояния. Есть несколько встроенных бэкендов результатов на выбор: SQLAlchemy/Django ORM, MongoDB, Memcached, Redis, RPC (RabbitMQ/AMQP), и – или вы можете определить свой собственный.

В данном примере мы используем бэкенд результатов rpc, который отправляет состояния обратно в виде переходных сообщений. Бэкенд задается через аргумент backend в Celery, (или через параметр result_backend, если вы решили использовать модуль конфигурации):

app = Celery('tasks', backend='rpc://', broker='pyamqp://')

Или если вы хотите использовать Redis в качестве бэкенда результатов, но при этом использовать RabbitMQ в качестве брокера сообщений (популярная комбинация):

app = Celery('tasks', backend='redis://localhost', broker='pyamqp://')

Более подробную информацию о бэкендах результатов можно найти в разделе Бэкенды результатов.

Теперь, когда бэкенд результатов настроен, давайте снова вызовем задачу. На этот раз вы будете удерживать экземпляр AsyncResult, возвращаемый при вызове задачи:

>>> result = add.delay(4, 4)

Метод ready() возвращает, закончила ли задача обработку или нет:

>>> result.ready()
False

Вы можете дождаться завершения результата, но это редко используется, поскольку превращает асинхронный вызов в синхронный:

>>> result.get(timeout=1)
8

В случае, если задача вызвала исключение, get() вызовет исключение повторно, но вы можете отменить это, указав аргумент propagate:

>>> result.get(propagate=False)

Если задача вызвала исключение, вы также можете получить доступ к исходной трассировке:

>>> result.traceback

Предупреждение

Бэкенды используют ресурсы для хранения и передачи результатов. Чтобы гарантировать освобождение ресурсов, вы должны в конечном итоге вызвать get() или forget() на КАЖДОМ экземпляре AsyncResult, возвращенном после вызова задачи.

Полное описание объекта результата см. в celery.result.

Конфигурация

Сельдерей, как бытовой прибор, не требует особой настройки для работы. У него есть вход и выход. Вход должен быть подключен к брокеру, а выход может быть опционально подключен к бэкенду результатов. Однако, если вы внимательно посмотрите на заднюю часть, там есть крышка, на которой находится множество ползунков, циферблатов и кнопок: это и есть конфигурация.

Конфигурация по умолчанию должна быть достаточно хорошей для большинства случаев использования, но есть много опций, которые можно настроить, чтобы Celery работал именно так, как нужно. Чтение о доступных опциях - хорошая идея, чтобы ознакомиться с тем, что можно настроить. Вы можете прочитать об опциях в справке Конфигурация и настройки по умолчанию.

Конфигурация может быть задана непосредственно в приложении или с помощью специального модуля конфигурации. В качестве примера вы можете настроить сериализатор по умолчанию, используемый для сериализации полезной нагрузки задачи, изменив параметр task_serializer:

app.conf.task_serializer = 'json'

Если вы настраиваете много параметров одновременно, вы можете использовать update:

app.conf.update(
    task_serializer='json',
    accept_content=['json'],  # Ignore other content
    result_serializer='json',
    timezone='Europe/Oslo',
    enable_utc=True,
)

Для крупных проектов рекомендуется использовать специальный модуль конфигурации. Не рекомендуется жестко кодировать периодические интервалы между заданиями и варианты маршрутизации заданий. Гораздо лучше хранить их в централизованном месте. Это особенно актуально для библиотек, поскольку позволяет пользователям контролировать поведение своих задач. Централизованная конфигурация также позволит вашему системному администратору вносить простые изменения в случае неполадок в системе.

Вы можете указать своему экземпляру Celery использовать модуль конфигурации, вызвав метод app.config_from_object():

app.config_from_object('celeryconfig')

Этот модуль часто называют «celeryconfig», но вы можете использовать любое имя модуля.

В приведенном выше случае модуль с именем celeryconfig.py должен быть доступен для загрузки из текущего каталога или по пути Python. Это может выглядеть примерно так:

celeryconfig.py:

broker_url = 'pyamqp://'
result_backend = 'rpc://'

task_serializer = 'json'
result_serializer = 'json'
accept_content = ['json']
timezone = 'Europe/Oslo'
enable_utc = True

Чтобы убедиться, что ваш файл конфигурации работает правильно и не содержит синтаксических ошибок, вы можете попробовать импортировать его:

$ python -m celeryconfig

Полный перечень опций конфигурации приведен в разделе Конфигурация и настройки по умолчанию.

Чтобы продемонстрировать возможности конфигурационных файлов, вот как можно направить непослушную задачу в специальную очередь:

celeryconfig.py:

task_routes = {
    'tasks.add': 'low-priority',
}

Или вместо маршрутизации вы можете ограничить задание по скорости, так что только 10 заданий этого типа могут быть обработаны в минуту (10/m):

celeryconfig.py:

task_annotations = {
    'tasks.add': {'rate_limit': '10/m'}
}

Если вы используете RabbitMQ или Redis в качестве брокера, вы также можете направить рабочих на установку нового предела скорости для задачи во время выполнения:

$ celery -A tasks control rate_limit tasks.add 10/m
worker@example.com: OK
    new rate limit set successfully

Смотрите Задачи маршрутизации, чтобы узнать больше о маршрутизации заданий, и настройку task_annotations, чтобы узнать больше об аннотациях, или Руководство по мониторингу и управлению, чтобы узнать больше о командах удаленного управления и о том, как следить за тем, что делают ваши работники.

Куда двигаться дальше

Если вы хотите узнать больше, вам следует перейти к учебнику Next Steps, а после него вы можете прочитать << 1 >>>.

Устранение неполадок

В разделе Часто задаваемые вопросы также есть раздел по устранению неисправностей.

Рабочий не запускается: Ошибка разрешения

  • Если вы используете Debian, Ubuntu или другие дистрибутивы на базе Debian:

    Debian недавно переименовал специальный файл /dev/shm в /run/shm.

    Простым обходным решением является создание символической ссылки:

    # ln -s /run/shm /dev/shm
    
  • Другие:

    Если вы предоставляете любой из аргументов --pidfile, --logfile или --statedb, то вы должны убедиться, что они указывают на файл или каталог, который доступен для записи и чтения пользователю, запускающему рабочий.

Бэкенд результатов не работает или задачи всегда находятся в состоянии PENDING

По умолчанию все задачи имеют значение PENDING, поэтому состояние лучше было бы назвать «неизвестно». Celery не обновляет состояние, когда задача отправляется, и предполагается, что любая задача без истории находится в ожидании (в конце концов, вы знаете id задачи).

  1. Убедитесь, что в задании не включена функция ignore_result.

    Включение этой опции заставит работника пропустить обновление состояний.

  2. Убедитесь, что параметр task_ignore_result не включен.

  3. Убедитесь, что у вас нет старых работников, которые все еще работают.

    Легко случайно запустить несколько рабочих, поэтому убедитесь, что предыдущий рабочий был правильно выключен перед запуском нового.

    Возможно, запущен старый рабочий, который не настроен на бэкенд ожидаемых результатов, и перехватывает задания.

    Аргумент --pidfile может быть установлен на абсолютный путь, чтобы этого не произошло.

  4. Убедитесь, что клиент настроен на правильный бэкенд.

    Если по какой-то причине клиент настроен на использование бэкенда, отличного от рабочего, вы не сможете получить результат. Убедитесь, что бэкенд настроен правильно:

    >>> result = task.delay()
    >>> print(result.backend)
    
Вернуться на верх