Следующие шаги

Руководство Первые шаги с сельдереем намеренно минимально. В этом руководстве я покажу, что предлагает Celery, более подробно, включая то, как добавить поддержку Celery для вашего приложения и библиотеки.

В этом документе описаны не все возможности и лучшие практики Celery, поэтому рекомендуем вам также прочитать User Guide.

Использование Celery в вашем приложении

Наш проект

Планировка проекта:

proj/__init__.py
    /celery.py
    /tasks.py

proj/celery.py

В этом модуле вы создали наш экземпляр Celery (иногда называемый app). Чтобы использовать Celery в своем проекте, вы просто импортируете этот экземпляр.

  • Аргумент broker указывает URL брокера, который необходимо использовать.

    Дополнительную информацию см. в разделе Выбор брокера.

  • Аргумент backend задает используемый бэкенд результата.

    Он используется для отслеживания состояния задачи и результатов. Хотя результаты по умолчанию отключены, я использую здесь бэкенд результатов RPC, потому что позже я продемонстрирую, как работает получение результатов. Возможно, вы захотите использовать другой бэкенд для своего приложения. Все они имеют различные сильные и слабые стороны. Если результаты вам не нужны, лучше их отключить. Результаты также можно отключить для отдельных задач, установив опцию @task(ignore_result=True).

    Дополнительную информацию см. в разделе Поддержание результатов.

  • Аргумент include - это список модулей, которые нужно импортировать при запуске рабочего. Вам нужно добавить сюда наш модуль задач, чтобы рабочий смог найти наши задачи.

proj/tasks.py

Запуск рабочего

Программа celery может быть использована для запуска рабочего (необходимо запустить рабочий в директории над proj):

$ celery -A proj worker -l INFO

Когда рабочий запустится, вы увидите баннер и несколько сообщений:

--------------- celery@halcyon.local v4.0 (latentcall)
--- ***** -----
-- ******* ---- [Configuration]
- *** --- * --- . broker:      amqp://guest@localhost:5672//
- ** ---------- . app:         __main__:0x1012d8590
- ** ---------- . concurrency: 8 (processes)
- ** ---------- . events:      OFF (enable -E to monitor this worker)
- ** ----------
- *** --- * --- [Queues]
-- ******* ---- . celery:      exchange:celery(direct) binding:celery
--- ***** -----

[2012-06-08 16:23:51,078: WARNING/MainProcess] celery@halcyon.local has started.

broker - это URL, который вы указали в аргументе broker в нашем модуле celery. Вы также можете указать другого брокера в командной строке, используя опцию -b.

Concurrency - это количество рабочих процессов prefork, используемых для одновременной обработки ваших задач. Когда все они заняты работой, новые задачи будут вынуждены ждать завершения одной из них, прежде чем она сможет быть обработана.

Число параллелизма по умолчанию - это число CPU на данной машине (включая ядра). Вы можете указать пользовательское число, используя опцию celery worker -c. Рекомендуемого значения нет, так как оптимальное число зависит от ряда факторов, но если ваши задачи в основном связаны с вводом-выводом, то вы можете попробовать увеличить его. Эксперименты показали, что добавление более чем вдвое большего числа CPU редко бывает эффективным и, скорее всего, приведет к снижению производительности.

Помимо пула prefork по умолчанию, Celery также поддерживает использование Eventlet, Gevent и работу в одном потоке (см. Конкурентность (Параллелизм)).

Events - это опция, которая заставляет Celery посылать сообщения мониторинга (события) для действий, происходящих в рабочем. Они могут быть использованы программами мониторинга, такими как celery events, и Flower – монитором Celery в реальном времени, о котором вы можете прочитать в Monitoring and Management guide.

Queues - это список очередей, из которых рабочий будет потреблять задания. Рабочему можно указать потреблять из нескольких очередей одновременно, и это используется для маршрутизации сообщений к определенным рабочим в качестве средства для обеспечения качества обслуживания, разделения задач и приоритетов, все это описано в Routing Guide.

Вы можете получить полный список аргументов командной строки, передав флаг --help:

$ celery worker --help

Более подробно эти опции описаны в разделе Workers Guide.

Остановка работника

Для остановки рабочего просто нажмите Control-c. Список сигналов, поддерживаемых рабочим, подробно описан в Workers Guide.

На заднем плане

На производстве вы захотите запустить рабочий в фоновом режиме, что подробно описано в daemonization tutorial.

Сценарии демонизации используют команду celery multi для запуска одного или нескольких рабочих в фоновом режиме:

$ celery multi start w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
> Starting nodes...
    > w1.halcyon.local: OK

Вы также можете перезапустить его:

$ celery  multi restart w1 -A proj -l INFO
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64024
> Waiting for 1 node.....
    > w1.halcyon.local: OK
> Restarting node w1.halcyon.local: OK
celery multi v4.0.0 (latentcall)
> Stopping nodes...
    > w1.halcyon.local: TERM -> 64052

или остановить его:

$ celery multi stop w1 -A proj -l INFO

Команда stop является асинхронной, поэтому она не будет ждать выключения рабочего. Вместо нее лучше использовать команду stopwait, которая гарантирует, что все выполняющиеся в данный момент задачи будут завершены перед выходом:

$ celery multi stopwait w1 -A proj -l INFO

Примечание

celery multi не хранит информацию о рабочих, поэтому при перезапуске необходимо использовать те же аргументы командной строки. При остановке необходимо использовать только те же аргументы pidfile и logfile.

По умолчанию он создает файлы pid и log в текущем каталоге. Для защиты от запуска нескольких рабочих поверх друг друга рекомендуется поместить их в специальный каталог:

$ mkdir -p /var/run/celery
$ mkdir -p /var/log/celery
$ celery multi start w1 -A proj -l INFO --pidfile=/var/run/celery/%n.pid \
                                        --logfile=/var/log/celery/%n%I.log

С помощью команды multi вы можете запустить несколько рабочих, и есть мощный синтаксис командной строки для указания аргументов для разных рабочих, например:

$ celery multi start 10 -A proj -l INFO -Q:1-3 images,video -Q:4,5 data \
    -Q default -L:4,5 debug

Дополнительные примеры смотрите в модуле multi в справочнике API.

Об аргументе --app

Аргумент --app указывает экземпляр приложения Celery, который будет использоваться, в форме module.path:attribute.

Но он также поддерживает форму быстрого поиска. Если указано только имя пакета, он попытается найти экземпляр приложения в следующем порядке:

С --app=proj:

  1. атрибут с именем proj.app, или

  2. атрибут с именем proj.celery, или

  3. любой атрибут в модуле proj, где значение является приложением Celery, или

Если ни один из них не найден, он попробует подмодуль с именем proj.celery:

  1. атрибут с именем proj.celery.app, или

  2. атрибут с именем proj.celery.celery, или

  3. Любой атрибут в модуле proj.celery, где значение является приложением Celery.

Эта схема имитирует практику, используемую в документации - то есть proj:app для одного содержащегося модуля, и proj.celery:app для более крупных проектов.

Вызов задач

Вы можете вызвать задачу с помощью метода delay():

>>> from proj.tasks import add

>>> add.delay(2, 2)

Этот метод на самом деле является сокращением звездообразного аргумента для другого метода под названием apply_async():

>>> add.apply_async((2, 2))

Последний позволяет указать параметры выполнения, такие как время выполнения (обратный отсчет), очередь, в которую он должен быть отправлен, и так далее:

>>> add.apply_async((2, 2), queue='lopri', countdown=10)

В приведенном выше примере задание будет отправлено в очередь с именем lopri, и задание будет выполнено, самое раннее, через 10 секунд после отправки сообщения.

При непосредственном применении задания оно будет выполнено в текущем процессе, так что сообщение не будет отправлено:

>>> add(2, 2)
4

Эти три метода - delay(), apply_async() и применение (__call__), составляют API вызова Celery, который также используется для подписей.

Более подробный обзор API вызова можно найти в разделе Calling User Guide.

Каждому вызову задачи присваивается уникальный идентификатор (UUID) - это и есть идентификатор задачи.

Методы delay и apply_async возвращают экземпляр AsyncResult, который можно использовать для отслеживания состояния выполнения задач. Но для этого необходимо включить result backend, чтобы состояние можно было где-то хранить.

Результаты отключены по умолчанию, потому что не существует бэкенда результатов, который подходит для всех приложений; чтобы выбрать его, необходимо рассмотреть недостатки каждого отдельного бэкенда. Для многих задач сохранение возвращаемого значения даже не очень полезно, так что это разумное значение по умолчанию. Также обратите внимание, что бэкенды результатов не используются для мониторинга задач и рабочих: для этого Celery использует специальные сообщения о событиях (см. Руководство по мониторингу и управлению).

Если у вас настроен бэкенд результатов, вы можете получить возвращаемое значение задачи:

>>> res = add.delay(2, 2)
>>> res.get(timeout=1)
4

Вы можете найти id задачи, посмотрев на атрибут id:

>>> res.id
d6b3aea2-fb9b-4ebc-8da4-848818db9114

Вы также можете просмотреть исключение и traceback, если задача вызвала исключение, на самом деле result.get() будет распространять любые ошибки по умолчанию:

>>> res = add.delay(2, '2')
>>> res.get(timeout=1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/result.py", line 221, in get
    return self.backend.wait_for_pending(
  File "celery/backends/asynchronous.py", line 195, in wait_for_pending
    return result.maybe_throw(callback=callback, propagate=propagate)
  File "celery/result.py", line 333, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "celery/result.py", line 326, in throw
    self.on_ready.throw(*args, **kwargs)
  File "vine/promises.py", line 244, in throw
    reraise(type(exc), exc, tb)
  File "vine/five.py", line 195, in reraise
    raise value
TypeError: unsupported operand type(s) for +: 'int' and 'str'

Если вы не хотите, чтобы ошибки распространялись, вы можете отключить это, передав propagate:

>>> res.get(propagate=False)
TypeError("unsupported operand type(s) for +: 'int' and 'str'")

В этом случае вместо него будет возвращен экземпляр исключения - поэтому для проверки успешности или неуспешности задачи вам придется использовать соответствующие методы на экземпляре результата:

>>> res.failed()
True

>>> res.successful()
False

Как же он узнает, провалилась задача или нет? Он может узнать это, посмотрев на состояние задачи:

>>> res.state
'FAILURE'

Задача может находиться только в одном состоянии, но может проходить через несколько состояний. Этапы типичной задачи могут быть следующими:

PENDING -> STARTED -> SUCCESS

Запущенное состояние - это особое состояние, которое записывается только в том случае, если включен параметр task_track_started, или если для задачи установлен параметр @task(track_started=True).

Состояние ожидания на самом деле не является записанным состоянием, а скорее состоянием по умолчанию для любого неизвестного идентификатора задачи: это видно из данного примера:

>>> from proj.celery import app

>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.state
'PENDING'

Если задание повторяется, этапы могут стать еще более сложными. Для примера, для задачи, которая повторяется два раза, этапы будут следующими:

PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

Чтобы узнать больше о состояниях задач, вам следует обратиться к разделу Государства в руководстве пользователя задач.

Вызов задач подробно описан в Calling Guide.

Canvas: Проектирование рабочих потоков

Вы только что узнали, как вызвать задачу с помощью метода tasks delay, и часто это все, что вам нужно. Но иногда вам может понадобиться передать сигнатуру вызова задачи другому процессу или в качестве аргумента другой функции, для чего Celery использует нечто, называемое подписи.

Сигнатура оборачивает аргументы и параметры выполнения одного вызова задачи таким образом, что их можно передавать функциям или даже сериализовать и передавать по проводам.

Вы можете создать сигнатуру для задачи add, используя аргументы (2, 2), и обратный отсчет 10 секунд следующим образом:

>>> add.signature((2, 2), countdown=10)
tasks.add(2, 2)

Существует также короткий путь с использованием аргументов звезды:

>>> add.s(2, 2)
tasks.add(2, 2)

И снова этот вызывающий API…

Экземпляры сигнатур также поддерживают API вызова, то есть имеют методы delay и << 1 >>>.

Но есть разница в том, что в сигнатуре уже может быть указана сигнатура аргумента. Задача add принимает два аргумента, поэтому сигнатура, указывающая два аргумента, будет полной сигнатурой:

>>> s1 = add.s(2, 2)
>>> res = s1.delay()
>>> res.get()
4

Но вы также можете делать неполные подписи для создания того, что мы называем частицами:

# incomplete partial: add(?, 2)
>>> s2 = add.s(2)

s2 теперь является неполной сигнатурой, которой для полноты нужен еще один аргумент, и это может быть решено при вызове сигнатуры:

# resolves the partial: add(8, 2)
>>> res = s2.delay(8)
>>> res.get()
10

Здесь вы добавили аргумент 8, который был добавлен к существующему аргументу 2, образуя полную сигнатуру add(8, 2).

Аргументы ключевых слов также могут быть добавлены позже; тогда они объединяются с любыми существующими аргументами ключевых слов, но новые аргументы имеют приоритет:

>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False)   # debug is now False.

Как уже было сказано, подписи поддерживают вызывающий API: это означает, что

  • sig.apply_async(args=(), kwargs={}, **options)

    Вызывает сигнатуру с необязательными частичными аргументами и частичными аргументами ключевых слов. Также поддерживает опции частичного выполнения.

  • sig.delay(*args, **kwargs)

    Версия звездного аргумента apply_async. Любые аргументы будут добавлены к аргументам в сигнатуре, а ключевое слово arguments будет объединено с любыми существующими ключами.

Итак, все это кажется очень полезным, но что вы можете сделать с ними на самом деле? Чтобы разобраться с этим, я должен представить примитивы холста…

Примитивы

Эти примитивы сами по себе являются сигнатурными объектами, поэтому их можно комбинировать любым способом для составления сложных рабочих потоков.

Примечание

Эти примеры получают результаты, поэтому, чтобы опробовать их, необходимо настроить бэкенд результатов. В приведенном выше примере проекта это уже сделано (см. аргумент backend в Celery).

Давайте рассмотрим несколько примеров:

Группы

group вызывает список задач параллельно, и возвращает специальный экземпляр результата, который позволяет просматривать результаты как группу и извлекать возвращаемые значения по порядку.

>>> from celery import group
>>> from proj.tasks import add

>>> group(add.s(i, i) for i in range(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
  • Частичная группа

>>> g = group(add.s(i) for i in range(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]

Цепи

Задачи могут быть связаны между собой так, чтобы после возвращения одной задачи вызывалась другая:

>>> from celery import chain
>>> from proj.tasks import add, mul

# (4 + 4) * 8
>>> chain(add.s(4, 4) | mul.s(8))().get()
64

или неполной цепи:

>>> # (? + 4) * 8
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64

Цепочки также могут быть записаны следующим образом:

>>> (add.s(4, 4) | mul.s(8))().get()
64

Аккорды

Аккорд - это группа с обратным вызовом:

>>> from celery import chord
>>> from proj.tasks import add, xsum

>>> chord((add.s(i, i) for i in range(10)), xsum.s())().get()
90

Группа, прикованная к другой задаче, автоматически преобразуется в аккорд:

>>> (group(add.s(i, i) for i in range(10)) | xsum.s())().get()
90

Поскольку все эти примитивы относятся к типу сигнатуры, их можно комбинировать практически как угодно, например:

>>> upload_document.s(file) | group(apply_filter.s() for filter in filters)

Обязательно прочитайте больше о рабочих потоках в руководстве пользователя Canvas.

Маршрутизация

Celery поддерживает все средства маршрутизации, предоставляемые AMQP, но он также поддерживает простую маршрутизацию, при которой сообщения отправляются в именованные очереди.

Настройка task_routes позволяет маршрутизировать задачи по имени и держать все в одном месте:

app.conf.update(
    task_routes = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)

Вы также можете указать очередь во время выполнения с помощью аргумента queue к apply_async:

>>> from proj.tasks import add
>>> add.apply_async((2, 2), queue='hipri')

Затем вы можете заставить рабочего потреблять из этой очереди, указав опцию celery worker -Q:

$ celery -A proj worker -Q hipri

Вы можете указать несколько очередей, используя список, разделенный запятыми. Например, вы можете заставить рабочего потреблять данные из очереди по умолчанию и очереди hipri, где очередь по умолчанию названа celery по историческим причинам:

$ celery -A proj worker -Q hipri,celery

Порядок очередей не имеет значения, так как работник будет придавать очередям одинаковый вес.

Чтобы узнать больше о маршрутизации, включая использование всех возможностей маршрутизации AMQP, смотрите Routing Guide.

Дистанционное управление

Если вы используете RabbitMQ (AMQP), Redis или Qpid в качестве брокера, то вы можете контролировать и проверять рабочего во время выполнения.

Например, вы можете увидеть, над какими заданиями в настоящее время работает сотрудник:

$ celery -A proj inspect active

Это реализовано с помощью широковещательного обмена сообщениями, поэтому все команды удаленного управления получают все рабочие в кластере.

Вы также можете указать одного или нескольких рабочих, которые будут действовать по запросу, используя опцию --destination. Это список имен рабочих хостов, разделенных запятыми:

$ celery -A proj inspect active --destination=celery@example.com

Если место назначения не указано, то каждый работник будет действовать и отвечать на запрос.

Команда celery inspect содержит команды, которые ничего не меняют в рабочем; она только возвращает информацию и статистику о том, что происходит внутри рабочего. Список команд инспекции, которые вы можете выполнить:

$ celery -A proj inspect --help

Затем есть команда celery control, которая содержит команды, которые фактически изменяют вещи в рабочем во время выполнения:

$ celery -A proj control --help

Например, вы можете заставить рабочих включать сообщения о событиях (используются для мониторинга заданий и рабочих):

$ celery -A proj control enable_events

Когда события включены, вы можете запустить дампер событий, чтобы посмотреть, что делают рабочие:

$ celery -A proj events --dump

или вы можете запустить интерфейс curses:

$ celery -A proj events

когда вы закончите мониторинг, вы можете снова отключить события:

$ celery -A proj control disable_events

Команда celery status также использует команды удаленного управления и показывает список online рабочих в кластере:

$ celery -A proj status

Подробнее о команде celery и мониторинге вы можете прочитать в Monitoring Guide.

Часовой пояс

Все времена и даты, внутри сайта и в сообщениях, используют часовой пояс UTC.

Когда рабочий получает сообщение, например, с установленным обратным отсчетом, он конвертирует время UTC в местное время. Если вы хотите использовать часовой пояс, отличный от системного, то вы должны настроить его с помощью параметра timezone:

app.conf.timezone = 'Europe/London'

Оптимизация

Конфигурация по умолчанию не оптимизирована для пропускной способности. По умолчанию она пытается идти по среднему пути между большим количеством коротких задач и меньшим количеством длинных задач, что является компромиссом между пропускной способностью и справедливым планированием.

Если у вас есть строгие требования к справедливому планированию или вы хотите оптимизировать пропускную способность, то вам следует прочитать Optimizing Guide.

Если вы используете RabbitMQ, вы можете установить модуль librabbitmq, AMQP-клиент, реализованный на языке C:

$ pip install librabbitmq

Что делать дальше?

Теперь, когда вы прочитали этот документ, вам следует перейти к User Guide.

Есть также API reference, если вам так хочется.

Вернуться на верх