Что нового в Celery 3.1 (Cipater)

Автор:

Спросите Солема (ask at celeryproject.org)

Celery - это простая, гибкая и надежная распределенная система для обработки огромного количества сообщений, предоставляющая операторам инструменты, необходимые для обслуживания такой системы.

Это очередь задач, ориентированная на обработку в реальном времени, а также поддерживающая планирование задач.

Celery имеет большое и разнообразное сообщество пользователей и разработчиков, вам стоит присоединиться к нам on IRC или << 1 >>>.

Чтобы узнать больше о сельдерее, вам следует перейти по ссылке introduction.

Хотя эта версия обратно совместима с предыдущими версиями, важно, чтобы вы прочитали следующий раздел.

Эта версия официально поддерживается на CPython 2.6, 2.7 и 3.3, а также поддерживается на PyPy.

Предисловие

Тупики давно мучают наших рабочих, и хотя они редкость, они неприемлемы. Они также печально известны тем, что их очень трудно диагностировать и воспроизвести, поэтому, чтобы облегчить эту работу, я написал набор стресс-тестов, который бомбардирует рабочий различными задачами в попытке сломать его.

Что произойдет, если тысячи дочерних процессов рабочего будут убиваться каждую секунду? Что, если мы также будем убивать соединение с брокером каждые 10 секунд? Это примеры того, что пакет стресс-тестов будет делать с рабочим, и он повторно проводит эти тесты, используя различные комбинации конфигурации, чтобы найти крайние ошибки.

В итоге мне пришлось переписать пул prefork, чтобы избежать использования семафора POSIX. Это было чрезвычайно сложно, но после нескольких месяцев напряженной работы рабочий наконец-то прошел набор стресс-тестов.

Возможно, мы найдем еще больше ошибок, но хорошая новость заключается в том, что теперь у нас есть инструмент для их воспроизведения, так что если вам не повезет и вы столкнетесь с ошибкой, мы напишем тест для нее и устраним ее!

Обратите внимание, что я также перевел многие брокерские транспорты в статус экспериментальных: единственными транспортами, рекомендованными для использования в производстве, на сегодняшний день являются RabbitMQ и Redis.

У меня нет ресурсов, чтобы поддерживать их все, поэтому ошибки остаются нерешенными. Я бы хотел, чтобы кто-нибудь взял на себя ответственность за эти транспорты или пожертвовал ресурсы для их улучшения, но в нынешней ситуации я не думаю, что качество соответствует остальной кодовой базе, поэтому я не могу рекомендовать их для использования в производстве.

В следующей версии Celery 4.0 основное внимание будет уделено производительности и удалению редко используемых частей библиотеки. Также началась работа над новым протоколом сообщений, поддержкой нескольких языков и многим другим. Первоначальный проект можно найти here.

Это, вероятно, был самый трудный релиз, над которым я работал, поэтому ни одно вступление к этому журналу изменений не будет полным без огромного спасибо всем, кто внес свой вклад и помог мне протестировать его!

Спасибо за вашу поддержку!

- Спроси Солема

Важные замечания

Отказано в поддержке Python 2.5

Для Celery теперь требуется Python 2.6 или более поздней версии.

Новая двойная кодовая база работает как на Python 2, так и на 3, не требуя инструмента переноса 2to3.

Примечание

Это также последняя версия, поддерживающая Python 2.6! Начиная с Celery 4.0 и далее потребуется Python 2.7 или более поздняя версия.

Последняя версия, включающая Pickle по умолчанию

Начиная с Celery 4.0 сериализатором по умолчанию будет json.

Если вы зависите от принятия pickle, вам следует подготовиться к этому изменению, явно разрешив своему работнику потреблять pickled-сообщения с помощью параметра CELERY_ACCEPT_CONTENT:

CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']

Убедитесь, что вы выбрали только те форматы сериализации, которые вы действительно будете использовать, и убедитесь, что вы должным образом защитили свой брокер от нежелательного доступа (см. Security Guide).

Рабочий выдаст предупреждение об устаревании, если вы не определите этот параметр.

Старые программы командной строки удалены и устарели

Все должны перейти на новую команду celery umbrella, поэтому мы постепенно сокращаем старые названия команд.

В этой версии мы удалили все команды, которые не используются в init-скриптах. Остальные будут удалены в версии 4.0.

Программа

Новый статус

Замена

celeryd

ДЕПРЕКАТИРОВАНО

celery worker

celerybeat

ДЕПРЕКАТИРОВАНО

celery beat

celeryd-multi

ДЕПРЕКАТИРОВАНО

celery multi

celeryctl

УДАЛЕНО

celery inspect|control

celeryev

УДАЛЕНО

celery events

camqadm

УДАЛЕНО

celery amqp

Если это не новая установка, то вы можете удалить старые команды:

$ pip uninstall celery
$ # repeat until it fails
# ...
$ pip uninstall celery
$ pip install celery

Пожалуйста, выполните celery --help для получения помощи с помощью команды umbrella.

Новости

Улучшение бассейна в Префорке

Эти улучшения активны только в том случае, если вы используете транспорт с поддержкой async. Это означает, что на данный момент поддерживаются только RabbitMQ (AMQP) и Redis, а для других транспортов по-прежнему будет использоваться реализация резервного копирования на основе потоков.

  • Теперь пул использует одну очередь IPC на каждый дочерний процесс.

    Ранее пул разделял одну очередь между всеми дочерними процессами, используя семафор POSIX в качестве мьютекса для достижения эксклюзивного доступа на чтение и запись.

    Семафор POSIX теперь удален, и каждый дочерний процесс получает выделенную очередь. Это означает, что рабочему потребуется больше файловых дескрипторов (два дескриптора на процесс), но это также означает, что производительность повысилась, и мы можем отправлять работу отдельным дочерним процессам.

    Семафоры POSIX не освобождаются при завершении процесса, поэтому уничтожение процессов может привести к тупику, если это произойдет во время получения семафора. Хорошего решения для исправления этого не существует, поэтому лучшим вариантом было удалить семафор.

  • Асинхронные операции записи

    Теперь пул использует асинхронный ввод-вывод для отправки работы дочерним процессам.

  • Обнаружение потерянного процесса теперь происходит мгновенно.

    Если дочерний процесс был убит или загадочно завершен, пул ранее должен был ждать 30 секунд, прежде чем пометить задание символом WorkerLostError. Он должен был это делать, потому что out-queue была общей для всех процессов, и пул не мог быть уверен, выполнил ли процесс задачу или нет. Поэтому был выбран произвольный тайм-аут в 30 секунд, так как считалось, что к этому моменту out-queue будет исчерпан.

    Этот тайм-аут больше не нужен, и поэтому задачу можно пометить как неудавшуюся, как только пул получит уведомление о завершении процесса.

  • Исправлены редкие состояния гонки

    О большинстве из этих ошибок нам не сообщали, но они были обнаружены во время проведения нового набора стресс-тестов.

Оговорки

Django поддерживается из коробки

Celery 3.0 представил новый блестящий API, но, к сожалению, в нем не было решения для пользователей Django.

В этой версии ситуация меняется, поскольку Django теперь поддерживается в ядре, и новые пользователи Django, приходящие в Celery, теперь должны использовать новый API напрямую.

В сообществе Django принято, что для каждой библиотеки существует отдельный пакет django-x, действующий как мост между Django и библиотекой.

Наличие отдельного проекта для пользователей Django было болью для Celery, с несколькими трекерами проблем и несколькими источниками документации, и, наконец, начиная с версии 3.0, у нас даже были разные API.

В этой версии мы бросаем вызов тому, что конвенция и пользователи Django будут использовать ту же библиотеку, тот же API и ту же документацию, что и все остальные.

Не стоит торопиться переносить существующий код для использования нового API, но если вы хотите поэкспериментировать с ним, вам следует знать об этом:

  • Вам необходимо использовать экземпляр приложения Celery.

    Новый API Celery, представленный в версии 3.0, требует от пользователей инстанцирования библиотеки путем создания приложения:

    from celery import Celery
    
    app = Celery()
    
  • Вам необходимо явно интегрировать Celery с Django

    Celery не будет автоматически использовать настройки Django, поэтому вы можете либо настроить Celery отдельно, либо указать ему использовать настройки Django:

    app.config_from_object('django.conf:settings')
    

    Он также не будет автоматически обходить установленные приложения в поисках модулей задач. Если вам нужно такое поведение, вы должны явно передать список экземпляров Django в приложение Celery:

    from django.conf import settings
    app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
    
  • Вы больше не используете manage.py

    Вместо этого вы используете непосредственно команду celery:

    $ celery -A proj worker -l info
    

    Чтобы это работало, ваш модуль приложения должен хранить переменную окружения DJANGO_SETTINGS_MODULE, см. пример в Django guide.

Чтобы начать работу с новым API, вам следует сначала прочитать учебник Первые шаги с сельдереем, а затем - инструкции по работе с Django в Первые шаги в работе с Django.

Исправления и улучшения, применяемые библиотекой django-celery, теперь автоматически применяются ядром Celery, когда оно обнаруживает, что установлена переменная окружения DJANGO_SETTINGS_MODULE.

Дистрибутив поставляется с новым примером проекта, использующего Django в examples/django:

https://github.com/celery/celery/tree/3.1/examples/django

Для некоторых функций по-прежнему требуется библиотека django-celery:

  • Celery не реализует базу данных Django и бэкенды результатов кэширования.

  • Celery не поставляется с периодической задачей на основе базы данных

    планировщик.

Примечание

Если вы все еще используете старый API при обновлении до Celery 3.1, то вы должны убедиться, что ваш модуль настроек содержит строку djcelery.setup_loader(), поскольку это больше не будет происходить как побочный эффект импорта модуля django-celery.

Новым пользователям (или если вы перешли на новый API) строка setup_loader больше не нужна, и вы должны убедиться, что она удалена.

События теперь упорядочиваются с использованием логического времени

Идеальная синхронизация физических часов невозможна, поэтому использование временных меток для упорядочивания событий в распределенной системе ненадежно.

Сообщения о событиях Celery уже некоторое время включают логическое значение часов, но начиная с этой версии это поле также используется для их упорядочивания.

Кроме того, события теперь записывают информацию о часовом поясе, включая новое поле utcoffset в сообщение о событии. Это знаковое целое число, показывающее разницу с временем UTC в часах, поэтому, например, событие, отправленное из часового пояса Европа/Лондон в летнее время, будет иметь смещение в 1.

app.events.Receiver будет автоматически преобразовывать временные метки в местный часовой пояс.

Примечание

Логические часы синхронизируются с другими узлами в том же кластере (соседями), поэтому это означает, что логическая эпоха начнется в тот момент, когда запустится первый рабочий в кластере.

Если все рабочие будут выключены, значение часов будет потеряно и сброшено на 0. Для защиты от этого, вы должны указать опцию celery worker --statedb, чтобы рабочий мог сохранить значение часов при выключении.

Вы можете заметить, что логические часы представляют собой целочисленное значение и очень быстро увеличиваются. Однако не беспокойтесь о том, что значение переполнится, поскольку даже в самых загруженных кластерах может пройти несколько тысячелетий, прежде чем часы превысят значение в 64 бита.

Новый формат имени рабочего узла (name@host)

Имена узлов теперь строятся из двух элементов: name и host-name, разделенных „@“.

Это изменение было сделано для более легкого определения нескольких экземпляров, работающих на одной машине.

Если пользовательское имя не указано, то рабочий будет использовать имя „celery“ по умолчанию, в результате чего полное имя узла будет „celery@hostname“:

$ celery worker -n example.com
celery@example.com

Чтобы также задать имя, необходимо включить символ @:

$ celery worker -n worker1@example.com
worker1@example.com

Рабочий будет идентифицировать себя, используя полное имя узла в событиях и широковещательных сообщениях, поэтому если раньше рабочий идентифицировал себя как „worker1.example.com“, то теперь он будет использовать „celery@worker1.example.com“.

Помните, что аргумент -n также поддерживает простые замены переменных, поэтому если текущее имя хоста george.example.com, то макрос %h расширится до него:

$ celery worker -n worker1@%h
worker1@george.example.com

Возможные замены следующие:

Переменная

Замена

%h

Полное имя хоста (включая доменное имя)

%d

Только доменное имя

%n

Только имя хоста (без доменного имени)

%%

Символ %

Связанные задачи

Декоратор задач теперь может создавать «связанные задачи», что означает, что задача будет получать аргумент self.

@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)

Использование связанных задач теперь является рекомендуемым подходом, когда вам нужен доступ к экземпляру задачи или контексту запроса. Ранее вместо этого приходилось обращаться к имени задачи (send_twitter_status.retry), но это могло привести к проблемам в некоторых конфигурациях.

Mingle: Синхронизация рабочих

Теперь рабочий будет пытаться синхронизироваться с другими рабочими в том же кластере.

Синхронизированные данные в настоящее время включают отозванные задания и логические часы.

Это происходит только при запуске и вызывает секундную задержку при запуске для сбора широковещательных ответов от других рабочих.

Вы можете отключить этот шаг загрузки с помощью опции celery worker --without-mingle.

Сплетни: Рабочий <-> Рабочее общение

Рабочие теперь пассивно подписываются на события, связанные с рабочими, такие как сердцебиение.

Это означает, что рабочий знает, что делают другие рабочие, и может обнаружить, если они уходят в оффлайн. В настоящее время это используется только для синхронизации часов, но есть много возможностей для будущих дополнений, и вы можете написать расширения, которые используют это преимущество уже сейчас.

Некоторые идеи включают протоколы консенсуса, перенаправление задачи лучшему работнику (на основе использования ресурсов или локальности данных) или перезапуск работников при сбое.

Мы считаем, что хотя это небольшое дополнение, оно открывает удивительные возможности.

Вы можете отключить этот шаг загрузки с помощью опции celery worker --without-gossip.

Бутстепы: Расширение рабочего

Написав bootsteps, вы теперь можете легко расширить потребительскую часть worker для добавления дополнительных возможностей, например, пользовательских потребителей сообщений.

Рабочий уже некоторое время использует bootsteps, но они никогда не были документированы. В этой версии потребительская часть рабочего также была переписана для использования bootsteps, а в новом руководстве Расширения и бутстепы документированы примеры расширения рабочего, включая добавление пользовательских потребителей сообщений.

Дополнительную информацию см. в руководстве Расширения и бутстепы.

Примечание

Шаги загрузки, написанные для старых версий, не будут совместимы с этой версией, так как API значительно изменился.

Старый API был экспериментальным и внутренним, но если вам не повезло использовать его, пожалуйста, свяжитесь с нами в списке рассылки, и мы поможем вам перенести bootstep на новый API.

Новый бэкенд результатов RPC

Эта новая экспериментальная версия бэкенда результатов amqp является хорошей альтернативой для использования в классических сценариях RPC, где процесс, инициирующий задачу, всегда является процессом для получения результата.

Он использует Kombu для отправки и получения результатов, и каждый клиент использует уникальную очередь для отправки ответов. Это позволяет избежать значительных накладных расходов оригинального бэкенда результатов amqp, который создает одну очередь для каждой задачи.

По умолчанию результаты, отправленные с помощью этого бэкенда, не сохраняются, поэтому они не переживут перезапуск брокера. Вы можете включить параметр CELERY_RESULT_PERSISTENT, чтобы изменить это.

CELERY_RESULT_BACKEND = 'rpc'
CELERY_RESULT_PERSISTENT = True

Обратите внимание, что аккорды в настоящее время не поддерживаются бэкендом RPC.

Временные ограничения теперь могут быть установлены клиентом

В Calling API были добавлены две новые опции: time_limit и soft_time_limit:

>>> res = add.apply_async((2, 2), time_limit=10, soft_time_limit=8)

>>> res = add.subtask((2, 2), time_limit=10, soft_time_limit=8).delay()

>>> res = add.s(2, 2).set(time_limit=10, soft_time_limit=8).delay()

При участии Мгера Мовсисяна.

Redis: широковещательные сообщения и виртуальные хосты

В настоящее время широковещательные сообщения видны всем виртуальным хостам при использовании транспорта Redis. Теперь это можно исправить, включив префикс для всех каналов, чтобы сообщения разделялись:

BROKER_TRANSPORT_OPTIONS = {'fanout_prefix': True}

Обратите внимание, что вы не сможете общаться с рабочими, работающими на более старых версиях, или с рабочими, у которых эта настройка не включена.

Эта настройка будет использоваться по умолчанию в будущей версии.

Связано с проблемой #1490.

pytz заменяет python-dateutil зависимость

Celery больше не зависит от библиотеки python-dateutil, вместо этого была добавлена новая зависимость от библиотеки pytz.

Библиотека pytz уже была рекомендована для точной поддержки часовых поясов.

Это также означает, что зависимости одинаковы как для Python 2, так и для Python 3, и что файл requirements/default-py3k.txt был удален.

Поддержка дополнительных требований setuptools

Pip теперь поддерживает формат дополнительных требований setuptools, поэтому мы убрали старое понятие bundles, и вместо этого указываем setuptools extras.

Вы устанавливаете дополнения, указывая их в скобках:

$ pip install celery[redis,mongodb]

Вышеперечисленное установит зависимости для Redis и MongoDB. Вы можете перечислить столько дополнений, сколько захотите.

Предупреждение

Вы больше не можете использовать пакеты celery-with-*, так как они не будут обновлены для использования Celery 3.1.

Расширение

Ввод требований

Тип

Redis

celery[redis]

транспорт, бэкенд результатов

MongoDB

celery[mongodb]

транспорт, бэкенд результатов

CouchDB

celery[couchdb]

транспорт

Beanstalk

celery[beanstalk]

транспорт

ZeroMQ

celery[zeromq]

транспорт

Смотритель зоопарка

celery[zookeeper]

транспорт

SQLAlchemy

celery[sqlalchemy]

транспорт, бэкенд результатов

librabbitmq

celery[librabbitmq]

транспорт (C amqp клиент)

Полный список с примерами находится в разделе Пакеты.

subtask.__call__() теперь выполняет задание напрямую

Недоразумение привело к тому, что Signature.__call__ является псевдонимом .delay, но это не соответствует API вызова Task, который вызывает базовый метод задачи.

Это означает, что:

@app.task
def add(x, y):
    return x + y

add.s(2, 2)()

теперь делает то же самое, что и прямой вызов задачи:

>>> add(2, 2)

Другие новости

  • Теперь зависит от Kombu 3.0.

  • Теперь зависит от billiard версии 3.3.

  • Worker теперь будет аварийно завершаться, если запущен от имени пользователя root с включенным pickle.

  • Canvas: group.apply_async и chain.apply_async больше не запускают отдельное задание.

    То, что примитивы групп и аккордов поддерживают «вызов API», как и другие подзадачи, было хорошей идеей, но на практике это было бесполезно и часто путало пользователей. Если вы все еще хотите такого поведения, вы можете определить задачу, которая будет делать это за вас.

  • Новый метод Signature.freeze() может использоваться для «финализации» сигнатур/подзадач.

    Обычная подпись:

    >>> s = add.s(2, 2)
    >>> result = s.freeze()
    >>> result
    <AsyncResult: ffacf44b-f8a1-44e9-80a3-703150151ef2>
    >>> s.delay()
    <AsyncResult: ffacf44b-f8a1-44e9-80a3-703150151ef2>
    

    Группа:

    >>> g = group(add.s(2, 2), add.s(4, 4))
    >>> result = g.freeze()
    <GroupResult: e1094b1d-08fc-4e14-838e-6d601b99da6d [
        70c0fb3d-b60e-4b22-8df7-aa25b9abc86d,
        58fcd260-2e32-4308-a2ea-f5be4a24f7f4]>
    >>> g()
    <GroupResult: e1094b1d-08fc-4e14-838e-6d601b99da6d [70c0fb3d-b60e-4b22-8df7-aa25b9abc86d, 58fcd260-2e32-4308-a2ea-f5be4a24f7f4]>
    
  • Определено поведение исключения аккордов (проблема #1172).

    Начиная с этой версии, обратный вызов аккорда будет менять состояние на FAILURE, когда задача, являющаяся частью аккорда, вызывает исключение.

    Смотрите больше на Обработка ошибок.

  • Новая возможность указывать дополнительные параметры командной строки для программ worker и beat.

    Атрибут app.user_options может быть использован для добавления дополнительных аргументов командной строки и ожидает опций в стиле optparse:

    from celery import Celery
    from celery.bin import Option
    
    app = Celery()
    app.user_options['worker'].add(
        Option('--my-argument'),
    )
    

    Дополнительную информацию см. в руководстве Расширения и бутстепы.

  • Все события теперь включают поле pid, которое является идентификатором процесса, отправившего событие.

  • Пульс события теперь рассчитывается на основе времени, когда событие было получено монитором, а не времени, о котором сообщил работник.

    Это означает, что работник с несинхронизированными часами больше не будет отображаться в мониторах как «Offline».

    Теперь предупреждение выдается, если разница между временем отправителя и внутренним временем превышает 15 секунд, что свидетельствует о рассинхронизации часов.

  • Поддержка монотонных часов.

    Монотонные часы теперь используются для тайм-аутов и составления расписания.

    Функция монотонных часов встроена, начиная с Python 3.4, но у нас также есть запасные реализации для Linux и macOS.

  • celery worker теперь поддерживает новый аргумент --detach для запуска рабочего в качестве демона в фоновом режиме.

  • app.events.Receiver теперь устанавливает поле local_received для входящих событий, которое устанавливается на время, когда событие было получено.

  • app.events.Dispatcher теперь принимает аргумент groups, который определяет белый список групп событий, которые будут отправлены.

    Тип события - это строка, разделенная символами „-“, где часть перед первым символом „-“ - это группа. В настоящее время существует только две группы: worker и task.

    Диспетчер, инстанцированный следующим образом:

    >>> app.events.Dispatcher(connection, groups=['worker'])
    

    будет отправлять только события, связанные с рабочими, и молча отбрасывать любые попытки отправки событий, связанных с любой другой группой.

  • Новая настройка BROKER_FAILOVER_STRATEGY.

    Этот параметр может быть использован для изменения стратегии обхода отказа транспорта, может быть либо вызываемым параметром, возвращающим итерабельную переменную, либо именем встроенной в Kombu стратегии обхода отказа. По умолчанию это «round-robin».

    При участии Мэтта Уайза.

  • Result.revoke больше не будет ждать ответов.

    Вы можете добавить аргумент reply=True, если вы действительно хотите дождаться ответов от рабочих.

  • Улучшена поддержка задач link и link_error для аккордов.

    Внесено Стивом Морином.

  • Рабочий: Теперь выдает предупреждение, если параметр CELERYD_POOL установлен для включения пулов eventlet/gevent.

    Опция -P всегда должна использоваться для выбора пула событий/событий, чтобы гарантировать, что исправления будут применены как можно раньше.

    Если вы запускаете рабочий в обертке (например, в Django manage.py), то вы должны применять патчи вручную, например, создав альтернативную обертку, которая обезьянничает патчи в начале программы перед импортом любых других модулей.

  • Теперь есть команда „inspect clock“, которая будет собирать текущее значение логических часов из рабочих.

  • celery inspect stats теперь содержит идентификатор основного процесса рабочего.

    При участии Мгера Мовсисяна.

  • Новая команда удаленного управления для сброса рабочей конфигурации.

    Пример:

    $ celery inspect conf
    

    Значения конфигурации будут преобразованы в значения, поддерживаемые JSON, где это возможно.

    При участии Мгера Мовсисяна.

  • Новые настройки CELERY_EVENT_QUEUE_TTL и << 1 >>>.

    Они контролируют, когда очередь событий мониторов будет удалена, и как долго будут видны события, опубликованные в этой очереди. Поддерживается только в RabbitMQ.

  • Новый бэкенд результатов Couchbase.

    Этот бэкенд результатов позволяет хранить и извлекать результаты задач, используя Couchbase.

    Смотрите Настройки бэкенда Couchbase для получения дополнительной информации о настройке этого бэкенда результатов.

    При участии Алена Масьеро.

  • CentOS init-script теперь поддерживает запуск нескольких рабочих экземпляров.

    Подробности см. в заголовке сценария.

    Внесено Джонатаном Джорданом.

  • AsyncResult.iter_native теперь устанавливает параметр интервала по умолчанию на 0,5

    Исправление предоставлено Иданом Камара

  • Новая настройка BROKER_LOGIN_METHOD.

    Этот параметр можно использовать для указания альтернативного метода входа в систему для AMQP-транспорта.

    При участии Адриена Гине

  • Команда удаленного управления dump_conf теперь будет выдавать строковое представление для типов, не совместимых с JSON.

  • Функция celery.security.setup_security теперь app.setup_security().

  • Повторное выполнение задания теперь распространяет значение истечения срока действия сообщения (проблема #980).

    Значение пересылается в is, поэтому время истечения срока действия не изменится. Чтобы обновить время истечения срока действия, нужно передать новый аргумент expires в retry().

  • Worker теперь аварийно завершается при возникновении ошибки канала.

    Ошибки канала зависят от транспорта и представляют собой список исключений, возвращаемых Connection.channel_errors. Для RabbitMQ это означает, что Celery аварийно завершит работу, если проверка эквивалентности для одной из очередей в CELERY_QUEUES не совпадает, что имеет смысл, поскольку это сценарий, где требуется ручное вмешательство.

  • Вызов AsyncResult.get() на цепочке теперь распространяет ошибки для предыдущих задач (проблема #1014).

  • Родительский атрибут AsyncResult теперь восстанавливается при использовании сериализации JSON (проблема #1014).

  • Журналы отключения рабочих теперь регистрируются со степенью предупреждения вместо ошибки.

    При участии Криса Адамса.

  • events.State больше не аварийно завершается при получении неизвестных типов событий.

  • SQLAlchemy Result Backend: Новая настройка CELERY_RESULT_DB_TABLENAMES может быть использована для изменения имени используемых таблиц базы данных.

    При участии Райана Петрелло.

  • SQLAlchemy Result Backend: Теперь вызывает enginge.dispose после форка

    (Выпуск #1564).

    Если вы создаете свои собственные движки SQLAlchemy, то вы также должны убедиться, что они закрываются после форка в рабочем:

    from multiprocessing.util import register_after_fork
    
    engine = create_engine(*engine_args)
    register_after_fork(engine, engine.dispose)
    
  • Был написан набор нагрузочных тестов для Celery worker.

    Он находится в каталоге funtests/stress в репозитории git. Там есть файл README для начала работы.

  • Регистратор с именем celery.concurrency был переименован в celery.pool.

  • Новая утилита командной строки celery graph.

    Эта утилита создает графики в формате GraphViz dot.

    Вы можете создавать графики на основе установленных в данный момент загрузок:

    # Create graph of currently installed bootsteps in both the worker
    # and consumer name-spaces.
    $ celery graph bootsteps | dot -T png -o steps.png
    
    # Graph of the consumer name-space only.
    $ celery graph bootsteps consumer | dot -T png -o consumer_only.png
    
    # Graph of the worker name-space only.
    $ celery graph bootsteps worker | dot -T png -o worker_only.png
    

    Или графики рабочих в кластере:

    # Create graph from the current cluster
    $ celery graph workers | dot -T png -o workers.png
    
    # Create graph from a specified list of workers
    $ celery graph workers nodes:w1,w2,w3 | dot -T png workers.png
    
    # also specify the number of threads in each worker
    $ celery graph workers nodes:w1,w2,w3 threads:2,4,6
    
    # …also specify the broker and backend URLs shown in the graph
    $ celery graph workers broker:amqp:// backend:redis://
    
    # …also specify the max number of workers/threads shown (wmax/tmax),
    # enumerating anything that exceeds that number.
    $ celery graph workers wmax:10 tmax:3
    
  • Изменен способ маринования экземпляров приложений.

    Теперь приложения могут определять метод __reduce_keys__, который используется вместо старого атрибута AppPickler. Например, если ваше приложение определяет пользовательский атрибут „foo“, который должен быть сохранен при мариновании, вы можете определить __reduce_keys__ как таковой:

    import celery
    
    class Celery(celery.Celery):
    
        def __init__(self, *args, **kwargs):
            super(Celery, self).__init__(*args, **kwargs)
            self.foo = kwargs.get('foo')
    
        def __reduce_keys__(self):
            return super(Celery, self).__reduce_keys__().update(
                foo=self.foo,
            )
    

    Это гораздо более удобный способ добавить поддержку пикинга пользовательских атрибутов. Старый вариант AppPickler все еще поддерживается, но его использование не рекомендуется, и мы хотели бы убрать его в будущей версии.

  • Возможность трассировки импорта в целях отладки.

    Параметр C_IMPDEBUG может быть установлен для отслеживания импорта по мере его возникновения:

    $ C_IMDEBUG=1 celery worker -l info
    
    $ C_IMPDEBUG=1 celery shell
    
  • Заголовки сообщений теперь доступны как часть запроса задания.

    Пример добавления и извлечения значения заголовка:

    @app.task(bind=True)
    def t(self):
        return self.request.headers.get('sender')
    
    >>> t.apply_async(headers={'sender': 'George Costanza'})
    
  • Новый сигнал before_task_publish диспетчеризируется перед отправкой сообщения задачи и может быть использован для изменения полей конечного сообщения (выпуск #1281).

  • Новый сигнал after_task_publish заменяет старый сигнал task_sent.

    Сигнал task_sent теперь устарел и не должен использоваться.

  • Новый сигнал worker_process_shutdown рассылается в дочерних процессах пула prefork при их выходе.

    Внесен Даниэлем М Таубом.

  • celery.platforms.PIDFile переименовано в celery.platforms.Pidfile.

  • Бэкенд MongoDB: Теперь можно настраивать с помощью URL:

  • MongoDB Backend: Больше не используется устаревшее pymongo.Connection.

  • Бэкенд MongoDB: Теперь отключается auto_start_request.

  • Бэкенд MongoDB: Теперь включает use_greenlets при использовании eventlet/gevent.

  • subtask() / maybe_subtask() переименованы в signature()/<< 3 >>>.

    Псевдонимы по-прежнему доступны для обратной совместимости.

  • Свойство correlation_id message теперь автоматически устанавливается на id задачи.

  • Поля сообщения задачи eta и << 1 >>> теперь включают информацию о часовом поясе.

  • Все методы result backends store_result/mark_as_* теперь должны принимать аргумент в виде ключевого слова request.

  • События теперь выдают предупреждение, если используется неработающая библиотека yajl.

  • Сигнал celeryd_init теперь принимает дополнительный аргумент в виде ключевого слова: option.

    Это отображение разобранных аргументов командной строки, которое можно использовать для подготовки новых аргументов предварительной загрузки (app.user_options['preload']).

  • Новый обратный вызов: app.on_configure().

    Этот обратный вызов вызывается, когда приложение собирается сконфигурировать (требуется ключ конфигурации).

  • Рабочий: Больше не вилка на HUP.

    Это означает, что рабочий будет повторно использовать один и тот же pid для лучшей поддержки внешних супервизоров процессов.

    При участии Джамиля Аль-Азиза.

  • Рабочий: Сообщение журнала Got task from broker было изменено на Received task .

  • Рабочий: Сообщение журнала Skipping revoked task было изменено на Discarding revoked task .

  • Оптимизация: Улучшена производительность ResultSet.join_native().

    При участии Стаса Рудакова.

  • Сигнал task_revoked теперь принимает новый аргумент request (проблема #1555).

    Сигнал revoked отправляется после удаления запроса задачи из стека, поэтому вместо него необходимо использовать объект Request для получения информации о задаче.

  • Worker: Новый аргумент командной строки -X для исключения очередей (проблема #1399).

    Аргумент -X является обратным аргументу -Q и принимает список очередей, которые нужно исключить (не потреблять из них):

    # Consume from all queues in CELERY_QUEUES, but not the 'foo' queue.
    $ celery worker -A proj -l info -X foo
    
  • Добавляет C_FAKEFORK переменную окружения для простой отладки init-script/celery multi.

    Это означает, что теперь вы можете это сделать:

    $ C_FAKEFORK=1 celery multi start 10
    

    или:

    $ C_FAKEFORK=1 /etc/init.d/celeryd start
    

    чтобы избежать шага демонизации для просмотра ошибок, которые не видны из-за отсутствия stdout/stderr.

    В общий init-скрипт была добавлена команда dryrun, которая включает эту опцию.

  • Новый публичный API для выталкивания и выталкивания из текущего стека задач:

    celery.app.push_current_task() и celery.app.pop_current_task`().

  • RetryTaskError было переименовано в Retry.

    Старое название все еще доступно для обратной совместимости.

  • Новое полупредикатное исключение Reject.

    Это исключение может быть поднято в reject/requeue сообщении задачи, примеры см. в Отклонить.

  • Semipredicates документировано: (Retry/Ignore/Reject).

Плановые демонтажи

  • Настройка BROKER_INSIST и аргумент insist для ~@connection больше не поддерживаются.

  • Настройка CELERY_AMQP_TASK_RESULT_CONNECTION_MAX больше не поддерживается.

    Вместо этого используйте BROKER_POOL_LIMIT.

  • Настройка CELERY_TASK_ERROR_WHITELIST больше не поддерживается.

    Вместо этого следует установить атрибут ErrorMail класса задачи. Вы также можете сделать это с помощью CELERY_ANNOTATIONS:

    from celery import Celery
    from celery.utils.mail import ErrorMail
    
    class MyErrorMail(ErrorMail):
        whitelist = (KeyError, ImportError)
    
        def should_send(self, context, exc):
            return isinstance(exc, self.whitelist)
    
    app = Celery()
    app.conf.CELERY_ANNOTATIONS = {
        '*': {
            'ErrorMail': MyErrorMails,
        }
    }
    
  • Функции, создающие соединения брокера, больше не поддерживают аргумент connect_timeout.

    Теперь его можно установить только с помощью параметра BROKER_CONNECTION_TIMEOUT. Это связано с тем, что функции больше не создают соединения напрямую, а получают их из пула соединений.

  • Настройка CELERY_AMQP_TASK_RESULT_EXPIRES больше не поддерживается.

    Вместо этого используйте CELERY_TASK_RESULT_EXPIRES.

Изменения в сроках амортизации

См. Временная линия устаревания Celery.

Исправления

  • AMQP Backend: join не преобразовывал исключения при использовании сериализатора json.

  • Неабстрактные классы задач теперь совместно используются приложениями (выпуск #1150).

    Обратите внимание, что неабстрактные классы задач не должны использоваться в новом API. Пользовательские классы задач следует создавать только при использовании их в качестве базового класса в декораторе @task.

    Это исправление обеспечивает обратную совместимость со старыми версиями Celery, так что неабстрактные классы задач работают, даже если модуль импортируется несколько раз, так что приложение также инстанцируется несколько раз.

  • Рабочий: Обходное решение для ошибок Unicode в журналах (проблема #427).

  • Методы задач: .apply_async теперь работает правильно, если список args равен None (проблема #1459).

  • Пулы Eventlet/gevent/solo/threads теперь правильно обрабатывают ошибки BaseException, вызванные задачами.

  • <<< Команды дистанционного управления autoscale и pool_grow/<< 2 >>> теперь также будут автоматически увеличивать и уменьшать счетчик предварительной выборки потребителей.

    Исправление внесено Дэниелом М. Таубом.

  • Команды celery control pool_ не приводили строковые аргументы к int.

  • Аккорды Redis/Cache: Результат обратного вызова теперь устанавливается на отказ, если группа исчезла из базы данных (проблема #1094).

  • Рабочий: Теперь следит за тем, чтобы процесс выключения не инициировался более одного раза.

  • Программы: celery multi теперь правильно обрабатывает опции -f и << 2 >>> (проблема #1541).

Внутренние изменения

  • Модуль celery.task.trace был переименован в celery.app.trace.

  • Модуль celery.concurrency.processes был переименован в celery.concurrency.prefork.

  • Классы, которые больше не возвращаются к использованию приложения по умолчанию:

    Это означает, что при инстанцировании этих классов вы должны передать определенное приложение.

  • EventDispatcher.copy_buffer переименовано в app.events.Dispatcher.extend_buffer().

  • Удален неиспользуемый и никогда не документированный глобальный экземпляр celery.events.state.state.

  • app.events.Receiver теперь является подклассом kombu.mixins.ConsumerMixin.

  • celery.apps.worker.Worker был отрефакторен как подкласс celery.worker.WorkController.

    Это устраняет множество дублирующих функций.

  • Метод Celery.with_default_connection был удален в пользу with app.connection_or_acquire (app.connection_or_acquire())

  • Класс celery.results.BaseDictBackend был удален и заменен на celery.results.BaseBackend.

Вернуться на верх