Что нового в Celery 4.0 (latentcall)

Автор:

Спросите Солема (ask at celeryproject.org)

Celery - это простая, гибкая и надежная распределенная система для обработки огромного количества сообщений, предоставляющая операторам инструменты, необходимые для обслуживания такой системы.

Это очередь задач, ориентированная на обработку в реальном времени, а также поддерживающая планирование задач.

Celery имеет большое и разнообразное сообщество пользователей и разработчиков, вам стоит присоединиться к нам on IRC или << 1 >>>.

Чтобы узнать больше о сельдерее, вам следует перейти по ссылке introduction.

Хотя эта версия обратно совместима с предыдущими версиями, важно, чтобы вы прочитали следующий раздел.

Эта версия официально поддерживается на CPython 2.7, 3.4 и 3.5. и также поддерживается на PyPy.

Предисловие

Добро пожаловать в Celery 4!

Это масштабный релиз с изменениями более чем двухлетней давности. Он не только содержит множество новых функций, но и исправляет огромный список ошибок, поэтому во многих отношениях его можно назвать «Снежным барсом».

Следующая основная версия Celery будет поддерживать только Python 3.5, где мы планируем использовать преимущества новой библиотеки asyncio.

Этот выпуск был бы невозможен без поддержки моего работодателя, Robinhood (мы принимаем на работу!).

  • Спросите Солема

Посвящается Себастьяну «Зебу» Бьорнеруду (RIP), с особой благодарностью Ty Wilkins, за разработку нашего нового логотипа, всем соавторам, которые помогли сделать это, и моим коллегам из Robinhood.

Стена вкладчиков

Аарон МакМиллин, Адам Чейнз, Адам Ренберг, Адриано Мартинс де Хесус, Адриан Гине, Ахмет Демир, Айтор Гомес-Гуари, Алан Жустино, Альберт Ванг, Алекс Кошелев, Алекс Рэттрей, Алекс Уильямс, Александр Кошелев, Александр Лебедев, Александр Обловатный, Алексей Котляров, Али Бозоргхан, Алиса Зоя Беван-МакГрегор, Аллард Хове, Алман Один, Амир Рустамзаде, Андреа Раббаглиетти, Андреа Роза, Андрей Фокау, Андрей Родионов, Андрей Стюарт, Андрей Юрчук, Анейл Маллаварапу, Арески Белаид, Арменак Бабурян, Артур Вюйяр, Артём Коваль, Асиф Сайфуддин Ауви, Аск Солем, Бальтазар Рубероль, Батист Билер, Беркер Пексаг, Берт Вандербауведе, Брендан Смитиман, Брайан Боутерс, Брайс Грофф, Кэмерон Уилл, ЧангБо Гуо, Крис Кларк, Крис Дьюри, Крис Эрвей, Крис Харрис, Крис Мартин, Чиллар Ананд, Колин Макинтош, Конрад Крамер, Кори Фарвелл, Крейг Джеллик, Каллен Роудс, Даллас Марлоу, Дэниел Девайн, Дэниел Уоллес, Данило Барген, Даванум Шринивас, Дейв Смит, Дэвид Баумголд, Дэвид Харриган, Дэвид Правек, Деннис Бракхан, Дерек Андерсон, Дмитрий Дыгало, Дмитрий Малиновский, Донгвейминг, Дудаш Адам, Дастин Дж. Митчелл, Эд Морли, Эдвард Беттс, Элои Ривар, Эммануэль Казенаве, Фахад Сиддики, Фатих Суку, Феанил Патель, Федерико Фикарелли, Феликс Шварц, Феликс Ян, Фернандо Роча, Флавио Гросси, Франтишек Холоп, Гао Цзянмяо, Джордж Уивелл, Джеральд Манипон, Жиль Дартигелонгу, Джино Ледесма, Грег Уилбур, Гийом Сеген, Хэнк Джон, Хогни Гилфасон, Илья Георгиевский, Ионел Кристиан Мэрэриш, Иван Ларин, Джеймс Пулек, Джаред Льюис, Джейсон Витч, Джаспер Брайант-Грин, Джефф Уидман, Джереми Тиллман, Джереми Зафран, Джослин Делаланде, Джо Джевник, Джо Санфорд, Джон Андерсон, Джон Бархэм, Джон Киркхэм, Джон Уитлок, Джонатан Ванаско, Джошуа Харлоу, Жуан Рикардо, Хуан Карлос Феррер, Хуан Росси, Джастин Патрин, Кай Гронер, Кевин Харви, Кевин Ричардсон, Кому Вайрагу, Константинос Коукопулос, Коухей Маеда, Красекумар Рамараджу, Кшиштоф Буйневич, Латиция М. Haskins, Len Buckens, Lev Berman, lidongming, Lorenzo Mancini, Lucas Wiman, Luke Pomfrey, Luyun Xie, Maciej Obuchowski, Manuel Kaufmann, Marat Sharafutdinov, Marc Sibson, Marcio Ribeiro, Marin Atanasov Nikolov, Mathieu Fenniak, Mark Parncutt, Mauro Rocco, Максим Бошемин, Максим Вдб, Мгер Мовсисян, Майкл Акилина, Майкл Дуэйн Муринг, Майкл Пермана, Микаэль Пенхард, Майк Эттвуд, Митчел Хамферис, Мохамед Абуэльсауд, Моррис Твид, Мортон Фокс, Моше ван дер Стерр, Нат Уильямс, Натан Ван Гхим, Николя Унравел, Ник Найби, Омер Кац, Омер Корнер, Ори Хох, Пол Пирс, Пауло Бу, Павел Капышин, Филип Гарнеро, Пьер Ферсинг, Петр Кильчук, Петр Маслянка, Квентин Праде, Радек Чайка, Рагурам Шринивасан, Рэнди Барлоу, Рафаэль Мишель, Реми Леоне, Роберт Куп, Роберт Колба, Рокаллит Вульф, Родольфо Карвальо, Роджер Ху, Ромуальд Брюне, Ронгзе Жу, Росс Дин, Райан Лаки, Реми Грейнхофер, Самуэль Жиффар, Самуэль Жайле, Сергей Азовсков, Сергей Тихонов, Сынха Ким, Саймон Пеетерс, Спенсер Е. Olson, Srinivas Garlapati, Stephen Milner, Steve Peak, Steven Sklar, Stuart Axon, Sukrit Khera, Tadej Janež, Taha Jahangir, Takeshi Kanemoto, Tayfun Sen, Tewfik Sadaoui, Thomas French, Thomas Grainger, Tomas Machalek, Tobias Schottdorf, Tocho Tochev, Valentyn Klindukh, Vic Kumar, Vladimir Bolshakov, Vladimir Gorbunov, Уэйн Чанг, Виланд Хоффманн, Видо ден Холландер, Уил Лэнгфорд, Уилл Томпсон, Уильям Кинг, Юрий Селиванов, Витис Банайтис, Зоран Павлович, Синь Ли, 許邱翔, @allenling, @alzeih, @bastb, @bee-keeper, @ffeast, @firefly4268, @flyingfoxlee, @gdw2, @gitaarik, @hankjin, @lvh, @m-vdb, @kindule, @mdk: , @michael-k, @mozillazg, @nokrik, @ocean1, @orlo666, @raducc, @wanglei, @worldexception, @xBeAsTx.

Примечание

Эта стена была автоматически сгенерирована из истории git, поэтому, к сожалению, она не включает людей, которые помогают в более важных вещах, таких как ответы на вопросы списка рассылки.

Обновление с Celery 3.1

Шаг 1: Обновление до Celery 3.1.25

Если вы еще не сделали этого, первым шагом будет обновление до Celery 3.1.25.

Эта версия добавляет совместимость с новым протоколом сообщений, так что вы можете постепенно переходить с версии 3.1 на версию 4.0.

Сначала разверните рабочих, обновив их до версии 3.1.25. Это означает, что эти рабочие могут обрабатывать сообщения, отправленные клиентами, использующими версии 3.1 и 4.0.

После обновления рабочих систем вы можете обновить клиентов (например, веб-серверы).

Шаг 2: Обновите конфигурацию с новыми именами параметров

В этой версии радикально изменены названия параметров конфигурации, чтобы они были более согласованными.

Изменения полностью обратно совместимы, поэтому у вас есть возможность подождать, пока старые названия настроек не будут устаревшими, но для облегчения перехода мы включили утилиту командной строки, которая автоматически переписывает ваши настройки.

Дополнительную информацию см. в разделе Имена настроек в нижнем регистре.

Шаг 3: Прочитайте важные примечания в этом документе

Убедитесь, что вас не коснется ни одно из важных замечаний по обновлению, упомянутых в следующем разделе.

Особенно важно отметить, что Celery теперь проверяет аргументы, которые вы отправляете в задачу, сопоставляя их с сигнатурой (Проверка аргументов задачи).

Шаг 4: Обновление до Celery 4.0

В этот момент вы можете обновить своих работников и клиентов новой версией.

Важные замечания

Отказано в поддержке Python 2.6

Celery теперь требует Python 2.7 или более поздней версии, а также отказывается от поддержки Python 3.3, поэтому поддерживаются следующие версии:

  • CPython 2.7

  • CPython 3.4

  • CPython 3.5

  • PyPy 5.4 (pypy2)

  • PyPy 5.5-alpha (pypy3)

Последняя основная версия с поддержкой Python 2

Начиная с Celery 5.0 будет поддерживаться только Python 3.5+.

Чтобы убедиться, что вас не затронет это изменение, вам следует привязать версию Celery в вашем файле требований либо к конкретной версии: celery==4.0.0, либо к диапазону: celery>=4.0,<5.0.

Отказ от поддержки Python 2 позволит нам удалить огромное количество кода совместимости, а переход на Python 3.5 позволит нам использовать преимущества типизации, async/await, asyncio и подобных концепций, которым нет альтернативы в старых версиях.

Celery 4.x будет продолжать работать на Python 2.7, 3.4, 3.5; так же как Celery 3.x продолжает работать на Python 2.6.

Поддержка Django

Celery 4.x требует Django 1.8 или более поздней версии, но мы действительно рекомендуем использовать по крайней мере Django 1.9 для новой функции transaction.on_commit.

Частой проблемой при вызове задач из Django является ситуация, когда задача связана с изменением модели, и вы хотите отменить задачу, если транзакция будет откатана, или обеспечить выполнение задачи только после того, как изменения будут записаны в базу данных.

transaction.atomic позволяет решить эту проблему, добавив задачу в качестве обратного вызова, который будет вызываться только при фиксации транзакции.

Пример использования:

from functools import partial
from django.db import transaction

from .models import Article, Log
from .tasks import send_article_created_notification

def create_article(request):
    with transaction.atomic():
        article = Article.objects.create(**request.POST)
        # send this task only if the rest of the transaction succeeds.
        transaction.on_commit(partial(
            send_article_created_notification.delay, article_id=article.pk))
        Log.objects.create(type=Log.ARTICLE_CREATED, object_pk=article.pk)

Удаленные функции

  • Microsoft Windows больше не поддерживается.

    Набор тестов пройден, и Celery, похоже, работает с Windows, но мы не даем никаких гарантий, поскольку не можем диагностировать проблемы на этой платформе. Если вы являетесь компанией, нуждающейся в поддержке на этой платформе, пожалуйста, свяжитесь с нами.

  • Jython больше не поддерживается.

Характеристики удалены для простоты

  • Механизм задания Webhook (celery.task.http) был удален.

    В настоящее время легко использовать модуль requests для написания задач webhook вручную. Мы бы с удовольствием использовали запросы, но мы просто не можем этого сделать, так как в сообществе Python есть очень активная толпа «против зависимостей».

    Если вам нужна обратная совместимость, вы можете просто скопировать + вставить версию модуля 3.1 и убедиться, что он импортирован рабочим: https://github.com/celery/celery/blob/3.1/celery/task/http.py.

  • Задачи больше не отправляют сообщения об ошибках.

    Это также удаляет поддержку app.mail_admins, и любую функциональность, связанную с отправкой электронной почты.

  • celery.contrib.batches был удален.

    Это была экспериментальная функция, поэтому на нее не распространяются наши гарантии по срокам вывода из эксплуатации.

    Вы можете скопировать и использовать существующий код батчей для использования в своих проектах: https://github.com/celery/celery/blob/3.1/celery/contrib/batches.py.

Функции удалены из-за отсутствия финансирования

В релизе 3.1 мы объявили, что некоторые транспорты переведены в статус экспериментальных, и что официальной поддержки этих транспортов не будет.

Поскольку этот тонкий намек на необходимость финансирования не удался, мы полностью убрали их, нарушив обратную совместимость.

  • Использование Django ORM в качестве брокера больше не поддерживается.

    Вы все еще можете использовать Django ORM в качестве бэкенда результатов: смотрите раздел django-celery-results - Использование Django ORM/Cache в качестве бэкенда результатов для получения дополнительной информации.

  • Использование SQLAlchemy в качестве брокера больше не поддерживается.

    Вы по-прежнему можете использовать SQLAlchemy в качестве бэкенда результатов.

  • Использование CouchDB в качестве брокера больше не поддерживается.

    Вы по-прежнему можете использовать CouchDB в качестве бэкенда результатов.

  • Использование IronMQ в качестве брокера больше не поддерживается.

  • Использование Beanstalk в качестве брокера больше не поддерживается.

Кроме того, некоторые функции были полностью удалены, так что при попытке их использования будет возникать исключение:

  • Функция --autoreload была удалена.

    Это была экспериментальная функция, и на нее не распространяются наши гарантии по срокам изъятия. Флаг полностью удален, поэтому при его наличии рабочий будет аварийно завершать работу при запуске. К счастью, этот флаг не используется в производственных системах.

  • Экспериментальный пул threads больше не поддерживается и был удален.

  • Функция force_execv больше не поддерживается.

    Команда celery worker теперь игнорирует установки --no-execv, --force-execv и CELERYD_FORCE_EXECV.

    В версии 5.0 этот флаг будет полностью удален, и рабочий будет выдавать ошибку.

  • Старый унаследованный бэкенд результатов «amqp» был устаревшим и будет удален в Celery 5.0.

    Пожалуйста, используйте бэкенд результатов rpc для вызовов в стиле RPC, и постоянный бэкенд результатов для многопользовательских результатов.

Мы считаем, что большинство из них можно исправить без значительных усилий, поэтому если вы заинтересованы в возвращении какой-либо из этих функций, пожалуйста, свяжитесь с нами.

А теперь хорошие новости

Новый протокол сообщений о задачах

В этой версии представлен совершенно новый протокол сообщений о задачах - первое серьезное изменение протокола с начала проекта.

Новый протокол включен по умолчанию в этой версии, и поскольку новая версия не имеет обратной совместимости, вы должны быть осторожны при обновлении.

Версия 3.1.25 была выпущена для обеспечения совместимости с новым протоколом, поэтому самый простой способ обновления - сначала перейти на эту версию, а затем перейти на 4.0 в ходе второго развертывания.

Если вы хотите продолжать использовать старый протокол, вы также можете настроить номер используемой версии протокола:

app = Celery()
app.conf.task_protocol = 1

Подробнее о возможностях, доступных в новом протоколе, читайте в разделе новостей, расположенном далее в этом документе.

Имена настроек в нижнем регистре

В погоне за красотой все настройки теперь переименованы в строчные буквы, а названия некоторых настроек были переименованы для единообразия.

Это изменение полностью обратно совместимо, поэтому вы по-прежнему можете использовать имена настроек в верхнем регистре, но мы хотели бы, чтобы вы обновились как можно скорее, и вы можете сделать это автоматически с помощью команды celery upgrade settings:

$ celery upgrade settings proj/settings.py

Эта команда изменит ваш модуль на месте, чтобы использовать новые имена в нижнем регистре (если вы хотите использовать верхний регистр с префиксом «CELERY», см. блок ниже), и сохранит резервную копию в proj/settings.py.orig.

Для пользователей Django и тех, кто хочет сохранить имена в верхнем регистре

Если вы загружаете конфигурацию Celery из модуля настроек Django, то вам лучше использовать имена в верхнем регистре.

Вы также хотите использовать префикс CELERY_, чтобы настройки Celery не столкнулись с настройками Django, используемыми другими приложениями.

Для этого сначала нужно преобразовать файл настроек для использования новой согласованной схемы именования и добавить префикс ко всем настройкам, связанным с Celery:

$ celery upgrade settings proj/settings.py --django

После обновления файла настроек вам необходимо явно установить префикс в вашем модуле proj/celery.py:

app.config_from_object('django.conf:settings', namespace='CELERY')

Самый актуальный пример интеграции Django Celery вы можете найти здесь: Первые шаги в работе с Django.

Примечание

Это также добавит префикс к настройкам, которые ранее не имели его, например, BROKER_URL должно быть написано CELERY_BROKER_URL с пространством имен CELERY CELERY_BROKER_URL.

К счастью, вам не придется вручную изменять файлы, так как программа celery upgrade settings --django сделает все правильно.

Загрузчик попытается определить, использует ли ваша конфигурация новый формат, и будет действовать соответственно, но это также означает, что вам не разрешается смешивать и сопоставлять новые и старые имена параметров, если только вы не предоставите значение для обеих альтернатив.

Основным отличием от предыдущих версий, помимо имен в нижнем регистре, является переименование некоторых префиксов, например, celerybeat_ в beat_, celeryd_ в worker_.

Префикс celery_ также был удален, и параметры, связанные с задачами, из этого пространства имен теперь имеют префикс task_, а параметры, связанные с рабочими - worker_.

Кроме этого, большинство настроек будут такими же в нижнем регистре, за исключением нескольких специальных:

Имя установки

Заменить на

CELERY_MAX_CACHED_RESULTS

result_cache_max

CELERY_MESSAGE_COMPRESSION

result_compression/task_compression.

CELERY_TASK_RESULT_EXPIRES

result_expires

CELERY_RESULT_DBURI

result_backend

CELERY_RESULT_ENGINE_OPTIONS

database_engine_options

-*-_DB_SHORT_LIVED_SESSIONS

database_short_lived_sessions

CELERY_RESULT_DB_TABLE_NAMES

database_db_names

CELERY_ACKS_LATE

task_acks_late

CELERY_ALWAYS_EAGER

task_always_eager

CELERY_ANNOTATIONS

task_annotations

CELERY_MESSAGE_COMPRESSION

task_compression

CELERY_CREATE_MISSING_QUEUES

task_create_missing_queues

CELERY_DEFAULT_DELIVERY_MODE

task_default_delivery_mode

CELERY_DEFAULT_EXCHANGE

task_default_exchange

CELERY_DEFAULT_EXCHANGE_TYPE

task_default_exchange_type

CELERY_DEFAULT_QUEUE

task_default_queue

CELERY_DEFAULT_RATE_LIMIT

task_default_rate_limit

CELERY_DEFAULT_ROUTING_KEY

task_default_routing_key

-"-_EAGER_PROPAGATES_EXCEPTIONS

task_eager_propagates

CELERY_IGNORE_RESULT

task_ignore_result

CELERY_TASK_PUBLISH_RETRY

task_publish_retry

CELERY_TASK_PUBLISH_RETRY_POLICY

task_publish_retry_policy

CELERY_QUEUES

task_queues

CELERY_ROUTES

task_routes

CELERY_SEND_TASK_SENT_EVENT

task_send_sent_event

CELERY_TASK_SERIALIZER

task_serializer

CELERYD_TASK_SOFT_TIME_LIMIT

task_soft_time_limit

CELERYD_TASK_TIME_LIMIT

task_time_limit

CELERY_TRACK_STARTED

task_track_started

CELERY_DISABLE_RATE_LIMITS

worker_disable_rate_limits

CELERY_ENABLE_REMOTE_CONTROL

worker_enable_remote_control

CELERYD_SEND_EVENTS

worker_send_task_events

Полную таблицу изменений можно посмотреть в Новые настройки строчных букв.

Json теперь является сериализатором по умолчанию

Наконец-то пришло время положить конец господству pickle в качестве механизма сериализации по умолчанию, и начиная с этой версии сериализатором по умолчанию является json.

Это изменение было announced with the release of Celery 3.1.

Если вы все еще полагаетесь на pickle в качестве сериализатора по умолчанию, то вам необходимо настроить свое приложение перед обновлением до версии 4.0:

task_serializer = 'pickle'
result_serializer = 'pickle'
accept_content = {'pickle'}

Сериализатор Json теперь также поддерживает некоторые дополнительные типы:

  • datetime, time, date

    Преобразуется в json-текст в формате ISO-8601.

  • Decimal

    Преобразуется в текст в формате json.

  • django.utils.functional.Promise

    Только для Django: Ленивые строки, используемые для перевода и т.д., оцениваются и пытаются преобразоваться в тип json.

  • uuid.UUID

    Преобразуется в текст в формате json.

Вы также можете определить метод __json__ в ваших пользовательских классах для поддержки сериализации JSON (должен возвращать тип, совместимый с json):

class Person:
    first_name = None
    last_name = None
    address = None

    def __json__(self):
        return {
            'first_name': self.first_name,
            'last_name': self.last_name,
            'address': self.address,
        }

Базовый класс Task больше не регистрирует задачи автоматически

Класс Task больше не использует специальный мета-класс, который автоматически регистрирует задачу в реестре задач.

Вместо этого теперь это обрабатывается декораторами app.task.

Если вы все еще используете задания, основанные на классах, то вам нужно зарегистрировать их вручную:

class CustomTask(Task):
    def run(self):
        print('running')
CustomTask = app.register_task(CustomTask())

Лучшей практикой является использование пользовательских классов задач только для переопределения общего поведения, а затем использование декоратора задач для реализации задачи:

@app.task(bind=True, base=CustomTask)
def custom(self):
    print('running')

Это изменение также означает, что атрибут задания abstract больше не имеет никакого эффекта.

Проверка аргументов задачи

Аргументы задачи теперь проверяются при вызове задачи, даже асинхронно:

>>> @app.task
... def add(x, y):
...     return x + y

>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>

>>> add.delay(8)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/app/task.py", line 376, in delay
    return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 485, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)

Вы можете отключить проверку аргументов для любой задачи, установив ее атрибут typing в значение False:

>>> @app.task(typing=False)
... def add(x, y):
...     return x + y

Или, если вы хотите полностью отключить это для всех задач, вы можете передать strict_typing=False при создании приложения:

app = Celery(..., strict_typing=False)

Redis Events не имеет обратной совместимости

Опции транспорта Redis fanout_patterns и << 1 >>> теперь включены по умолчанию.

Работники/мониторы без включенных флагов не смогут видеть работников с отключенным флагом. Они по-прежнему могут выполнять задания, но не смогут получать сообщения мониторинга друг друга.

Вы можете выполнить обновление с обратной совместимостью, сначала настроив рабочие и мониторы 3.1 для включения настроек, перед окончательным обновлением до 4.0:

BROKER_TRANSPORT_OPTIONS = {
    'fanout_patterns': True,
    'fanout_prefix': True,
}

Пересмотр приоритетов Redis

Приоритет 0 теперь самый низкий, 9 - самый высокий.

Это изменение было сделано для того, чтобы поддержка приоритетов соответствовала тому, как она работает в AMQP.

Внесено Алексом Кошелевым.

Django: Автообнаружение теперь поддерживает конфигурации приложений Django

Теперь функцию autodiscover_tasks() можно вызывать без аргументов, и обработчик Django автоматически найдет ваши установленные приложения:

app.autodiscover_tasks()

Интеграция Django example in the documentation была обновлена для использования вызова без аргументов.

Это также обеспечивает совместимость с новыми, эээ, AppConfig вещами, появившимися в последних версиях Django.

Прямые очереди рабочих больше не используют автоудаление

Рабочие/клиенты, работающие под управлением версии 4.0, больше не смогут отправлять прямые сообщения рабочим, работающим под управлением более старых версий, и наоборот.

Если вы полагаетесь на прямые сообщения рабочих, вам следует сначала обновить рабочих и клиентов 3.x, чтобы они использовали новые настройки маршрутизации, заменив celery.utils.worker_direct() на эту реализацию:

from kombu import Exchange, Queue

worker_direct_exchange = Exchange('C.dq2')

def worker_direct(hostname):
    return Queue(
        '{hostname}.dq2'.format(hostname),
        exchange=worker_direct_exchange,
        routing_key=hostname,
    )

Эта функция закрыла проблему #2492.

Удалены старые программы командной строки

При установке Celery больше не будут устанавливаться программы celeryd, celerybeat и celeryd-multi.

Об этом было объявлено с выходом Celery 3.1, но у вас могут оставаться скрипты, указывающие на старые имена, поэтому обязательно обновите их, чтобы использовать новую команду umbrella:

Программа

Новый статус

Замена

celeryd

УДАЛЕНО

celery worker

celerybeat

УДАЛЕНО

celery beat

celeryd-multi

УДАЛЕНО

celery multi

Новости

Основные моменты нового протокола

Новый протокол устраняет многие проблемы старого протокола и позволяет реализовать некоторые давно востребованные функции:

  • Большинство данных теперь отправляются в виде заголовков сообщений, а не сериализуются вместе с телом сообщения.

    В первой версии протокола рабочий всегда должен был десериализовать сообщение, чтобы иметь возможность прочитать мета-данные задачи, такие как идентификатор задачи, имя и т. д. Это также означало, что рабочий был вынужден дважды декодировать данные, сначала десериализуя сообщение при получении, снова сериализуя сообщение для отправки дочернему процессу, а затем дочерний процесс снова десериализует сообщение.

    Сохранение полей метаданных в заголовках сообщений означает, что рабочему не нужно декодировать полезную нагрузку перед передачей задания дочернему процессу, а также то, что теперь рабочий может перенаправить задание, написанное на языке, отличном от Python, другому рабочему.

  • Новый заголовок сообщения lang может быть использован для указания языка программирования, на котором написано задание.

  • Worker сохраняет результаты для внутренних ошибок, таких как ContentDisallowed, и других ошибок десериализации.

  • Worker сохраняет результаты и отправляет события мониторинга для незарегистрированных ошибок задачи.

  • Worker вызывает callbacks/errbacks, даже если результат отправляется родительским процессом (например, WorkerLostError при завершении дочернего процесса, ошибки десериализации, незарегистрированные задачи).

  • Новый заголовок origin содержит информацию о процессе, отправляющем задание (имя рабочего узла, или информацию о PID и имени хоста).

  • Новый заголовок shadow позволяет изменять имя задачи, используемое в журналах.

    Это полезно для диспетчеризации подобных шаблонов, например, задачи, которая вызывает любую функцию, использующую pickle (не делайте этого дома):

    from celery import Task
    from celery.utils.imports import qualname
    
    class call_as_task(Task):
    
        def shadow_name(self, args, kwargs, options):
            return 'call_as_task:{0}'.format(qualname(args[0]))
    
        def run(self, fun, *args, **kwargs):
            return fun(*args, **kwargs)
    call_as_task = app.register_task(call_as_task())
    
  • Новые поля argsrepr и kwargsrepr содержат текстовые представления аргументов задачи (возможно, усеченные) для использования в журналах, мониторах и т.д.

    Это означает, что рабочему не нужно десериализовывать полезную нагрузку сообщения, чтобы отобразить аргументы задачи в информационных целях.

  • Цепочки теперь используют специальное поле chain, что позволяет поддерживать цепочки из тысяч и более задач.

  • Новые заголовки parent_id и << 1 >>> добавляют информацию об отношениях задачи с другими задачами.

    • parent_id является идентификатором задачи, которая вызвала эту задачу

    • root_id - это первая задача в рабочем потоке.

    Эти поля могут быть использованы для улучшения мониторов, таких как цветок, для группировки связанных сообщений вместе (например, цепочки, группы, аккорды, полные рабочие потоки и т.д.).

  • app.TaskProducer заменяется на app.amqp.create_task_message() и app.amqp.send_task_message().

    Разделение обязанностей на создание и отправку означает, что людям, которые хотят отправлять сообщения, используя AMQP-клиент Python напрямую, не нужно реализовывать протокол.

    Метод app.amqp.create_task_message() вызывает либо app.amqp.as_task_v2(), либо app.amqp.as_task_v1() в зависимости от настроенного протокола задачи, и возвращает специальный кортеж task_message, содержащий заголовки, свойства и тело сообщения задачи.

См.также

Новый протокол задач полностью задокументирован здесь: Версия 2.

Улучшение бассейна в Префорке

Задачи теперь регистрируются в дочернем процессе

Логирование успеха/неудачи задачи теперь происходит из дочернего процесса, выполняющего задачу. В результате утилиты протоколирования, такие как Sentry, могут получить полную информацию о задачах, включая переменные в стеке трассировки.

-Ofair теперь является стратегией планирования по умолчанию

Чтобы снова включить поведение по умолчанию в версии 3.1, используйте опцию командной строки -Ofast.

Было много путаницы относительно того, что делает опция командной строки -Ofair, и использование термина «prefetch» в объяснениях, вероятно, не помогло, учитывая, насколько запутанной является эта терминология в AMQP.

Когда рабочий Celery, использующий пул prefork, получает задание, ему необходимо делегировать это задание дочернему процессу для выполнения.

Пул prefork имеет настраиваемое количество дочерних процессов (--concurrency), которые могут быть использованы для выполнения задач, и каждый дочерний процесс использует трубы/сокеты для связи с родительским процессом:

  • inqueue (pipe/socket): родитель отправляет задание дочернему процессу

  • outqueue (pipe/socket): дочерняя программа отправляет результат/обратное значение родительской программе.

В Celery 3.1 механизм планирования по умолчанию просто посылал задачу первому inqueue, который был доступен для записи, с некоторой эвристикой, чтобы убедиться, что мы выполняем круговое распределение между ними, чтобы каждый дочерний процесс получал одинаковое количество задач.

Это означает, что в стратегии планирования по умолчанию рабочий может отправлять задания тому же дочернему процессу, который уже выполняет задание. Если эта задача долго выполняется, она может надолго заблокировать ожидающую задачу. Что еще хуже, сотни коротко выполняющихся задач могут застрять за долго выполняющейся задачей, даже если есть дочерние процессы, свободные для выполнения работы.

Стратегия планирования -Ofair была добавлена, чтобы избежать этой ситуации, и когда она включена, она добавляет правило, что никакая задача не должна быть отправлена в дочерний процесс, который уже выполняет задачу.

Стратегия справедливого планирования может работать немного хуже, если у вас есть только краткосрочные задачи.

Ограничение размера резидентной памяти дочернего процесса

Теперь вы можете ограничить максимальный объем памяти, выделяемой на один дочерний процесс пула prefork, установив параметр worker --max-memory-per-child, или параметр worker_max_memory_per_child.

Ограничение касается объема памяти RSS/резидента и указывается в килобайтах.

Дочерний процесс, превысивший лимит, будет завершен и заменен новым процессом после возвращения текущего выполняемого задания.

Дополнительную информацию см. в разделе Максимальное количество памяти на одну детскую установку.

Внесено Дэйвом Смитом.

Один лог-файл на каждый дочерний процесс

Init-scrips и celery multi теперь используют опцию формата файла журнала %I (например, /var/log/celery/%n%I.log).

Это изменение было необходимо для обеспечения того, чтобы каждый дочерний процесс имел отдельный файл журнала после переноса регистрации задач в дочерний процесс, так как запись нескольких процессов в один и тот же файл журнала может привести к повреждению.

Вам рекомендуется обновить свои init-скрипты и аргументы celery multi, чтобы использовать эту новую опцию.

Перевозки

Поддержка приоритетной очереди RabbitMQ

Дополнительную информацию см. в разделе Приоритеты сообщений RabbitMQ.

Предоставлено Джеральдом Манипоном.

Настройте URL-адрес брокера отдельно для чтения/записи

Добавлены новые параметры broker_read_url и << 1 >>>, чтобы можно было предоставлять отдельные URL брокера для соединений, используемых для потребления/публикации.

В дополнение к опциям конфигурации, в API приложения были добавлены два новых метода:

  • app.connection_for_read()

  • app.connection_for_write()

Теперь их следует использовать вместо app.connection() для указания намерения требуемого соединения.

Примечание

Доступны два пула соединений: app.pool (чтение) и app.producer_pool (запись). Последний на самом деле дает не соединения, а полные экземпляры kombu.Producer.

def publish_some_message(app, producer=None):
    with app.producer_or_acquire(producer) as producer:
        ...

def consume_messages(app, connection=None):
    with app.connection_or_acquire(connection) as connection:
        ...

Поддержка расширений очередей RabbitMQ

Объявления очередей теперь могут задавать TTL сообщения и время истечения очереди напрямую, используя аргументы message_ttl и << 1 >>>.

В Queue были добавлены новые аргументы, позволяющие напрямую и удобно конфигурировать расширения очередей RabbitMQ в объявлениях очередей:

  • Queue(expires=20.0)

    Установка времени истечения очереди в плавающих секундах.

    См. kombu.Queue.expires.

  • Queue(message_ttl=30.0)

    Установка времени жизни сообщения очереди в секундах в формате float.

    См. kombu.Queue.message_ttl.

  • Queue(max_length=1000)

    Установите максимальную длину очереди (количество сообщений) как int.

    См. kombu.Queue.max_length.

  • Queue(max_length_bytes=1000)

    Установите максимальную длину очереди (общий размер сообщения в байтах) как int.

    См. kombu.Queue.max_length_bytes.

  • Queue(max_priority=10)

    Объявить очередь приоритетной очередью, которая направляет сообщения на основе поля priority сообщения.

    См. kombu.Queue.max_priority.

Транспорт Amazon SQS теперь официально поддерживается

Транспорт брокера SQS был переписан для использования асинхронного ввода-вывода и, таким образом, присоединился к RabbitMQ, Redis и QPid в качестве официально поддерживаемых транспортов.

Новая реализация также использует преимущества длинного опроса и устраняет несколько проблем, связанных с использованием SQS в качестве брокера.

Эта работа была спонсирована компанией Nextdoor.

Транспорт Apache QPid теперь официально поддерживается

Внесено Брайаном Баутерсом.

Redis: поддержка Sentinel

Вы можете направить соединение на список URL-адресов, например:

sentinel://0.0.0.0:26379;sentinel://0.0.0.0:26380/...

где каждый сентинел разделяется символом ;. Множественные сентинелы обрабатываются конструктором kombu.Connection, и помещаются в альтернативный список серверов для подключения в случае обрыва соединения.

При участии Сергея Азовскова, и Лоренцо Манчини.

Задачи

Декоратор автоповтора задачи

Написание пользовательской обработки повторных попыток для событий исключений настолько распространено, что теперь мы имеем встроенную поддержку для этого.

Для этого в декораторах задач теперь поддерживается новый аргумент autoretry_for, в котором можно указать кортеж исключений для автоматической повторной попытки:

from twitter.exceptions import FailWhaleError

@app.task(autoretry_for=(FailWhaleError,))
def refresh_timeline(user):
    return twitter.refresh_timeline(user)

Дополнительную информацию см. в разделе Автоматические повторные попытки для известных исключений.

Внесено Дмитрием Малиновским.

Task.replace Улучшения

  • self.replace(signature) теперь может заменить любую задачу, аккорд или группу, а сигнатура, на которую производится замена, может быть аккордом, группой или любым другим типом сигнатуры.

  • Больше не наследует callbacks и errbacks существующей задачи.

    Если вы заменяете узел в дереве, то вы не ожидаете, что новый узел унаследует дочерние элементы старого узла.

  • Task.replace_in_chord было удалено, вместо него используйте .replace.

  • Если замена является группой, эта группа будет автоматически преобразована в аккорд, в котором обратный вызов «накапливает» результаты групповых задач.

    Новая встроенная задача (для этого была добавлена celery.accumulate)

При участии Стива Морина, и Аска Солема.

Удаленное выполнение заданий

Новый task_remote_tracebacks сделает трассировку задач более полезной, внедряя стек удаленного рабочего.

Эта функция требует дополнительной библиотеки tblib.

Внесено Ионелом Кристианом Мэриешем.

Обработка ошибок подключения к задаче

Ошибки, связанные с соединением, возникающие при отправке задания, теперь повторно отображаются как ошибка kombu.exceptions.OperationalError:

>>> try:
...     add.delay(2, 2)
... except add.OperationalError as exc:
...     print('Could not send task %r: %r' % (add, exc))

Дополнительную информацию см. в разделе Обработка ошибок подключения.

Gevent/Eventlet: Выделенный поток для получения результатов

При использовании gevent или eventlet за потребление событий теперь отвечает один поток.

Это означает, что если у вас много вызовов, получающих результаты, то для их получения будет выделен отдельный поток:

result = add.delay(2, 2)

# this call will delegate to the result consumer thread:
#   once the consumer thread has received the result this greenlet can
# continue.
value = result.get(timeout=3)

Это делает выполнение вызовов RPC при использовании gevent/eventlet намного лучше.

AsyncResult.then(on_success, on_error)

API AsyncResult был расширен для поддержки протокола promise.

В настоящее время это работает только с бэкендами результатов RPC (amqp) и Redis, но позволяет подключать обратные вызовы для завершения задач:

import gevent.monkey
monkey.patch_all()

import time
from celery import Celery

app = Celery(broker='amqp://', backend='rpc')

@app.task
def add(x, y):
    return x + y

def on_result_ready(result):
    print('Received result for id %r: %r' % (result.id, result.result,))

add.delay(2, 2).then(on_result_ready)

time.sleep(3)  # run gevent event loop for a while.

Здесь продемонстрировано использование gevent, но на самом деле этот API более полезен в циклах событий с обратным вызовом, таких как twisted, или tornado.

Новый API маршрутизатора задач

Параметр task_routes теперь может содержать функции, а маршруты map теперь поддерживают шаблоны glob и regexes.

Вместо использования классов маршрутизаторов теперь можно просто определить функцию:

def route_for_task(name, args, kwargs, options, task=None, **kwargs):
    from proj import tasks

    if name == tasks.add.name:
        return {'queue': 'hipri'}

Если вам не нужны аргументы, вы можете использовать аргументы start, только убедитесь, что вы всегда принимаете также аргументы star, чтобы у нас была возможность добавить больше функций в будущем:

def route_for_task(name, *args, **kwargs):
    from proj import tasks
    if name == tasks.add.name:
        return {'queue': 'hipri', 'priority': 9}

И аргумент options, и новый аргумент с ключевым словом task являются новыми для маршрутизаторов в стиле функций, и облегчают написание маршрутизаторов на основе параметров выполнения или свойств задачи.

Необязательный аргумент ключевого слова task не будет установлен, если задача вызывается по имени с помощью app.send_task().

Дополнительные примеры, включая использование glob/regexes в маршрутизаторах, смотрите в task_routes и << 1 >>>.

Рефактор холста

Реализация canvas/work-flow была сильно доработана для устранения некоторых давно нерешенных проблем.

  • Обратные вызовы ошибок теперь могут принимать реальные экземпляры исключений и трассировки (проблема #2538).

    >>> add.s(2, 2).on_error(log_error.s()).delay()
    

    Где log_error может быть определено как:

    @app.task
    def log_error(request, exc, traceback):
        with open(os.path.join('/var/errors', request.id), 'a') as fh:
            print('--\n\n{0} {1} {2}'.format(
                task_id, exc, traceback), file=fh)
    

    Дополнительные примеры см. в разделе Холст: Проектирование рабочих потоков.

  • chain(a, b, c) теперь работает так же, как a | b | c.

    Это означает, что chain больше не может возвращать экземпляр chain, вместо этого он может оптимизировать рабочий процесс таким образом, что, например, две группы, соединенные в цепочку, становятся одной группой.

  • Теперь группы внутри групп разворачиваются в одну группу (выпуск #1509).

  • задачи chunks/map/starmap теперь маршрутизируются на основе целевой задачи

  • аккорды и цепочки теперь могут быть неизменяемыми.

  • Исправлена ошибка, при которой сериализованные подписи не преобразовывались обратно в подписи (проблема #2078)

    Исправление внесено Россом Дином.

  • Исправлена проблема, при которой цепочки и группы не работали при использовании сериализации JSON (проблема #2076).

    Исправление внесено Россом Дином.

  • Создание аккорда больше не приводит к множественным значениям для аргумента ключевого слова „task_id“ (проблема #2225).

    Исправление внесено Анейлом Маллаварапу.

  • Исправлена проблема, при которой возвращался неверный результат, когда цепочка содержала аккорд в качестве предпоследней задачи.

    Исправление внесено Анейлом Маллаварапу.

  • Специальный случай group(A.s() | group(B.s() | C.s())) теперь работает.

  • Цепочка: Исправлена ошибка с неправильным набором id, когда подзадача также является цепочкой.

  • group | group теперь сглажена в одну группу (выпуск #2573).

  • Исправлена проблема, при которой group | task не обновлялся до аккорда (проблема #2922).

  • Аккорды теперь правильно устанавливают ссылки result.parent.

  • chunks/map/starmap теперь маршрутизируются на основе целевой задачи.

  • Signature.link теперь работает, когда аргумент скалярный (не список)

    (Выпуск №2019).

  • group() теперь правильно пересылает аргументы ключевых слов (проблема #3426).

    Исправление внесено Самуэлем Гиффардом.

  • chord>, где группа заголовков состоит только из одной задачи, теперь превращается в простую цепочку.

  • Передача аргумента link в group.apply_async() теперь вызывает ошибку (проблема #3508).

  • chord | sig теперь присоединяется к обратному вызову аккорда (проблема #3356).

Периодические задачи

Новый API для настройки периодических задач

Этот новый API позволяет использовать подписи при определении периодических задач, устраняя вероятность неправильного ввода имен задач.

Примером нового API является here.

Оптимизированная реализация Beat

Реализация celery beat была оптимизирована для миллионов периодических задач путем использования кучи для планирования записей.

При участии Аска Солема и Александра Кошелева.

Планирование задач на основе восхода, заката, рассвета и сумерек

Дополнительную информацию см. в разделе Солнечные графики.

Внесено Марком Парнкаттом.

Бэкенды результатов

RPC Result Backend зрелый

Множество ошибок в ранее экспериментальном бэкенде результатов RPC были исправлены и теперь могут быть рассмотрены для производственного использования.

Вклад внесли Аск Солем, Моррис Твид.

Redis: Оптимизация бэкенда результатов

result.get() теперь используется pub/sub для потоковой передачи результатов задач

Вызов result.get() при использовании бэкенда результатов Redis раньше был очень дорогим, поскольку он использовал опрос для ожидания, пока результат станет доступен. Интервал опроса по умолчанию в 0,5 секунды не способствовал повышению производительности, но был необходим, чтобы избежать зацикливания.

Новая реализация использует механизмы Redis Pub/Sub для немедленной публикации и получения результатов, что значительно улучшает время выполнения задачи.

При участии Ярослава Жаворонкова и Аска Солема.

Новая оптимизированная реализация соединения аккордов

Это была экспериментальная функция, представленная в Celery 3.1, которую можно было включить, только добавив ?new_join=1 в конфигурацию URL бэкенда результата.

Мы считаем, что реализация была протестирована достаточно тщательно, чтобы считать ее стабильной и включенной по умолчанию.

Новая реализация значительно снижает накладные расходы на аккорды, и особенно при больших аккордах выигрыш в производительности может быть огромным.

Представлен новый бэкенд результатов Riak

Дополнительную информацию см. в разделе conf-riak-result-backend.

Вклад внесли Жиль Дартигуэлонг, Alman One и NoKriK.

Представлен новый бэкенд результатов CouchDB

Дополнительную информацию см. в разделе Настройки бэкенда CouchDB.

Внесено Натаном Ван Гимом.

Представлен новый бэкенд результатов Consul

Добавьте поддержку Consul в качестве бэкенда, используя хранилище ключей/значений Consul.

Consul имеет HTTP API, через который вы можете хранить ключи с их значениями.

Бэкенд расширяет KeyValueStoreBackend и реализует большинство методов.

В основном для установки, получения и удаления объектов.

Это позволяет Celery хранить результаты выполнения задачи в K/V хранилище Consul.

Consul также позволяет установить TTL для ключей, используя Sessions из Consul. Таким образом, бэкенд поддерживает автоматическое истечение срока действия результатов задач.

Для получения дополнительной информации о Консуле посетите сайт https://consul.io/.

Бэкенд использует python-consul для общения с HTTP API. Этот пакет полностью совместим с Python 3, как и этот бэкенд:

$ pip install python-consul

Это установит необходимый пакет для взаимодействия с HTTP API Consul из Python.

Вы также можете указать consul в качестве расширения в зависимости от Celery:

$ pip install celery[consul]

Дополнительную информацию см. в разделе Пакеты.

Внесено Видо ден Холландером.

Совершенно новый бэкенд результатов Cassandra

Совершенно новый бэкенд Cassandra, использующий новую библиотеку cassandra-driver, заменяет старый бэкенд результатов, использующий старую библиотеку pycassa.

Дополнительную информацию см. в разделе Настройки бэкенда Cassandra.

Для зависимости от Celery с использованием Cassandra в качестве бэкенда результатов:

$ pip install celery[cassandra]

Вы также можете объединить несколько требований к расширению, для получения дополнительной информации см. раздел Пакеты.

Представлен новый бэкенд результатов Elasticsearch

Дополнительную информацию см. в разделе Настройки бэкенда Elasticsearch.

Для зависимости от Celery с Elasticsearch в качестве результата используйте bakend:

$ pip install celery[elasticsearch]

Вы также можете объединить несколько требований к расширению, для получения дополнительной информации см. раздел Пакеты.

При участии Ахмета Демира.

Представлен новый бэкенд результатов файловой системы

Дополнительную информацию см. в разделе Настройки бэкенда файловой системы.

При участии Моше ван дер Стерре.

Пакетирование событий

Теперь события буферизируются в рабочем и отправляются в виде списка, что уменьшает накладные расходы, необходимые для отправки событий мониторинга.

Для авторов пользовательских мониторов событий не потребуется никаких действий, если вы используете помощники Python Celery (<< 0 >>>) для реализации вашего монитора.

Однако, если вы разбираете необработанные сообщения о событиях, вы должны учитывать пакетные сообщения о событиях, поскольку они отличаются от обычных сообщений о событиях следующим образом:

  • Ключ маршрутизации для пакета сообщений о событиях будет установлен в <event-group>.multi, если единственная группа событий, передаваемых в пакет, в настоящее время task (что дает ключ маршрутизации task.multi).

  • Тело сообщения будет представлять собой сериализованный список словарей вместо словаря. Каждый элемент списка можно рассматривать как обычное тело сообщения о событии.

Другие новости…

Требования

  • Теперь зависит от Kombu 4.0.

  • Теперь зависит от billiard версии 3.5.

  • Больше не зависит от anyjson. Прощай, старый друг :(

Задачи

  • В настоящее время «anon-exchange» используется для простой прямой маршрутизации по имени.

    Это увеличивает производительность, поскольку полностью обходит таблицу маршрутизации, а также повышает надежность транспорта брокера Redis.

  • Пустой ResultSet теперь оценивается как True.

    Исправление внесено Колином Макинтошем.

  • Ключ маршрутизации по умолчанию (task_default_routing_key) и имя обмена (task_default_exchange) теперь берутся из настройки task_default_queue.

    Это означает, что для изменения имени очереди по умолчанию теперь нужно задать только один параметр.

  • Новая настройка task_reject_on_worker_lost и атрибут задачи reject_on_worker_lost определяют, что произойдет, когда дочерний рабочий процесс, выполняющий задачу late ack, будет завершен.

    Внесено Майклом Пермана.

  • Task.subtask переименовано в Task.signature с псевдонимом.

  • Task.subtask_from_request переименовано в Task.signature_from_request с псевдонимом.

  • Атрибут delivery_mode для kombu.Queue теперь соблюдается (выпуск #1953).

  • Маршруты в task-routes теперь могут напрямую указывать экземпляр Queue.

    Пример:

    task_routes = {'proj.tasks.add': {'queue': Queue('add')}}
    
  • AsyncResult теперь возвращает ValueError, если task_id равен None. (Проблема #1996).

  • Повторные задания не пересылали настройки истечения срока выполнения (проблема #3297).

  • result.get() теперь поддерживает аргумент on_message для установки обратного вызова, который будет вызываться для каждого полученного сообщения.

  • Добавлены новые абстрактные классы:

  • Task.replace теперь правильно пересылает обратные вызовы (проблема #2722).

    Исправление внесено Nicolas Unravel.

  • Task.replace: Добавление к цепочке/аккорду (закрывает #3232)

    Исправлена проблема #3232, добавляющая сигнатуру в цепочку (если она есть). Исправлено подавление аккордов, если заданная сигнатура содержит один.

    Исправление внесено @honux.

  • Повторное выполнение задачи теперь выполняется и в режиме eager.

    Исправление внесено Феанилом Пателем.

Побить

  • Исправлен бесконечный цикл crontab при неверной дате.

    Если вхождение никогда не может быть достигнуто (например, 31 апреля), попытка достичь следующего вхождения вызовет бесконечный цикл.

    Попробуйте исправить это, вызвав ошибку RuntimeError после 2 000 итераций

    (Также в процессе добавлена проверка на високосные годы в кронтабе)

    Исправление внесено Ромуальдом Брюне.

  • Теперь обеспечивает выход программы с ненулевым кодом выхода, когда исключение завершает работу сервиса.

    Исправление внесено Simon Peeters.

Приложение

  • Даты теперь всегда учитывают часовые пояса, даже если enable_utc отключено (выпуск #943).

    Исправление внесено Омером Кацем.

  • Конфигурация: Предварительная конфигурация приложения теперь также сохраняется вместе с конфигурацией.

    Исправление внесено Джереми Зафраном.

  • Теперь приложение может изменить способ создания имен задач, используя

    метод gen_task_name().

    Внесено Дмитрием Малиновским.

  • В приложении появилось новое свойство app.current_worker_task, которое возвращает задачу, над которой в данный момент ведется работа (или None). (Проблема #2100).

Ведение журнала

  • get_task_logger() теперь вызывает исключение при попытке использовать имя «celery» или «celery.task» (проблема #3475).

Пулы исполнения

  • Eventlet/Gevent: теперь включена функция AMQP heartbeat (выпуск #3338).

  • Эвентлет/Гевент: Исправлено состояние гонки, приводящее к ошибкам «одновременного чтения» (проблема #2755).

  • Prefork: Пул префорков теперь использует poll вместо select там, где это возможно (проблема #2373).

  • Prefork: Исправлена ошибка, при которой пул отказывался выключать рабочего (проблема #2606).

  • Эвентлет: Теперь возвращает размер пула в команде celery inspect stats.

    Внесено Александром Обловатным.

Тестирование

  • Celery теперь является плагином pytest, включающим фикстуры, полезные для модульного и интеграционного тестирования.

    Более подробную информацию см. в testing user guide.

Перевозки

  • amqps:// теперь можно указать, что требуется SSL.

  • Транспорт Redis: Транспорт Redis теперь поддерживает опцию broker_use_ssl.

    Внесено Робертом Колба.

  • Сериализатор JSON теперь вызывает obj.__json__ для неподдерживаемых типов.

    Это означает, что теперь вы можете определить метод __json__ для пользовательских типов, которые могут быть сведены к встроенному типу json.

    Пример:

    class Person:
        first_name = None
        last_name = None
        address = None
    
        def __json__(self):
            return {
                'first_name': self.first_name,
                'last_name': self.last_name,
                'address': self.address,
            }
    
  • Сериализатор JSON теперь работает с datetime, Django promise, UUID и Decimal.

  • Новый Queue.consumer_arguments может использоваться для возможности установки приоритета потребителя через x-priority.

    См. https://www.rabbitmq.com/consumer-priority.html

    Пример:

    consumer = Consumer(channel, consumer_arguments={'x-priority': 3})
    
  • Очередь/обмен: Добавлена опция no_declare (также включена для внутренних обменов amq.).

Программы

  • Celery теперь использует argparse, вместо optparse.

  • Все программы теперь отключают цвета, если управляющий терминал не является TTY.

  • celery worker: Аргумент -q теперь отключает баннер запуска.

  • celery worker: Сообщение «рабочий готов» теперь регистрируется с серьезностью info, а не warn.

  • celery multi: Формат %n теперь является синонимом %N, чтобы соответствовать формату celery worker.

  • celery inspect/celery control: теперь поддерживается новая опция --json для вывода в формате json.

  • celery inspect registered: теперь игнорирует встроенные задачи.

  • celery purge теперь принимает параметры -Q и -X, используемые для указания того, какие очереди включать и исключать из очистки.

  • Новое celery logtool: Утилита для фильтрации и разбора лог-файлов celery worker

  • celery multi: теперь проходит через форматы файлов журнала %i и %I.

  • Общие: %p теперь можно использовать для расширения до полного имени рабочего узла в аргументах log-file/pid-file.

  • Новый параметр командной строки

    --executable теперь доступен для демонизирующих программ (celery worker и celery beat).

    Предоставлено Бертом Вандербауведом.

  • celery worker: поддерживает новую опцию --prefetch-multiplier.

    Внесено Микаэлем Пенхардом.

  • Аргумент --loader теперь действует всегда, даже если задан аргумент app (проблема №3405).

  • inspect/control теперь принимает команды из реестра

    Это означает, что пользовательские команды дистанционного управления также могут быть использованы из командной строки.

    Обратите внимание, что необходимо указать аргументы/и тип аргументов, чтобы аргументы были правильно переданы в командную строку.

    Теперь есть два декоратора, использование которых зависит от типа команды: @inspect_command + @control_command:

    from celery.worker.control import control_command
    
    @control_command(
        args=[('n', int)]
        signature='[N=1]',
    )
    def something(state, n=1, **kwargs):
        ...
    

    Здесь args - это список аргументов, поддерживаемых командой. Список должен содержать кортежи (argument_name, type).

    signature - это просто подсказка командной строки, используемая, например, в celery -A proj control --help.

    Команды также поддерживают вариадные аргументы, что означает, что все оставшиеся аргументы будут добавлены к одной переменной. Здесь демонстрируется команда terminate, которая принимает аргумент signal и переменное количество task_ids:

    from celery.worker.control import control_command
    
    @control_command(
        args=[('signal', str)],
        signature='<signal> [id1, [id2, [..., [idN]]]]',
        variadic='ids',
    )
    def terminate(state, signal, ids, **kwargs):
        ...
    

    Теперь эту команду можно вызвать с помощью:

    $ celery -A proj control terminate SIGKILL id1 id2 id3`
    

    Дополнительную информацию см. в разделе Написание собственных команд дистанционного управления.

Рабочий

  • Улучшения и исправления для LimitedSet.

    Избавление от утечки памяти + добавление minlen размера набора: минимальный остаточный размер набора после работы в течение некоторого времени. minlen элементы сохраняются, даже если они должны были быть исключены.

    Проблемы со старым и еще более старым кодом:

    1. В некоторых сценариях (например, при многократном добавлении элемента) куча будет расти.

    2. Быстрое добавление большого количества предметов не позволит очистить их достаточно быстро (если вообще возможно).

    3. При общении с другими рабочими были отправлены revoked._data, но они были обработаны на другой стороне как iterable. Это означает присвоение этим ключам новой (текущей) метки времени. Таким образом, рабочие могли перерабатывать элементы вечно. В сочетании с 1) и 2) это означает, что при большом количестве рабочих скоро закончится память.

    Теперь все эти проблемы должны быть устранены.

    Это должно исправить проблемы #3095, #3086.

    Внесено Дэвидом Правеком.

  • Новые настройки для управления очередями команд дистанционного управления.

    • control_queue_expires

      Установите время истечения очереди как для очереди команд удаленного управления, так и для очереди ответов удаленного управления.

    • control_queue_ttl

      Установите время жизни сообщений для очередей команд удаленного управления и очередей ответов удаленного управления.

    Внесено Аланом Джустино.

  • Сигнал worker_shutdown теперь всегда вызывается во время выключения.

    Ранее он не вызывался, если экземпляр worker сначала был собран gc.

  • Теперь Worker запускает потребителя команд удаленного управления только в том случае, если используемый транспорт брокера действительно поддерживает их.

  • Gossip теперь устанавливает x-message-ttl для очереди событий значение heartbeat_interval s. (Выпуск #2005).

  • Теперь сохраняет код выхода (проблема #2024).

  • Теперь отклоняет сообщения с недопустимым значением ETA (вместо ack, что означает, что они будут отправлены на биржу мертвых букв, если таковая настроена).

  • Исправлено падение при использовании аргумента -purge.

  • Уровень журнала для неустранимых ошибок изменен с error на critical.

  • Улучшенная точность ограничения скорости.

  • Учет отсутствующей информации о часовом поясе в поле «Срок действия задачи».

    Исправление внесено Альбертом Вангом.

  • Рабочий больше не имеет Queues шагов загрузки, так как он теперь

    лишним.

  • Теперь строка «Получено задание» выдается даже для отозванных заданий. (Проблема №3155).

  • Теперь соблюдается установка broker_connection_retry.

    Исправление внесено Нат Уильямс.

  • Новые настройки control_queue_ttl и control_queue_expires теперь позволяют настраивать TTL сообщений команд удаленного управления, а также время истечения очереди.

    Внесено Аланом Джустино.

  • Новое celery.worker.state.requests позволяет выполнять O(1) просмотр активных/резервных задач по идентификатору.

  • Автомасштабирование не всегда обновляло keep-alive при уменьшении масштаба.

    Исправление внесено Филипом Гарнеро.

  • Исправлена опечатка options_list -> option_list.

    Исправление внесено Грегом Уилбуром.

  • Некоторые аргументы командной строки worker и аргументы класса Worker() были переименованы для согласованности.

    Все они имеют псевдонимы для обратной совместимости.

    • --send-events -> --task-events

    • --schedule -> --schedule-filename

    • --maxtasksperchild -> --max-tasks-per-child

    • Beat(scheduler_cls=) -> Beat(scheduler=)

    • Worker(send_events=True) -> Worker(task_events=True)

    • Worker(task_time_limit=) -> Worker(time_limit=)

    • Worker(task_soft_time_limit=) -> Worker(soft_time_limit=)

    • Worker(state_db=) -> Worker(statedb=)

    • Worker(working_directory=) -> Worker(workdir=)

Отладочные утилиты

  • celery.contrib.rdb: Изменен баннер удаленного отладчика, чтобы можно было легко скопировать и вставить адрес (больше нет точки в адресе).

    Внесено Джонатаном Ванаско.

  • Исправлена совместимость с последними версиями psutil (проблема #3262).

Сигналы

  • App: Новые сигналы для настройки/финализации приложений:

  • Задача: Новые сигналы задачи для отклоненных сообщений задачи:

    • celery.signals.task_rejected.

    • celery.signals.task_unknown.

  • Рабочий: Новый сигнал для отправки события сердцебиения.

    • celery.signals.heartbeat_sent

      Внесено Кевином Ричардсоном.

События

  • Сообщения о событиях теперь используют опцию RabbitMQ x-message-ttl для обеспечения отбрасывания старых сообщений о событиях.

    По умолчанию - 5 секунд, но его можно изменить с помощью параметра event_queue_ttl.

  • Task.send_event теперь автоматически повторяет отправку события при разрыве соединения, в соответствии с настройками повтора публикации задачи.

  • Мониторы событий теперь по умолчанию устанавливают значение event_queue_expires.

    Очереди теперь будут заканчиваться через 60 секунд после того, как монитор перестанет их потреблять.

  • Исправлена ошибка, при которой значение None не обрабатывалось должным образом.

    Исправление внесено Dongweiming.

  • Новая настройка event_queue_prefix теперь может быть использована для изменения префикса очереди по умолчанию celeryev для очередей приемников событий.

    Внесено Такеши Канемото.

  • State.tasks_by_type и State.tasks_by_worker теперь могут быть использованы в качестве отображения для быстрого доступа к этой информации.

Развертывание

  • Общие init-скрипты теперь поддерживают переменные окружения CELERY_SU и CELERYD_SU_ARGS для установки пути и аргументов для su (su(1)).

  • Общие init-скрипты теперь лучше поддерживают FreeBSD и другие BSD-системы, выполняя поиск конфигурационного файла /usr/local/etc/.

    Прислано Таха Джахангиром.

  • Общий init-скрипт: Исправлена странная ошибка для celerybeat, когда перезапуск не всегда срабатывал (проблема #3018).

  • Сценарий systemd init теперь использует оболочку при выполнении служб.

    Внесено Томасом Мачалеком.

Бэкенды результатов

  • Redis: Теперь таймаут сокета по умолчанию составляет 120 секунд.

    Значение по умолчанию можно изменить с помощью новой настройки redis_socket_timeout.

    Внесено Рагурамом Шринивасаном.

  • Очереди результатов RPC Backend теперь автоматически удаляются по умолчанию (выпуск #2001).

  • Бэкенд RPC: Исправлена проблема, при которой исключение не десериализовывалось должным образом с помощью json-сериализатора (проблема #2518).

    Исправление внесено Аллардом Хёве.

  • CouchDB: бэкенд, используемый для двойного json-кодирования результатов.

    Исправление внесено Эндрю Стюартом.

  • CouchDB: Исправлена опечатка, из-за которой бэкенд не был найден (проблема #3287).

    Исправление внесено Эндрю Стюартом.

  • MongoDB: Теперь поддерживается установка параметра result_serialzier в значение bson для использования собственного сериализатора библиотек MongoDB.

    При участии Давиде Кварта.

  • MongoDB: работа с URI была улучшена для использования

    имя базы данных, пользователя и пароль из URI, если они предоставлены.

    Внесено Самуэлем Джаиллетом.

  • Бэкенд результатов SQLAlchemy: Теперь игнорирует все опции движка результатов при использовании NullPool (выпуск #1930).

  • Бэкенд результатов SQLAlchemy: Теперь устанавливает максимальный размер символа равным 155, чтобы справиться с поврежденной мозгом реализацией MySQL Unicode (проблема #1748).

  • Общие: Все исключения/предупреждения Celery теперь наследуются от common CeleryError/CeleryWarning. (Проблема #2643).

Улучшение документации

Внесено

  • Адам Чейнз

  • Амир Рустамзаде

  • Артур Вюйяр

  • Батист Билер

  • Беркер Пексаг

  • Брайс Грофф

  • Дэниел Девайн

  • Эдвард Беттс

  • Джейсон Ветч

  • Джефф Видман

  • Мацей Обуховский

  • Мануэль Продавец

  • Максим Бошемин

  • Митчел Хамферис

  • Павел Капышин

  • Пьер Ферсинг

  • Рик

  • Стивен Скляр

  • Тайфун Сен

  • Виланд Хоффманн

Реорганизация, амортизация и удаления

Несовместимые изменения

  • Префорк: Вызов result.get() или присоединение любого результата внутри задачи теперь вызывает RuntimeError.

    В предыдущих версиях это приводило к появлению предупреждения.

  • celery.worker.consumer теперь является пакетом, а не модулем.

  • Модуль celery.worker.job переименован в celery.worker.request.

  • Бить: Scheduler.Publisher/<.publisher переименован в .Producer/.producer.

  • Результат: Аргумент/атрибут task_name app.AsyncResult был удален.

    Раньше это поле использовалось для совместимости с pickle, но теперь в нем нет необходимости.

  • Бэкенды: Аргументы с именем status переименованы в state.

  • Бэкенды: backend.get_status() переименован в backend.get_state().

  • Бэкенды: backend.maybe_reraise() переименован в .maybe_throw()

    В API promise используется .throw(), поэтому это изменение было сделано, чтобы сделать его более последовательным.

    Имеется псевдоним, поэтому вы можете использовать maybe_reraise до версии Celery 5.0.

Внеплановые переезды

  • Экспериментальная функция celery.contrib.methods была удалена, так как в ее реализации было слишком много ошибок, чтобы быть полезной.

  • Инит-скрипты CentOS были удалены.

    Они не добавляют никаких возможностей по сравнению с общими init-скриптами, поэтому рекомендуется использовать их или что-то вроде supervisor.

Реорганизация Амортизация

Эти символы были переименованы, и хотя в этой версии есть псевдоним для обратной совместимости, они будут удалены в Celery 5.0, поэтому убедитесь, что вы переименовали их как можно скорее, чтобы убедиться, что они не сломаются в этом релизе.

Есть шанс, что вы будете использовать только первый из этого списка, но никогда не знаешь:

Плановые демонтажи

Модули

  • Модуль celery.worker.job был переименован в celery.worker.request.

    Это был внутренний модуль, поэтому он не должен был оказывать никакого влияния. Теперь он является частью общедоступного API, поэтому не должен больше меняться.

  • Модуль celery.task.trace был переименован в celery.app.trace, поскольку пакет celery.task постепенно закрывается. Модуль будет удален в версии 5.0, поэтому, пожалуйста, измените любой импорт из:

    from celery.task.trace import X
    

    к:

    from celery.app.trace import X
    
  • Старые псевдонимы совместимости в модуле celery.loaders были удалены.

    • Удалено celery.loaders.current_loader(), используйте: current_app.loader

    • Удалено celery.loaders.load_settings(), используйте: current_app.conf

Результат

  • AsyncResult.serializable() и celery.result.from_serializable.

    была удалена:

    Используйте вместо этого:

    >>> tup = result.as_tuple()
    >>> from celery.result import result_from_tuple
    >>> result = result_from_tuple(tup)
    
  • Удалено BaseAsyncResult, вместо этого используйте AsyncResult для проверки экземпляров.

  • Удалено TaskSetResult, вместо него используйте GroupResult.

    • TaskSetResult.total -> len(GroupResult)

    • TaskSetResult.taskset_id -> GroupResult.id

  • Удалено ResultSet.subtasks, вместо него используйте ResultSet.results.

TaskSet

TaskSet был удален, так как в Celery 3.0 он был заменен конструкцией group.

Если у вас есть код, подобный этому:

>>> from celery.task import TaskSet

>>> TaskSet(add.subtask((i, i)) for i in xrange(10)).apply_async()

Вам нужно заменить это на:

>>> from celery import group
>>> group(add.s(i, i) for i in xrange(10))()

События

  • Удаления для класса celery.events.state.Worker:

    • Worker._defaults атрибут.

      Используйте {k: getattr(worker, k) for k in worker._fields}.

    • Worker.update_heartbeat

      Используйте Worker.event(None, timestamp, received)

    • Worker.on_online

      Используйте Worker.event('online', timestamp, received, fields)

    • Worker.on_offline

      Используйте Worker.event('offline', timestamp, received, fields)

    • Worker.on_heartbeat

      Используйте Worker.event('heartbeat', timestamp, received, fields)

  • Удаления для класса celery.events.state.Task:

    • Task._defaults атрибут.

      Используйте {k: getattr(task, k) for k in task._fields}.

    • Task.on_sent

      Используйте Worker.event('sent', timestamp, received, fields)

    • Task.on_received

      Используйте Task.event('received', timestamp, received, fields)

    • Task.on_started

      Используйте Task.event('started', timestamp, received, fields)

    • Task.on_failed

      Используйте Task.event('failed', timestamp, received, fields)

    • Task.on_retried

      Используйте Task.event('retried', timestamp, received, fields)

    • Task.on_succeeded

      Используйте Task.event('succeeded', timestamp, received, fields)

    • Task.on_revoked

      Используйте Task.event('revoked', timestamp, received, fields)

    • Task.on_unknown_event

      Используйте Task.event(short_type, timestamp, received, fields)

    • Task.update

      Используйте Task.event(short_type, timestamp, received, fields)

    • Task.merge

      Свяжитесь с нами, если вам это необходимо.

Магические аргументы ключевых слов

Поддержка очень старых магических ключевых слов-аргументов, принимаемых задачами, окончательно удалена в этой версии.

Если вы все еще используете их, вам придется переписать любую задачу, все еще использующую старый модуль celery.decorators и зависящую от аргументов ключевых слов, передаваемых задаче, например:

from celery.decorators import task

@task()
def add(x, y, task_id=None):
    print('My task id is %r' % (task_id,))

следует переписать в:

from celery import task

@task(bind=True)
def add(self, x, y):
    print('My task id is {0.request.id}'.format(self))

Удаленные настройки

Следующие настройки были удалены и больше не поддерживаются:

Настройки ведения журнала

Имя установки

Заменить на

CELERYD_LOG_LEVEL

celery worker --loglevel

CELERYD_LOG_FILE

celery worker --logfile

CELERYBEAT_LOG_LEVEL

celery beat --loglevel

CELERYBEAT_LOG_FILE

celery beat --logfile

CELERYMON_LOG_LEVEL

celerymon устарел, используйте flower

CELERYMON_LOG_FILE

celerymon устарел, используйте flower

CELERYMON_LOG_FORMAT

celerymon устарел, используйте flower

Настройки задачи

Имя установки

Заменить на

CELERY_CHORD_PROPAGATES

Н/Д

Изменения во внутреннем API

Изменения в сроках амортизации

См. Временная линия устаревания Celery.

Вернуться на верх