Задачи маршрутизации

Примечание

Альтернативные концепции маршрутизации, такие как topic и fanout, доступны не для всех транспортов, пожалуйста, обратитесь к transport comparison table.

Основы

Автоматическая маршрутизация

Самый простой способ маршрутизации - использовать настройку task_create_missing_queues (по умолчанию включена).

При включении этой настройки именованная очередь, которая еще не определена в task_queues, будет создана автоматически. Это облегчает выполнение простых задач маршрутизации.

Допустим, у вас есть два сервера, x и y, которые выполняют обычные задачи, и один сервер z, который выполняет только задачи, связанные с кормами. Вы можете использовать следующую конфигурацию:

task_routes = {'feed.tasks.import_feed': {'queue': 'feeds'}}

При включении этого маршрута задачи, связанные с импортными фидами, будут направляться в очередь фиды, а все остальные задачи будут направляться в очередь по умолчанию (названную сельдерей по историческим причинам).

В качестве альтернативы можно использовать сопоставление шаблонов glob или даже регулярные выражения для сопоставления всех задач в пространстве имен feed.tasks:

app.conf.task_routes = {'feed.tasks.*': {'queue': 'feeds'}}

Если порядок соответствия шаблонов важен, то вместо этого следует указать маршрутизатор в формате items:

task_routes = ([
    ('feed.tasks.*', {'queue': 'feeds'}),
    ('web.tasks.*', {'queue': 'web'}),
    (re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)

Примечание

Настройка task_routes может быть либо словарем, либо списком объектов маршрутизатора, поэтому в данном случае нам нужно указать настройку в виде кортежа, содержащего список.

После установки маршрутизатора вы можете запустить сервер z для обработки только очереди фидов следующим образом:

user@z:/$ celery -A proj worker -Q feeds

Вы можете указать столько очередей, сколько хотите, поэтому вы можете сделать этот серверный процесс очередью по умолчанию:

user@z:/$ celery -A proj worker -Q feeds,celery

Изменение имени очереди по умолчанию

Вы можете изменить имя очереди по умолчанию с помощью следующей конфигурации:

app.conf.task_default_queue = 'default'

Как определяются очереди

Смысл этой функции в том, чтобы скрыть сложный протокол AMQP для пользователей с базовыми потребностями. Тем не менее, вам может быть интересно, как объявляются эти очереди.

Очередь с именем видео будет создана со следующими настройками:

{'exchange': 'video',
 'exchange_type': 'direct',
 'routing_key': 'video'}

НеAMQP-бэкенды, такие как Redis или SQS, не поддерживают обмены, поэтому они требуют, чтобы обмен имел то же имя, что и очередь. Использование этой конструкции гарантирует, что она будет работать и для них.

Ручная маршрутизация

Скажем, у вас есть два сервера, x и y, которые обрабатывают обычные задачи, и один сервер z, который обрабатывает только задачи, связанные с кормами, вы можете использовать эту конфигурацию:

from kombu import Queue

app.conf.task_default_queue = 'default'
app.conf.task_queues = (
    Queue('default',    routing_key='task.#'),
    Queue('feed_tasks', routing_key='feed.#'),
)
app.conf.task_default_exchange = 'tasks'
app.conf.task_default_exchange_type = 'topic'
app.conf.task_default_routing_key = 'task.default'

task_queues - это список экземпляров Queue. Если вы не задали значения обмена или типа обмена для ключа, они будут взяты из параметров task_default_exchange и << 3 >>>.

Чтобы направить задачу в очередь feed_tasks, вы можете добавить запись в настройку task_routes:

task_routes = {
        'feeds.tasks.import_feed': {
            'queue': 'feed_tasks',
            'routing_key': 'feed.import',
        },
}

Вы также можете переопределить это, используя аргумент routing_key Task.apply_async(), или send_task():

>>> from feeds.tasks import import_feed
>>> import_feed.apply_async(args=['http://cnn.com/rss'],
...                         queue='feed_tasks',
...                         routing_key='feed.import')

Чтобы заставить сервер z потреблять только из очереди фидов, вы можете запустить его с опцией celery worker -Q:

user@z:/$ celery -A proj worker -Q feed_tasks --hostname=z@%h

Серверы x и y должны быть настроены на потребление из очереди по умолчанию:

user@x:/$ celery -A proj worker -Q default --hostname=x@%h
user@y:/$ celery -A proj worker -Q default --hostname=y@%h

Если хотите, вы можете даже поручить своему работнику по переработке кормов выполнять и обычные задачи, возможно, в периоды, когда работы много:

user@z:/$ celery -A proj worker -Q feed_tasks,default --hostname=z@%h

Если у вас есть другая очередь, но на другой бирже, которую вы хотите добавить, просто укажите пользовательский обмен и тип обмена:

from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('feed_tasks',    routing_key='feed.#'),
    Queue('regular_tasks', routing_key='task.#'),
    Queue('image_tasks',   exchange=Exchange('mediatasks', type='direct'),
                           routing_key='image.compress'),
)

Если вы запутались в этих терминах, вам стоит почитать об AMQP.

См.также

В дополнение к Приоритеты сообщений Redis ниже, есть Rabbits and Warrens, отличная статья в блоге, описывающая очереди и обмены. Есть также CloudAMQP tutorial, For users of RabbitMQ the RabbitMQ FAQ, который может быть полезен как источник информации.

Специальные варианты маршрутизации

Приоритеты сообщений RabbitMQ

поддерживаемые транспорты:

RabbitMQ

Добавлено в версии 4.0.

Очереди могут быть настроены на поддержку приоритетов путем установки аргумента x-max-priority:

from kombu import Exchange, Queue

app.conf.task_queues = [
    Queue('tasks', Exchange('tasks'), routing_key='tasks',
          queue_arguments={'x-max-priority': 10}),
]

Значение по умолчанию для всех очередей может быть установлено с помощью параметра task_queue_max_priority:

app.conf.task_queue_max_priority = 10

Приоритет по умолчанию для всех задач также может быть задан с помощью параметра task_default_priority:

app.conf.task_default_priority = 5

Приоритеты сообщений Redis

поддерживаемые транспорты:

Redis

Хотя транспорт Celery Redis учитывает поле приоритета, сам Redis не имеет понятия о приоритетах. Пожалуйста, прочитайте это примечание, прежде чем пытаться реализовать приоритеты в Redis, поскольку вы можете столкнуться с неожиданным поведением.

Чтобы начать планировать задания на основе приоритетов, необходимо настроить транспортную опцию queue_order_strategy.

app.conf.broker_transport_options = {
    'queue_order_strategy': 'priority',
}

Поддержка приоритетов реализована путем создания n списков для каждой очереди. Это означает, что хотя существует 10 (0-9) уровней приоритета, по умолчанию они объединены в 4 уровня для экономии ресурсов. Это означает, что очередь с именем celery в действительности будет разделена на 4 очереди:

['celery0', 'celery3', 'celery6', 'celery9']

Если вам нужно больше уровней приоритета, вы можете установить транспортный параметр priority_steps:

app.conf.broker_transport_options = {
    'priority_steps': list(range(10)),
    'queue_order_strategy': 'priority',
}

При этом учтите, что это никогда не будет так же хорошо, как приоритеты, реализованные на уровне сервера, и может быть в лучшем случае приблизительным. Но это может быть достаточно хорошо для вашего приложения.

AMQP Primer

Сообщения

Сообщение состоит из заголовков и тела. Celery использует заголовки для хранения типа содержимого сообщения и его кодировки. Тип содержимого обычно является форматом сериализации, используемым для сериализации сообщения. Тело содержит имя задания для выполнения, идентификатор задания (UUID), аргументы для его применения и некоторые дополнительные мета-данные - например, количество повторных попыток или ETA.

Это пример сообщения задачи, представленного в виде словаря Python:

{'task': 'myapp.tasks.add',
 'id': '54086c5e-6193-4575-8308-dbab76798756',
 'args': [4, 4],
 'kwargs': {}}

Производители, потребители и брокеры

Клиент, отправляющий сообщения, обычно называется издателем, или производителем, а организация, получающая сообщения, называется потребителем.

Брокер* - это сервер сообщений, который направляет сообщения от производителей к потребителям.

Скорее всего, вы увидите, что эти термины часто используются в материалах, связанных с AMQP.

Обмены, очереди и ключи маршрутизации

  1. Сообщения отправляются на биржи.

  2. Обмен направляет сообщения в одну или несколько очередей. Существует несколько типов обменов, обеспечивающих различные способы маршрутизации или реализующих различные сценарии обмена сообщениями.

  3. Сообщение ожидает в очереди до тех пор, пока кто-то его не употребит.

  4. Сообщение удаляется из очереди, когда оно было подтверждено.

Для отправки и получения сообщений необходимо выполнить следующие действия:

  1. Создать обмен

  2. Создайте очередь

  3. Привязать очередь к обмену.

Celery автоматически создает сущности, необходимые для работы очередей в task_queues (за исключением случаев, когда параметр auto_declare очереди установлен в False).

Вот пример конфигурации очереди с тремя очередями: одна для видео, одна для изображений и одна очередь по умолчанию для всего остального:

from kombu import Exchange, Queue

app.conf.task_queues = (
    Queue('default', Exchange('default'), routing_key='default'),
    Queue('videos',  Exchange('media'),   routing_key='media.video'),
    Queue('images',  Exchange('media'),   routing_key='media.image'),
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange_type = 'direct'
app.conf.task_default_routing_key = 'default'

Типы обмена

Тип обмена определяет, как сообщения направляются через обмен. Типы обмена, определенные в стандарте: direct, topic, fanout and headers. Also non-standard exchange types are available as plug-ins to RabbitMQ, like the last-value-cache plug-in Майкла Бриджена.

Прямые обмены

Прямые обмены совпадают по точным ключам маршрутизации, поэтому очередь, связанная ключом маршрутизации video, получает только сообщения с этим ключом маршрутизации.

Тематические обмены

Обмен темами соответствует ключам маршрутизации, используя слова, разделенные точками, и символы подстановки: * (соответствует одному слову) и # (соответствует нулю или более слов).

С такими клавишами маршрутизации, как usa.news, usa.weather, norway.news и norway.weather, привязки могут быть такими: *.news (все новости), usa.# (все события в США) или usa.weather (все погодные события в США).

Практическая работа с API

Celery поставляется с инструментом celery amqp, который используется для доступа к AMQP API через командную строку, обеспечивая доступ к таким задачам администрирования, как создание/удаление очередей и обменов, очистка очередей или отправка сообщений. Он также может быть использован для брокеров, не относящихся к AMQP, но различные реализации могут не реализовать все команды.

Вы можете записывать команды непосредственно в аргументы celery amqp, или просто запустить без аргументов, чтобы запустить его в shell-режиме:

$ celery -A proj amqp
-> connecting to amqp://guest@localhost:5672/.
-> connected.
1>

Здесь 1> - это подсказка. Число 1 - это количество команд, которые вы выполнили на данный момент. Введите help для получения списка доступных команд. Также поддерживается автозавершение, поэтому вы можете начать вводить команду, а затем нажать клавишу tab для отображения списка возможных совпадений.

Давайте создадим очередь, в которую можно отправлять сообщения:

$ celery -A proj amqp
1> exchange.declare testexchange direct
ok.
2> queue.declare testqueue
ok. queue:testqueue messages:0 consumers:0.
3> queue.bind testqueue testexchange testkey
ok.

Это создало прямой обмен testexchange, и очередь с именем testqueue. Очередь привязана к обмену с помощью ключа маршрутизации testkey.

С этого момента все сообщения, отправленные на обмен testexchange с ключом маршрутизации testkey, будут перемещаться в эту очередь. Вы можете отправить сообщение с помощью команды basic.publish:

4> basic.publish 'This is a message!' testexchange testkey
ok.

Теперь, когда сообщение отправлено, вы можете получить его снова. Здесь можно использовать команду basic.get, которая синхронно опрашивает новые сообщения в очереди (это нормально для задач обслуживания, но для сервисов лучше использовать basic.consume).

Вытащить сообщение из очереди:

5> basic.get testqueue
{'body': 'This is a message!',
 'delivery_info': {'delivery_tag': 1,
                   'exchange': u'testexchange',
                   'message_count': 0,
                   'redelivered': False,
                   'routing_key': u'testkey'},
 'properties': {}}

AMQP использует квитирование для подтверждения того, что сообщение было получено и успешно обработано. Если сообщение не было подтверждено и канал потребителя закрыт, сообщение будет доставлено другому потребителю.

Обратите внимание на тег доставки, указанный в структуре выше; в канале соединения каждое полученное сообщение имеет уникальный тег доставки, этот тег используется для подтверждения сообщения. Также обратите внимание, что теги доставки не являются уникальными для всех соединений, поэтому в другом клиенте тег доставки 1 может указывать на другое сообщение, чем в этом канале.

Вы можете подтвердить полученное сообщение с помощью basic.ack:

6> basic.ack 1
ok.

Для очистки после нашей тестовой сессии необходимо удалить созданные вами сущности:

7> queue.delete testqueue
ok. 0 messages deleted.
8> exchange.delete testexchange
ok.

Задачи маршрутизации

Определение очередей

В Celery доступные очереди определяются параметром task_queues.

Вот пример конфигурации очереди с тремя очередями: одна для видео, одна для изображений и одна очередь по умолчанию для всего остального:

default_exchange = Exchange('default', type='direct')
media_exchange = Exchange('media', type='direct')

app.conf.task_queues = (
    Queue('default', default_exchange, routing_key='default'),
    Queue('videos', media_exchange, routing_key='media.video'),
    Queue('images', media_exchange, routing_key='media.image')
)
app.conf.task_default_queue = 'default'
app.conf.task_default_exchange = 'default'
app.conf.task_default_routing_key = 'default'

Здесь task_default_queue будет использоваться для маршрутизации задач, у которых нет явного маршрута.

Обмен по умолчанию, тип обмена и ключ маршрутизации будут использоваться в качестве значений маршрутизации по умолчанию для задач, а также в качестве значений по умолчанию для записей в task_queues.

Также поддерживается несколько привязок к одной очереди. Вот пример двух ключей маршрутизации, которые привязаны к одной очереди:

from kombu import Exchange, Queue, binding

media_exchange = Exchange('media', type='direct')

CELERY_QUEUES = (
    Queue('media', [
        binding(media_exchange, routing_key='media.video'),
        binding(media_exchange, routing_key='media.image'),
    ]),
)

Указание места назначения задачи

Место назначения задания определяется следующим (по порядку):

  1. Аргументы маршрутизации для Task.apply_async().

  2. Атрибуты, связанные с маршрутизацией, определенные на самом Task.

  3. Маршрутизаторы определено в task_routes.

Считается лучшей практикой не кодировать эти параметры жестко, а оставлять их в качестве опций конфигурации, используя Маршрутизаторы; Это наиболее гибкий подход, но разумные значения по умолчанию все же могут быть установлены в качестве атрибутов задачи.

Маршрутизаторы

Маршрутизатор - это функция, которая определяет варианты маршрутизации для задачи.

Все, что вам нужно для определения нового маршрутизатора - это определить функцию с сигнатурой (name, args, kwargs, options, task=None, **kw):

def route_task(name, args, kwargs, options, task=None, **kw):
        if name == 'myapp.tasks.compress_video':
            return {'exchange': 'video',
                    'exchange_type': 'topic',
                    'routing_key': 'video.compress'}

Если вы вернете клавишу queue, она расширится с заданными настройками этой очереди в task_queues:

{'queue': 'video', 'routing_key': 'video.compress'}

становится –>

{'queue': 'video',
 'exchange': 'video',
 'exchange_type': 'topic',
 'routing_key': 'video.compress'}

Вы устанавливаете классы маршрутизатора, добавляя их в настройку task_routes:

task_routes = (route_task,)

Функции маршрутизатора также могут быть добавлены по имени:

task_routes = ('myapp.routers.route_task',)

Для простых сопоставлений имя задачи -> маршрут, как в примере с маршрутизатором выше, вы можете просто подставить dict в task_routes, чтобы получить такое же поведение:

task_routes = {
    'myapp.tasks.compress_video': {
        'queue': 'video',
        'routing_key': 'video.compress',
    },
}

Затем маршрутизаторы будут пройдены по порядку, он остановится на первом маршрутизаторе, вернувшем истинное значение, и использует его в качестве конечного маршрута для задания.

Можно также определить несколько маршрутизаторов в последовательности:

task_routes = [
    route_task,
    {
        'myapp.tasks.compress_video': {
            'queue': 'video',
            'routing_key': 'video.compress',
    },
]

Затем маршрутизаторы будут посещаться по очереди, и будет выбран первый, который вернет значение.

Если вы используете Redis или RabbitMQ, вы также можете указать приоритет очередипо умолчанию в маршруте.

task_routes = {
    'myapp.tasks.compress_video': {
        'queue': 'video',
        'routing_key': 'video.compress',
        'priority': 10,
    },
}

Аналогично, вызов apply_async для задачи отменяет приоритет по умолчанию.

task.apply_async(priority=0)

Порядок приоритетов и реакция кластера

Важно отметить, что из-за предварительной выборки рабочих задач, если несколько задач подаются одновременно, они могут сначала оказаться вне порядка приоритета. Отключение предварительной выборки для рабочих предотвратит эту проблему, но может привести к менее чем идеальной производительности для небольших и быстрых задач. В большинстве случаев простое уменьшение worker_prefetch_multiplier до 1 является более простым и чистым способом повысить отзывчивость вашей системы без затрат на полное отключение предварительной выборки.

Обратите внимание, что при использовании брокера redis значения приоритетов сортируются в обратном порядке: 0 - наивысший приоритет.

Трансляция

Celery также может поддерживать широковещательную маршрутизацию. Вот пример обмена broadcast_tasks, который доставляет копии заданий всем подключенным к нему работникам:

from kombu.common import Broadcast

app.conf.task_queues = (Broadcast('broadcast_tasks'),)
app.conf.task_routes = {
    'tasks.reload_cache': {
        'queue': 'broadcast_tasks',
        'exchange': 'broadcast_tasks'
    }
}

Теперь задание tasks.reload_cache будет отправлено каждому рабочему, потребляющему из этой очереди.

Вот еще один пример широковещательной маршрутизации, на этот раз с расписанием celery beat:

from kombu.common import Broadcast
from celery.schedules import crontab

app.conf.task_queues = (Broadcast('broadcast_tasks'),)

app.conf.beat_schedule = {
    'test-task': {
        'task': 'tasks.reload_cache',
        'schedule': crontab(minute=0, hour='*/3'),
        'options': {'exchange': 'broadcast_tasks'}
    },
}

Трансляция и результаты

Обратите внимание, что Celery result не определяет, что произойдет, если две задачи будут иметь одинаковый task_id. Если одна и та же задача распределена между несколькими работниками, то история состояний может не сохраниться.

В этом случае целесообразно установить атрибут task.ignore_result.

Вернуться на верх