Вызов задач

Основы

Этот документ описывает единый «API вызова» Celery, используемый экземплярами задач и canvas.

API определяет стандартный набор опций выполнения, а также три метода:

  • apply_async(args[, kwargs[, …]])

    Отправляет сообщение о задаче.

  • delay(*args, **kwargs)

    Ярлык для отправки сообщения о задаче, но не поддерживает параметры выполнения.

  • вызов (__call__)

    Применение объекта, поддерживающего вызывающий API (например, add(2, 2)), означает, что задание будет выполнено не рабочим, а текущим процессом (сообщение не будет отправлено).

Пример

Метод delay() удобен тем, что выглядит как вызов обычной функции:

task.delay(arg1, arg2, kwarg1='x', kwarg2='y')

Используя apply_async() вместо этого, вы должны написать:

task.apply_async(args=[arg1, arg2], kwargs={'kwarg1': 'x', 'kwarg2': 'y'})

Таким образом, delay явно удобен, но если вы хотите задать дополнительные параметры выполнения, вам придется использовать apply_async.

В остальной части этого документа будут подробно рассмотрены варианты выполнения задачи. Во всех примерах используется задача add, возвращающая сумму двух аргументов:

@app.task
def add(x, y):
    return x + y

Связывание (обратные вызовы/возвраты)

Celery поддерживает связывание задач между собой таким образом, что одна задача следует за другой. Задача обратного вызова будет применена с результатом родительской задачи в качестве частичного аргумента:

add.apply_async((2, 2), link=add.s(16))

Здесь результат первой задачи (4) будет отправлен в новую задачу, которая добавляет 16 к предыдущему результату, формируя выражение (2 + 2) + 16 = 20.

Вы также можете вызвать обратный вызов, если задача вызовет исключение (errback). Рабочий не будет вызывать обратный вызов как задачу, а вместо этого вызовет функцию errback напрямую, чтобы передать ей необработанные объекты запроса, исключения и обратного отслеживания.

Это пример обратного вызова ошибки:

@app.task
def error_handler(request, exc, traceback):
    print('Task {0} raised exception: {1!r}\n{2!r}'.format(
          request.id, exc, traceback))

он может быть добавлен к заданию с помощью опции выполнения link_error:

add.apply_async((2, 2), link_error=error_handler.s())

Кроме того, оба варианта link и link_error могут быть выражены в виде списка:

add.apply_async((2, 2), link=[add.s(16), other_task.s()])

Затем обратные вызовы/возвратные вызовы будут вызываться по порядку, и все обратные вызовы будут вызываться с возвращаемым значением родительской задачи в качестве частичного аргумента.

В сообщении

Celery поддерживает перехват всех изменений состояния путем установки обратного вызова on_message.

Например, для долго выполняющихся задач, чтобы отправить прогресс выполнения задачи, вы можете сделать что-то вроде этого:

@app.task(bind=True)
def hello(self, a, b):
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 50})
    time.sleep(1)
    self.update_state(state="PROGRESS", meta={'progress': 90})
    time.sleep(1)
    return 'hello world: %i' % (a+b)
def on_raw_message(body):
    print(body)

a, b = 1, 1
r = hello.apply_async(args=(a, b))
print(r.get(on_message=on_raw_message, propagate=False))

Вывод будет выглядеть следующим образом:

{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': {'progress': 50},
 'children': [],
 'status': 'PROGRESS',
 'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': {'progress': 90},
 'children': [],
 'status': 'PROGRESS',
 'traceback': None}
{'task_id': '5660d3a3-92b8-40df-8ccc-33a5d1d680d7',
 'result': 'hello world: 10',
 'children': [],
 'status': 'SUCCESS',
 'traceback': None}
hello world: 10

Расчетное время прибытия и обратный отсчет

ETA (расчетное время прибытия) позволяет вам установить конкретную дату и время, которое является самым ранним временем, когда ваша задача будет выполнена. countdown - это ярлык для установки ETA на секунды в будущее.

>>> result = add.apply_async((2, 2), countdown=3)
>>> result.get()    # this takes at least 3 seconds to return
20

Задача гарантированно будет выполнена в некоторое время после указанной даты и времени, но не обязательно в это точное время. Возможными причинами нарушения сроков могут быть множество элементов, ожидающих в очереди, или высокая задержка в сети. Чтобы убедиться, что ваши задания выполняются своевременно, необходимо следить за перегрузкой очереди. Используйте Munin или аналогичные инструменты для получения предупреждений, чтобы можно было принять соответствующие меры для снижения нагрузки. См. Мунин.

В то время как countdown является целым числом, eta должна быть объектом datetime, указывающим точную дату и время (включая миллисекундную точность и информацию о часовом поясе):

>>> from datetime import datetime, timedelta

>>> tomorrow = datetime.utcnow() + timedelta(days=1)
>>> add.apply_async((2, 2), eta=tomorrow)

Срок действия

Аргумент expires определяет необязательное время истечения срока действия, либо как секунды после публикации задания, либо как конкретную дату и время, используя datetime:

>>> # Task expires after one minute from now.
>>> add.apply_async((10, 10), expires=60)

>>> # Also supports datetime
>>> from datetime import datetime, timedelta
>>> add.apply_async((10, 10), kwargs,
...                 expires=datetime.now() + timedelta(days=1)

Когда рабочий получает просроченное задание, он пометит задание как REVOKED (TaskRevokedError).

Повторная отправка сообщения

Celery будет автоматически повторять отправку сообщений в случае разрыва соединения, и поведение повтора может быть настроено - например, как часто повторять, или максимальное количество повторов - или отключено совсем.

Чтобы отключить повторную попытку, можно установить параметр выполнения retry на False:

add.apply_async((2, 2), retry=False)

Политика повторных попыток

Политика повторных попыток - это отображение, которое управляет поведением повторных попыток, и может содержать следующие ключи:

  • max_retries

    Максимальное количество повторных попыток перед тем, как сдаться, в этом случае будет поднято исключение, вызвавшее неудачу повторной попытки.

    Значение None означает, что попытка будет повторяться вечно.

    По умолчанию повторная попытка выполняется 3 раза.

  • interval_start

    Определяет количество секунд (плавающее или целое число) для ожидания между повторными попытками. По умолчанию равно 0 (первая повторная попытка будет мгновенной).

  • интервал_шаг

    При каждом последующем повторе это число будет добавляться к задержке повтора (плавающее или целое число). По умолчанию 0,2.

  • interval_max

    Максимальное количество секунд (плавающее или целое число) для ожидания между повторными попытками. По умолчанию 0,2.

Например, политика по умолчанию соотносится с:

add.apply_async((2, 2), retry=True, retry_policy={
    'max_retries': 3,
    'interval_start': 0,
    'interval_step': 0.2,
    'interval_max': 0.2,
})

максимальное время повторных попыток составит 0,4 секунды. По умолчанию оно установлено относительно коротким, так как обрыв соединения может привести к эффекту кучи повторных попыток, если соединение брокера не работает. Например, многие процессы веб-сервера, ожидающие повторной попытки, блокируют другие входящие запросы.

Обработка ошибок подключения

Когда вы отправляете задание, а транспортное соединение теряется или соединение не может быть инициировано, будет выдана ошибка OperationalError:

>>> from proj.tasks import add
>>> add.delay(2, 2)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/app/task.py", line 388, in delay
        return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 503, in apply_async
    **options
  File "celery/app/base.py", line 662, in send_task
    amqp.send_task_message(P, name, message, **options)
  File "celery/backends/rpc.py", line 275, in on_task_call
    maybe_declare(self.binding(producer.channel), retry=True)
  File "/opt/celery/kombu/kombu/messaging.py", line 204, in _get_channel
    channel = self._channel = channel()
  File "/opt/celery/py-amqp/amqp/connection.py", line 272, in connect
    self.transport.connect()
  File "/opt/celery/py-amqp/amqp/transport.py", line 100, in connect
    self._connect(self.host, self.port, self.connect_timeout)
  File "/opt/celery/py-amqp/amqp/transport.py", line 141, in _connect
    self.sock.connect(sa)
  kombu.exceptions.OperationalError: [Errno 61] Connection refused

Если у вас включено retries, это произойдет только после исчерпания повторных попыток или при немедленном отключении.

Вы можете справиться и с этой ошибкой:

>>> from celery.utils.log import get_logger
>>> logger = get_logger(__name__)

>>> try:
...     add.delay(2, 2)
... except add.OperationalError as exc:
...     logger.exception('Sending task raised: %r', exc)

Сериализаторы

Данные, передаваемые между клиентами и рабочими, должны быть сериализованы, поэтому каждое сообщение в Celery имеет заголовок content_type, который описывает метод сериализации, используемый для его кодирования.

Сериализатор по умолчанию - JSON, но вы можете изменить его с помощью параметра task_serializer, или для каждой отдельной задачи, или даже для каждого сообщения.

Имеется встроенная поддержка JSON, pickle, YAML и msgpack, а также вы можете добавить свои собственные сериализаторы, зарегистрировав их в реестре сериализаторов Kombu.

См.также

Message Serialization в руководстве пользователя Kombu.

Каждый вариант имеет свои преимущества и недостатки.

json – JSON поддерживается во многих языках программирования, сейчас это

является стандартной частью Python (начиная с версии 2.6) и довольно быстро декодируется с помощью современных библиотек Python, таких как simplejson.

Основным недостатком JSON является то, что он ограничивает вас следующими типами данных: строки, Unicode, плавающие числа, булевы выражения, словари и списки. Десятичные числа и даты отсутствуют.

Двоичные данные будут передаваться с использованием кодировки Base64, что увеличивает размер передаваемых данных на 34% по сравнению с форматом кодировки, в котором поддерживаются собственные двоичные типы.

Однако, если ваши данные вписываются в вышеуказанные ограничения и вам нужна кросс-языковая поддержка, установка по умолчанию JSON, вероятно, будет лучшим выбором.

Дополнительную информацию см. на сайте http://json.org.

Примечание

(Из официальных документов Python https://docs.python.org/3.6/library/json.html) Ключи в парах ключ/значение JSON всегда имеют тип str. Когда словарь преобразуется в JSON, все ключи словаря преобразуются в строки. В результате этого, если словарь преобразуется в JSON, а затем обратно в словарь, словарь может быть не равен исходному. То есть, loads(dumps(x)) != x, если x имеет нестроковые ключи.

pickle - Если у вас нет желания поддерживать какой-либо язык, кроме

Python, то использование кодировки pickle обеспечит вам поддержку всех встроенных типов данных Python (кроме экземпляров классов), меньший размер сообщений при отправке двоичных файлов и небольшое ускорение по сравнению с обработкой JSON.

Дополнительную информацию см. в разделе pickle.

yaml – YAML обладает многими теми же характеристиками, что и json,

за исключением того, что он изначально поддерживает больше типов данных (включая даты, рекурсивные ссылки и т.д.).

Однако библиотеки Python для YAML работают немного медленнее, чем библиотеки для JSON.

Если вам нужен более выразительный набор типов данных и необходимо поддерживать межъязыковую совместимость, то YAML может подойти лучше, чем вышеперечисленные.

Дополнительную информацию см. на сайте http://yaml.org/.

msgpack – msgpack - это формат двоичной сериализации, который ближе к JSON

в функциях. Однако он еще очень молод, и на данный момент его поддержку следует считать экспериментальной.

Дополнительную информацию см. на сайте http://msgpack.org/.

Используемая кодировка доступна в заголовке сообщения, поэтому рабочий знает, как десериализовать любую задачу. Если вы используете пользовательский сериализатор, этот сериализатор должен быть доступен для рабочего.

Следующий порядок используется для определения сериализатора, используемого при отправке задачи:

  1. Опция выполнения serializer.

  2. Атрибут Task.serializer

  3. Настройка task_serializer.

Пример установки пользовательского сериализатора для одного вызова задачи:

>>> add.apply_async((10, 10), serializer='json')

Компрессия

Celery может сжимать сообщения, используя следующие встроенные схемы:

  • `бротли

    brotli оптимизирован для работы в Интернете, в частности, для небольших текстовых документов. Он наиболее эффективен для обслуживания статического содержимого, такого как шрифты и html-страницы.

    Чтобы использовать его, установите Celery с:

    $ pip install celery[brotli]
    
  • bzip2

    bzip2 создает файлы меньшего размера, чем gzip, но скорость сжатия и распаковки заметно ниже, чем у gzip.

    Чтобы использовать его, убедитесь, что ваш исполняемый файл Python был скомпилирован с поддержкой bzip2.

    Если вы получите следующее ImportError:

    >>> import bz2
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'bz2'
    

    означает, что вам следует перекомпилировать вашу версию Python с поддержкой bzip2.

  • gzip

    gzip подходит для систем, которым требуется небольшой объем памяти, что делает его идеальным для систем с ограниченной памятью. Он часто используется для создания файлов с расширением «.tar.gz».

    Чтобы использовать его, убедитесь, что ваш исполняемый файл Python был скомпилирован с поддержкой gzip.

    Если вы получите следующее ImportError:

    >>> import gzip
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'gzip'
    

    означает, что вам следует перекомпилировать вашу версию Python с поддержкой gzip.

  • lzma

    lzma обеспечивает хороший коэффициент сжатия и работает с высокой скоростью сжатия и распаковки за счет более высокого использования памяти.

    Чтобы использовать его, убедитесь, что ваш исполняемый файл Python был скомпилирован с поддержкой lzma и что ваша версия Python - 3.3 и выше.

    Если вы получите следующее ImportError:

    >>> import lzma
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'lzma'
    

    означает, что вам следует перекомпилировать вашу версию Python с поддержкой lzma.

    Кроме того, вы можете установить бэкпорт, используя:

    $ pip install celery[lzma]
    
  • zlib

    zlib - это абстракция алгоритма Deflate в виде библиотеки, которая включает в свой API поддержку как формата файлов gzip, так и облегченного формата потоков. Это важнейший компонент многих программных систем - ядра Linux и Git VCS, и это лишь некоторые из них.

    Чтобы использовать его, убедитесь, что ваш исполняемый файл Python был скомпилирован с поддержкой zlib.

    Если вы получите следующее ImportError:

    >>> import zlib
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
    ImportError: No module named 'zlib'
    

    это означает, что вам следует перекомпилировать вашу версию Python с поддержкой zlib.

  • zstd

    zstd нацелен на сценарии сжатия в реальном времени на уровне zlib и лучшие коэффициенты сжатия. Он поддерживается очень быстрым энтропийным этапом, обеспечиваемым библиотекой Huff0 и FSE.

    Чтобы использовать его, установите Celery с:

    $ pip install celery[zstd]
    

Вы также можете создавать свои собственные схемы сжатия и регистрировать их в kombu compression registry.

Для выбора схемы сжатия, используемой при отправке задания, используется следующий порядок:

  1. Опция выполнения compression.

  2. Атрибут Task.compression.

  3. Атрибут task_compression.

Пример указания сжатия, используемого при вызове задачи:

>>> add.apply_async((2, 2), compression='zlib')

Соединения

Вы можете обрабатывать соединение вручную, создав издателя:

results = []
with add.app.pool.acquire(block=True) as connection:
    with add.get_publisher(connection) as publisher:
        try:
            for args in numbers:
                res = add.apply_async((2, 2), publisher=publisher)
                results.append(res)
print([res.get() for res in results])

Хотя этот конкретный пример гораздо лучше выражен как группа:

>>> from celery import group

>>> numbers = [(2, 2), (4, 4), (8, 8), (16, 16)]
>>> res = group(add.s(i, j) for i, j in numbers).apply_async()

>>> res.get()
[4, 8, 16, 32]

Параметры маршрутизации

Celery может направлять задачи в разные очереди.

Простая маршрутизация (имя <-> имя) осуществляется с помощью опции queue:

add.apply_async(queue='priority.high')

Затем вы можете назначить рабочих в очередь priority.high, используя аргумент workers -Q:

$ celery -A proj worker -l INFO -Q celery,priority.high

См.также

Жесткое кодирование имен очередей в коде не рекомендуется, лучше всего использовать маршрутизаторы конфигурации (task_routes).

Чтобы узнать больше о маршрутизации, смотрите Задачи маршрутизации.

Варианты результатов

Вы можете включить или выключить сохранение результатов с помощью параметра task_ignore_result или с помощью параметра ignore_result:

>>> result = add.apply_async((1, 2), ignore_result=True)
>>> result.get()
None

>>> # Do not ignore result (default)
...
>>> result = add.apply_async((1, 2), ignore_result=False)
>>> result.get()
3

Если вы хотите хранить дополнительные метаданные о задаче в бэкенде результатов, установите для параметра result_extended значение True.

См.также

Более подробную информацию о задачах можно найти в разделе Задачи.

Дополнительные параметры

Эти опции предназначены для опытных пользователей, которые хотят использовать все возможности маршрутизации AMQP. Заинтересованные лица могут прочитать routing guide.

  • обмен

    Имя обмена (или kombu.entity.Exchange), на который нужно отправить сообщение.

  • ключ_маршрутизации

    Ключ маршрутизации, используемый для определения.

  • приоритет

    Число от 0 до 255, где 255 - наивысший приоритет.

    Поддерживаются: RabbitMQ, Redis (приоритет обратный, 0 - наивысший).

Вернуться на верх