Протокол сообщений

Сообщения о задачах

Версия 2

Определение

properties = {
    'correlation_id': uuid task_id,
    'content_type': string mimetype,
    'content_encoding': string encoding,

    # optional
    'reply_to': string queue_or_url,
}
headers = {
    'lang': string 'py'
    'task': string task,
    'id': uuid task_id,
    'root_id': uuid root_id,
    'parent_id': uuid parent_id,
    'group': uuid group_id,

    # optional
    'meth': string method_name,
    'shadow': string alias_name,
    'eta': iso8601 ETA,
    'expires': iso8601 expires,
    'retries': int retries,
    'timelimit': (soft, hard),
    'argsrepr': str repr(args),
    'kwargsrepr': str repr(kwargs),
    'origin': str nodename,
}

body = (
    object[] args,
    Mapping kwargs,
    Mapping embed {
        'callbacks': Signature[] callbacks,
        'errbacks': Signature[] errbacks,
        'chain': Signature[] chain,
        'chord': Signature chord_callback,
    }
)

Пример

В этом примере отправляется сообщение задачи с использованием версии 2 протокола:

# chain: add(add(add(2, 2), 4), 8) == 2 + 2 + 4 + 8

import json
import os
import socket

task_id = uuid()
args = (2, 2)
kwargs = {}
basic_publish(
    message=json.dumps((args, kwargs, None)),
    application_headers={
        'lang': 'py',
        'task': 'proj.tasks.add',
        'argsrepr': repr(args),
        'kwargsrepr': repr(kwargs),
        'origin': '@'.join([os.getpid(), socket.gethostname()])
    }
    properties={
        'correlation_id': task_id,
        'content_type': 'application/json',
        'content_encoding': 'utf-8',
    }
)

Изменения по сравнению с версией 1

  • Версия протокола, определяемая по наличию заголовка сообщения task.

  • Поддержка нескольких языков с помощью заголовка lang.

    Работник может перенаправить сообщение работнику, поддерживающему данный язык.

  • Мета-данные перенесены в заголовки.

    Это означает, что рабочие/посредники могут проверять сообщение и принимать решения на основе заголовков без декодирования полезной нагрузки (которая может быть специфичной для языка, например, сериализованной сериализатором pickle, специфичным для Python).

  • Всегда UTC

    Больше нет флага utc, поэтому любая информация о времени, в которой отсутствует часовой пояс, будет ожидаться как время UTC.

  • Body - только для данных, специфичных для конкретного языка.

    • Python хранит args/kwargs и встроенные сигнатуры в body.

    • Если сообщение использует необработанную кодировку, то необработанные данные будут переданы в качестве единственного аргумента функции.

    • Java/C и т.д. могут использовать документ Thrift/protobuf в качестве тела

  • origin - имя узла, отправляющего задание.

  • Отправка актеру на основе заголовков task, << 1 >>>

    meth не используется Python, но может быть использован в будущем для указания пар класс+метод.

  • Цепь приобретает выделенное поле.

    Сокращение цепочки в рекурсивный аргумент callbacks вызывает проблемы при превышении предела рекурсии.

    Это исправлено в новом протоколе сообщений путем указания списка подписей, каждая задача затем выводит задачу из списка при отправке следующего сообщения:

    execute_task(message)
    chain = embed['chain']
    if chain:
        sig = maybe_signature(chain.pop())
        sig.apply_async(chain=chain)
    
  • correlation_id заменяет поле task_id.

  • Поля root_id и parent_id помогают отслеживать рабочие потоки.

  • shadow позволяет указать другое имя для журналов, мониторы могут использоваться для таких понятий, как задачи, вызывающие функцию, указанную в качестве аргумента:

    from celery.utils.imports import qualname
    
    class PickleTask(Task):
    
        def unpack_args(self, fun, args=()):
            return fun, args
    
        def apply_async(self, args, kwargs, **options):
            fun, real_args = self.unpack_args(*args)
            return super().apply_async(
                (fun, real_args, kwargs), shadow=qualname(fun), **options
            )
    
    @app.task(base=PickleTask)
    def call(fun, args, kwargs):
        return fun(*args, **kwargs)
    

Версия 1

В версии 1 протокола все поля хранятся в теле сообщения: это означает, что рабочие и промежуточные потребители должны десериализовать полезную нагрузку, чтобы прочитать поля.

Тело сообщения

  • task
    строка:

    Название задания. обязательно

  • id
    строка:

    Уникальный идентификатор задания (UUID). обязательно.

  • args
    список:

    Список аргументов. Будет пустым списком, если не указан.

  • kwargs
    словарь:

    Словарь аргументов ключевых слов. Будет пустым словарем, если не указан.

  • retries
    int:

    Текущее количество повторных попыток выполнения этого задания. По умолчанию равно 0, если не указано.

  • eta
    строка (ISO 8601):

    Расчетное время прибытия. Это дата и время в формате ISO 8601. Если это время не указано, сообщение не будет запланировано, но будет выполнено как можно скорее.

  • expires
    строка (ISO 8601):

    Добавлено в версии 2.0.2.

    Дата истечения срока действия. Это дата и время в формате ISO 8601. Если дата не указана, срок действия сообщения не истечет. Срок действия сообщения истечет, когда сообщение будет получено и дата истечения срока действия будет превышена.

  • taskset
    строка:

    Группа, в которую входит это задание (если есть).

  • chord
    Подпись:

    Добавлено в версии 2.3.

    Обозначает, что данное задание является одной из частей заголовка аккорда. Значение этого ключа является телом аккорда, которое должно быть выполнено после возвращения всех задач в заголовке.

  • utc
    bool:

    Добавлено в версии 2.5.

    Если время истинно, то используется часовой пояс UTC, если нет, то должен использоваться текущий местный часовой пояс.

  • callbacks
    <список>Подпись:

    Добавлено в версии 3.0.

    Список сигнатур для вызова при успешном завершении задачи.

  • errbacks
    <список>Подпись:

    Добавлено в версии 3.0.

    Список сигнатур для вызова в случае возникновения ошибки при выполнении задачи.

  • timelimit
    < кортеж>(float, float).:

    Добавлено в версии 3.1.

    Настройки ограничения времени выполнения задачи. Это кортеж из значений жесткого и мягкого лимита времени (int/float или None для отсутствия лимита).

    Пример значения, задающего мягкое ограничение времени в 3 секунды и жесткое ограничение времени в 10 секунд:

    {'timelimit': (3.0, 10.0)}
    

Пример сообщения

Это пример вызова задачи celery.task.ping в формате json:

{"id": "4cc7438e-afd4-4f8f-a2f3-f46567e7ca77",
 "task": "celery.task.PingTask",
 "args": [],
 "kwargs": {},
 "retries": 0,
 "eta": "2009-11-17T12:30:56.527191"}

Сериализация задач

С помощью заголовка сообщения content_type поддерживается несколько типов форматов сериализации.

MIME-типы, поддерживаемые по умолчанию, показаны в следующей таблице.

Схема

Тип MIME

json

приложение/json

yaml

приложение/x-yaml

маринованный огурец

application/x-python-serialize

msgpack

application/x-msgpack

Сообщения о событиях

Сообщения о событиях всегда сериализуются в формате JSON и могут содержать произвольные поля тела сообщения.

Начиная с версии 4.0. тело может состоять либо из одного отображения (одно событие), либо из списка отображений (несколько событий).

Существуют также стандартные поля, которые всегда должны присутствовать в сообщении о событии:

Стандартные поля тела

  • строка type

    Тип события. Это строка, содержащая категорию и действие, разделенные разделителем тире (например, task-succeeded).

  • строка hostname

    Полное имя хоста, на котором произошло событие.

  • unsigned long long clock

    Значение логических часов для этого события (временная метка Лампорта).

  • float timestamp

    Временная метка UNIX, соответствующая времени, когда произошло событие.

  • signed short utcoffset

    Это поле описывает часовой пояс отправляющего узла и указывается как количество часов впереди/позади UTC (например, -2 или +1).

  • unsigned long long pid

    Идентификатор процесса, в котором произошло событие.

Стандартные типы событий

Список стандартных типов событий и их полей см. в Ссылка на событие.

Пример сообщения

Это поля сообщения для события task-succeeded:

properties = {
    'routing_key': 'task.succeeded',
    'exchange': 'celeryev',
    'content_type': 'application/json',
    'content_encoding': 'utf-8',
    'delivery_mode': 1,
}
headers = {
    'hostname': 'worker1@george.vandelay.com',
}
body = {
    'type': 'task-succeeded',
    'hostname': 'worker1@george.vandelay.com',
    'pid': 6335,
    'clock': 393912923921,
    'timestamp': 1401717709.101747,
    'utcoffset': -1,
    'uuid': '9011d855-fdd1-4f8f-adb3-a413b499eafb',
    'retval': '4',
    'runtime': 0.0003212,
)
Вернуться на верх