Руководство для работников

Запуск рабочего

Вы можете запустить рабочего на переднем плане, выполнив команду:

$ celery -A proj worker -l INFO

Полный список доступных опций командной строки смотрите в worker, или просто сделайте:

$ celery worker --help

Вы можете запустить несколько рабочих на одной машине, но не забудьте дать имя каждому отдельному рабочему, указав имя узла с аргументом --hostname:

$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker2@%h
$ celery -A proj worker --loglevel=INFO --concurrency=10 -n worker3@%h

Аргумент hostname может расширять следующие переменные:

  • %h: Имя хоста, включая доменное имя.

  • %n: Только имя хоста.

  • %d: Только доменное имя.

Если текущее имя хоста - george.example.com, они расширятся до:

Переменная

Шаблон

Результат

%h

worker1@%h

worker1@george.example.com

%n

worker1@%n

worker1@george

%d

worker1@%d

worker1@example.com

Примечание для пользователей supervisor

Знак % должен быть экранирован добавлением второго: %%h.

Остановка работника

Выключение должно осуществляться с помощью сигнала TERM.

Когда инициируется выключение, рабочий завершит все выполняющиеся в данный момент задачи, прежде чем он действительно завершится. Если эти задачи важны, вам следует дождаться их завершения, прежде чем делать что-либо радикальное, например, посылать сигнал KILL.

Если рабочий не завершает работу по истечении определенного времени, застряв в бесконечном цикле или подобном, вы можете использовать сигнал KILL для принудительного завершения работы рабочего: но имейте в виду, что текущие выполняющиеся задания будут потеряны (т.е. если у заданий не установлена опция acks_late).

Также, поскольку процессы не могут отменить сигнал KILL, рабочий не сможет получить своих детей; убедитесь, что это можно сделать вручную. Эта команда обычно помогает:

$ pkill -9 -f 'celery worker'

Если в вашей системе нет команды pkill, вы можете использовать немного более длинную версию:

$ ps auxww | awk '/celery worker/ {print $2}' | xargs kill -9

Перезапуск рабочего

Для перезапуска рабочего необходимо послать сигнал TERM и запустить новый экземпляр. Самый простой способ управления рабочими для разработки - это использование celery multi:

$ celery multi start 1 -A proj -l INFO -c4 --pidfile=/var/run/celery/%n.pid
$ celery multi restart 1 --pidfile=/var/run/celery/%n.pid

Для производственных развертываний следует использовать init-скрипты или систему контроля процессов (см. Демонизация).

Кроме остановки, а затем запуска рабочего для перезапуска, вы также можете перезапустить его с помощью сигнала HUP. Обратите внимание, что рабочий будет отвечать за перезапуск сам, поэтому это чревато проблемами и не рекомендуется в производстве:

$ kill -HUP $pid

Примечание

Перезапуск по HUP работает только в том случае, если рабочий работает в фоновом режиме как демон (у него нет управляющего терминала).

HUP отключен на macOS из-за ограничения на этой платформе.

Сигналы процесса

Основной процесс рабочего переопределяет следующие сигналы:

TERM

Теплое выключение, дождитесь завершения задач.

QUIT

Холодное отключение, прекратить как можно скорее

USR1

Выгрузка трассировки для всех активных потоков.

USR2

Удаленная отладка, см. celery.contrib.rdb.

Переменные в путях к файлам

Аргументы пути к файлу для --logfile, --pidfile и --statedb могут содержать переменные, которые рабочий будет расширять:

Замена имен узлов

  • %p: Полное имя узла.

  • %h: Имя хоста, включая доменное имя.

  • %n: Только имя хоста.

  • %d: Только доменное имя.

  • %i: Индекс процесса пула префорка или 0, если MainProcess.

  • %I: Индекс процесса пула префорка с разделителем.

Например, если текущее имя хоста george@foo.example.com, то они расширятся до:

  • --logfile=%p.log -> george@foo.example.com.log

  • --logfile=%h.log -> foo.example.com.log

  • --logfile=%n.log -> george.log

  • --logfile=%d.log -> example.com.log

Индекс процесса пула Prefork

Спецификаторы индекса процесса prefork pool будут расширяться в другое имя файла в зависимости от процесса, который в конечном итоге должен будет открыть файл.

Это можно использовать для указания одного файла журнала для каждого дочернего процесса.

Обратите внимание, что числа будут оставаться в пределах лимита процесса, даже если процессы завершаются или используются ограничения autoscale/<<<0 >>>/time. То есть, номер - это индекс процесса, а не количество процессов или pid.

  • %i - индекс процесса бассейна или 0, если MainProcess.

    Где -n worker1@example.com -c2 -f %n-%i.log приведет к созданию трех файлов журнала:

    • worker1-0.log (основной процесс)

    • worker1-1.log (процесс пула 1)

    • worker1-2.log (процесс пула 2)

  • %I - индекс процесса пула с разделителем.

    Где -n worker1@example.com -c2 -f %n%I.log приведет к созданию трех файлов журнала:

    • worker1.log (основной процесс)

    • worker1-1.log (процесс пула 1)

    • worker1-2.log (процесс пула 2)

Concurrency

По умолчанию для одновременного выполнения задач используется многопроцессорность, но вы также можете использовать Eventlet. Количество рабочих процессов/потоков может быть изменено с помощью аргумента --concurrency и по умолчанию соответствует количеству CPU, доступных на машине.

Количество процессов (многопроцессорный пул/пул префорков)

Большее количество процессов пула обычно лучше, но есть точка отсечения, когда добавление большего количества процессов пула отрицательно сказывается на производительности. Есть даже некоторые свидетельства в пользу того, что работа нескольких рабочих экземпляров может быть лучше, чем работа одного рабочего. Например, 3 рабочих с 10 процессами пула каждый. Вам нужно поэкспериментировать, чтобы найти оптимальное для вас число, поскольку оно зависит от приложения, рабочей нагрузки, времени выполнения задач и других факторов.

Дистанционное управление

Добавлено в версии 2.0.

поддержка бассейна:

prefork, eventlet, gevent, thread, блокировка:solo (см. примечание)

поддержка брокеров:

amqp, redis

Рабочие имеют возможность удаленного управления с помощью высокоприоритетной очереди широковещательных сообщений. Команды могут быть направлены всем или определенному списку рабочих.

Команды также могут иметь ответы. Клиент может ждать и собирать эти ответы. Поскольку нет центрального органа, который бы знал, сколько работников доступно в кластере, также нет способа оценить, сколько работников могут послать ответ, поэтому клиент имеет настраиваемый таймаут - крайний срок в секундах для получения ответов. По умолчанию этот таймаут равен одной секунде. Если рабочий не отвечает в течение установленного срока, это не обязательно означает, что рабочий не ответил или, что еще хуже, умер, это может быть вызвано задержкой в сети или тем, что рабочий медленно обрабатывает команды, поэтому настройте тайм-аут соответствующим образом.

В дополнение к тайм-аутам клиент может указать максимальное количество ответов, которое следует ожидать. Если указано место назначения, этот предел устанавливается на количество хостов назначения.

Примечание

Пул solo поддерживает команды удаленного управления, но любая выполняющаяся задача будет блокировать любую ожидающую команду управления, поэтому он имеет ограниченное применение, если рабочий очень занят. В этом случае необходимо увеличить таймаут ожидания ответов в клиенте.

Функция broadcast()

Это клиентская функция, используемая для отправки команд рабочим устройствам. Некоторые команды дистанционного управления также имеют интерфейсы более высокого уровня, использующие broadcast() в фоновом режиме, например rate_limit(), и ping().

Отправка команды rate_limit и аргументов ключевых слов:

>>> app.control.broadcast('rate_limit',
...                          arguments={'task_name': 'myapp.mytask',
...                                     'rate_limit': '200/m'})

Это позволит отправить команду асинхронно, не дожидаясь ответа. Чтобы запросить ответ, необходимо использовать аргумент reply:

>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask', 'rate_limit': '200/m'}, reply=True)
[{'worker1.example.com': 'New rate limit set successfully'},
 {'worker2.example.com': 'New rate limit set successfully'},
 {'worker3.example.com': 'New rate limit set successfully'}]

С помощью аргумента destination вы можете указать список рабочих, которые будут получать команду:

>>> app.control.broadcast('rate_limit', {
...     'task_name': 'myapp.mytask',
...     'rate_limit': '200/m'}, reply=True,
...                             destination=['worker1@example.com'])
[{'worker1.example.com': 'New rate limit set successfully'}]

Конечно, использовать интерфейс более высокого уровня для установки ограничений скорости гораздо удобнее, но есть команды, которые можно запросить только с помощью broadcast().

Команды

revoke: Отмена заданий

поддержка бассейна:

все, завершение поддерживается только prefork

поддержка брокеров:

amqp, redis

команда:

celery -A proj control revoke <task_id>

Все рабочие узлы хранят память идентификаторов отозванных задач, либо в памяти, либо на диске (см. Постоянные отзывы).

Когда рабочий получает запрос на отзыв, он пропускает выполнение задания, но не завершает уже выполняющееся задание, если не установлена опция terminate.

Примечание

Опция terminate - это последнее средство для администраторов, когда задача застряла. Она не предназначена для завершения задачи, она предназначена для завершения процесса, выполняющего задачу, и этот процесс может уже начать обработку другой задачи в момент отправки сигнала, поэтому по этой причине вы никогда не должны вызывать ее программно.

Если установлено значение terminate, дочерний рабочий процесс, обрабатывающий задание, будет завершен. По умолчанию посылается сигнал TERM, но вы можете указать его с помощью аргумента signal. Signal может быть заглавным именем любого сигнала, определенного в модуле signal в стандартной библиотеке Python.

Завершение задания также отзывает его.

Пример.

>>> result.revoke()

>>> AsyncResult(id).revoke()

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed')

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True)

>>> app.control.revoke('d9078da5-9915-40a0-bfa1-392c7bde42ed',
...                    terminate=True, signal='SIGKILL')

Отмена нескольких заданий

Добавлено в версии 3.1.

Метод revoke также принимает аргумент списка, в котором он отзывает сразу несколько задач.

Пример.

>>> app.control.revoke([
...    '7993b0aa-1f0b-4780-9af0-c47c0858b3f2',
...    'f565793e-b041-4b2b-9ca4-dca22762a55d',
...    'd9d35e03-2997-42d0-a13e-64a66b88a618',
])

Метод GroupResult.revoke использует это преимущество, начиная с версии 3.1.

Постоянные отзывы

Отзыв заданий происходит путем отправки широковещательного сообщения всем рабочим, после чего рабочие хранят список отозванных заданий в памяти. Когда рабочий запускается, он синхронизирует отозванные задания с другими рабочими в кластере.

Список отозванных заданий находится в памяти, поэтому если все рабочие перезапустятся, список отозванных идентификаторов также исчезнет. Если вы хотите сохранить этот список между перезапусками, вам нужно указать файл, в котором он будет храниться, используя аргумент –statedb к celery worker:

$ celery -A proj worker -l INFO --statedb=/var/run/celery/worker.state

или если вы используете celery multi, вы хотите создать один файл для каждого рабочего экземпляра, поэтому используйте формат %n для расширения имени текущего узла:

celery multi start 2 -l INFO --statedb=/var/run/celery/%n.state

См. также Переменные в путях к файлам

Обратите внимание, что команды удаленного управления должны работать, чтобы отзыв работал. Команды удаленного управления на данный момент поддерживаются только RabbitMQ (amqp) и Redis.

Ограничения по времени

Добавлено в версии 2.0.

поддержка бассейна:

префорк/гевент

Одна задача потенциально может выполняться бесконечно, если у вас много задач, ожидающих какого-то события, которое никогда не произойдет, вы заблокируете рабочего от обработки новых задач на неопределенное время. Лучшим способом защиты от такого сценария является включение временных ограничений.

Ограничение по времени (–time-limit) - это максимальное количество секунд, в течение которых задача может выполняться до того, как выполняющий ее процесс будет завершен и заменен новым процессом. Вы также можете включить мягкий временной лимит (–soft-time-limit), который вызывает исключение, которое задача может перехватить и устранить до того, как жесткий временной лимит уничтожит ее:

from myapp import app
from celery.exceptions import SoftTimeLimitExceeded

@app.task
def mytask():
    try:
        do_work()
    except SoftTimeLimitExceeded:
        clean_up_in_a_hurry()

Ограничения по времени можно также установить с помощью параметров task_time_limit / << 1 >>>.

Примечание

Ограничения по времени в настоящее время не работают на платформах, которые не поддерживают сигнал SIGUSR1.

Изменение временных ограничений во время выполнения

Добавлено в версии 2.3.

поддержка брокеров:

amqp, redis

Существует команда дистанционного управления, которая позволяет изменять как мягкие, так и жесткие временные ограничения для задачи - она называется time_limit.

Пример изменения лимита времени для задачи tasks.crawl_the_web на мягкий лимит времени в одну минуту и жесткий лимит времени в две минуты:

>>> app.control.time_limit('tasks.crawl_the_web',
                           soft=60, hard=120, reply=True)
[{'worker1.example.com': {'ok': 'time limits set successfully'}}]

Затронуты будут только те задания, которые начали выполняться после изменения лимита времени.

Пределы тарифов

Изменение ограничений скорости во время выполнения

Пример изменения ограничения скорости для задачи myapp.mytask на выполнение не более 200 задач этого типа каждую минуту:

>>> app.control.rate_limit('myapp.mytask', '200/m')

Выше не указано место назначения, поэтому запрос на изменение затронет все рабочие экземпляры в кластере. Если вы хотите повлиять только на определенный список рабочих, вы можете включить аргумент destination:

>>> app.control.rate_limit('myapp.mytask', '200/m',
...            destination=['celery@worker1.example.com'])

Предупреждение

Это не повлияет на работников с включенной настройкой worker_disable_rate_limits.

Максимальное количество задач на одно детское учреждение

Добавлено в версии 2.0.

поддержка бассейна:

префорк

С помощью этой опции вы можете настроить максимальное количество задач, которые может выполнить рабочий, прежде чем его заменит новый процесс.

Это полезно, если у вас есть утечки памяти, которые вы не можете контролировать, например, из расширений C с закрытым исходным кодом.

Опция может быть установлена с помощью рабочего аргумента --max-tasks-per-child или с помощью настройки worker_max_tasks_per_child.

Максимальное количество памяти на одну детскую установку

Добавлено в версии 4.0.

поддержка бассейна:

префорк

С помощью этой опции вы можете настроить максимальный объем резидентной памяти, который рабочий может выполнить до того, как его заменит новый процесс.

Это полезно, если у вас есть утечки памяти, которые вы не можете контролировать, например, из расширений C с закрытым исходным кодом.

Опция может быть установлена с помощью рабочего аргумента --max-memory-per-child или с помощью настройки worker_max_memory_per_child.

Автомасштабирование

Добавлено в версии 2.2.

поддержка бассейна:

префорк, гевент

Компонент autoscaler используется для динамического изменения размера пула в зависимости от нагрузки:

  • Автоскалер добавляет дополнительные процессы пула, когда есть работа,
    • и начинает удалять процессы при низкой нагрузке.

Она включается опцией --autoscale, для которой нужны два числа: максимальное и минимальное количество процессов пула:

--autoscale=AUTOSCALE
     Enable autoscaling by providing
     max_concurrency,min_concurrency.  Example:
       --autoscale=10,3 (always keep 3 processes, but grow to
      10 if necessary).

Вы также можете определить свои собственные правила для автоскалера, создав подкласс Autoscaler. Некоторые идеи для метрик включают среднюю нагрузку или объем доступной памяти. Вы можете задать пользовательский автоскалер с помощью параметра worker_autoscaler.

Очереди

Рабочий экземпляр может потреблять из любого количества очередей. По умолчанию он будет потреблять из всех очередей, определенных в параметре task_queues (который, если не указан, возвращается к очереди по умолчанию с именем celery).

Вы можете указать, из каких очередей потреблять данные при запуске, указав список очередей через запятую в опции -Q:

$ celery -A proj worker -l INFO -Q foo,bar,baz

Если имя очереди определено в task_queues, то будет использоваться эта конфигурация, но если она не определена в списке очередей, то Celery автоматически сгенерирует для вас новую очередь (в зависимости от опции task_create_missing_queues).

Вы также можете указать рабочему начать и прекратить потребление из очереди во время выполнения, используя команды удаленного управления add_consumer и cancel_consumer.

Очереди: Добавление потребителей

Управляющая команда add_consumer говорит одному или нескольким рабочим начать потребление из очереди. Эта операция является идемпотентной.

Чтобы сказать всем рабочим в кластере начать потребление из очереди с именем «foo», вы можете использовать программу celery control:

$ celery -A proj control add_consumer foo
-> worker1.local: OK
    started consuming from u'foo'

Если вы хотите указать конкретного рабочего, вы можете использовать аргумент --destination:

$ celery -A proj control add_consumer foo -d celery@worker1.local

То же самое можно сделать динамически, используя метод app.control.add_consumer():

>>> app.control.add_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]

>>> app.control.add_consumer('foo', reply=True,
...                          destination=['worker1@example.com'])
[{u'worker1.local': {u'ok': u"already consuming from u'foo'"}}]

До сих пор мы показывали только примеры с использованием автоматических очередей, если вам нужно больше контроля, вы можете также указать обмен, ключ маршрутизации и даже другие параметры:

>>> app.control.add_consumer(
...     queue='baz',
...     exchange='ex',
...     exchange_type='topic',
...     routing_key='media.*',
...     options={
...         'queue_durable': False,
...         'exchange_durable': False,
...     },
...     reply=True,
...     destination=['w1@example.com', 'w2@example.com'])

Очереди: Отмена потребителей

Вы можете отменить потребителя по имени очереди с помощью управляющей команды cancel_consumer.

Чтобы заставить всех рабочих в кластере отменить потребление из очереди, вы можете использовать программу celery control:

$ celery -A proj control cancel_consumer foo

Аргумент --destination может быть использован для указания рабочего или списка рабочих для выполнения команды:

$ celery -A proj control cancel_consumer foo -d celery@worker1.local

Вы также можете отменить потребителей программно, используя метод app.control.cancel_consumer():

>>> app.control.cancel_consumer('foo', reply=True)
[{u'worker1.local': {u'ok': u"no longer consuming from u'foo'"}}]

Очереди: Список активных очередей

Вы можете получить список очередей, из которых потребляет рабочий, с помощью управляющей команды active_queues:

$ celery -A proj inspect active_queues
[...]

Как и все другие команды дистанционного управления, эта также поддерживает аргумент --destination, используемый для указания рабочих, которые должны ответить на запрос:

$ celery -A proj inspect active_queues -d celery@worker1.local
[...]

Это также можно сделать программно, используя метод active_queues():

>>> app.control.inspect().active_queues()
[...]

>>> app.control.inspect(['worker1.local']).active_queues()
[...]

Инспектирование работников

app.control.inspect позволяет вам проверять работающих рабочих. Он использует команды удаленного управления под капотом.

Вы также можете использовать команду celery для проверки рабочих, и она поддерживает те же команды, что и интерфейс app.control.

>>> # Inspect all nodes.
>>> i = app.control.inspect()

>>> # Specify multiple nodes to inspect.
>>> i = app.control.inspect(['worker1.example.com',
                            'worker2.example.com'])

>>> # Specify a single node to inspect.
>>> i = app.control.inspect('worker1.example.com')

Сброс зарегистрированных заданий

Список задач, зарегистрированных в рабочем, можно получить с помощью команды registered():

>>> i.registered()
[{'worker1.example.com': ['tasks.add',
                          'tasks.sleeptask']}]

Сброс текущих выполняемых задач

Вы можете получить список активных задач, используя active():

>>> i.active()
[{'worker1.example.com':
    [{'name': 'tasks.sleeptask',
      'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
      'args': '(8,)',
      'kwargs': '{}'}]}]

Сброс запланированных (ETA) задач

Вы можете получить список задач, ожидающих планирования, используя scheduled():

>>> i.scheduled()
[{'worker1.example.com':
    [{'eta': '2010-06-07 09:07:52', 'priority': 0,
      'request': {
        'name': 'tasks.sleeptask',
        'id': '1a7980ea-8b19-413e-91d2-0b74f3844c4d',
        'args': '[1]',
        'kwargs': '{}'}},
     {'eta': '2010-06-07 09:07:53', 'priority': 0,
      'request': {
        'name': 'tasks.sleeptask',
        'id': '49661b9a-aa22-4120-94b7-9ee8031d219d',
        'args': '[2]',
        'kwargs': '{}'}}]}]

Примечание

Это задачи с аргументом ETA/countdown, а не периодические задачи.

Сброс зарезервированных задач

Зарезервированные задания - это задания, которые были получены, но все еще ожидают выполнения.

Вы можете получить их список, используя reserved():

>>> i.reserved()
[{'worker1.example.com':
    [{'name': 'tasks.sleeptask',
      'id': '32666e9b-809c-41fa-8e93-5ae0c80afbbf',
      'args': '(8,)',
      'kwargs': '{}'}]}]

Статистика

Команда пульта inspect stats (или stats()) выдаст вам длинный список полезных (или не очень полезных) статистических данных о работнике:

$ celery -A proj inspect stats

Для получения подробной информации о выводе обратитесь к справочной документации по stats().

Дополнительные команды

Дистанционное отключение

Эта команда приведет к изящному удаленному выключению рабочего:

>>> app.control.broadcast('shutdown') # shutdown all workers
>>> app.control.broadcast('shutdown', destination='worker1@example.com')

Пинг

Эта команда запрашивает ping у живых рабочих. Рабочие отвечают строкой „pong“, и на этом все. Команда будет использовать стандартный таймаут в одну секунду для ответов, если вы не укажете пользовательский таймаут:

>>> app.control.ping(timeout=0.5)
[{'worker1.example.com': 'pong'},
 {'worker2.example.com': 'pong'},
 {'worker3.example.com': 'pong'}]

ping() также поддерживает аргумент destination, так что вы можете указать рабочих для пинга:

>>> ping(['worker2.example.com', 'worker3.example.com'])
[{'worker2.example.com': 'pong'},
 {'worker3.example.com': 'pong'}]

Включить/выключить события

Вы можете включать/выключать события с помощью команд enable_events, disable_events. Это полезно для временного мониторинга рабочего, используя celery events/celerymon.

>>> app.control.enable_events()
>>> app.control.disable_events()

Написание собственных команд дистанционного управления

Существует два типа команд пульта дистанционного управления:

  • Команда инспекции

    Не имеет побочных эффектов, обычно просто возвращает какое-либо значение, найденное в рабочем, например, список текущих зарегистрированных задач, список активных задач и т.д.

  • Команда управления

    Выполняет побочные эффекты, например, добавляет новую очередь для потребления.

Команды дистанционного управления регистрируются в панели управления и принимают один аргумент: текущий экземпляр ControlDispatch. При необходимости вы можете получить доступ к активному Consumer.

Вот пример команды управления, которая увеличивает счетчик предварительной выборки задач:

from celery.worker.control import control_command

@control_command(
    args=[('n', int)],
    signature='[N=1]',  # <- used for help on the command-line.
)
def increase_prefetch_count(state, n=1):
    state.consumer.qos.increment_eventually(n)
    return {'ok': 'prefetch count incremented'}

Убедитесь, что вы добавили этот код в модуль, который импортируется рабочим: это может быть тот же модуль, в котором определено ваше приложение Celery, или вы можете добавить модуль в настройку imports.

Перезапустите рабочий, чтобы управляющая команда была зарегистрирована, и теперь вы можете вызвать свою команду с помощью утилиты celery control:

$ celery -A proj control increase_prefetch_count 3

Вы также можете добавить действия в программу celery inspect, например, считывание текущего количества префетчей:

from celery.worker.control import inspect_command

@inspect_command()
def current_prefetch_count(state):
    return {'prefetch_count': state.consumer.qos.value}

После перезапуска рабочего вы можете запросить это значение с помощью программы celery inspect:

$ celery -A proj inspect current_prefetch_count
Вернуться на верх