История изменений для Celery 2.0

2.0.3

дата выхода:

2010-08-27 12:00 CEST

релиз на:

Спросите Солема

Исправления

  • Рабочий: Правильная обработка ошибок соединения при закрытии потребителей.

  • Рабочий: События теперь буферизируются, если соединение не работает, а затем отправляются при восстановлении соединения.

  • Больше не зависит от пакета mailer.

    Этот пакет имел коллизию в пространстве имен с django-mailer, поэтому его функциональность была заменена.

  • Бэкенд результатов Redis: Опечатки в документации: Redis имеет не имена баз данных, а номера баз данных. Теперь базой данных по умолчанию является 0.

  • inspect: registered_tasks запрашивала недопустимую команду из-за опечатки.

    Смотрите выпуск #170.

  • CELERY_ROUTES: Значения, определенные в маршруте, теперь должны иметь приоритет над значениями, определенными в CELERY_QUEUES при объединении двух маршрутов.

    Со следующими настройками:

    CELERY_QUEUES = {'cpubound': {'exchange': 'cpubound',
                                  'routing_key': 'cpubound'}}
    
    CELERY_ROUTES = {'tasks.add': {'queue': 'cpubound',
                                   'routing_key': 'tasks.add',
                                   'serializer': 'json'}}
    

    Окончательные параметры маршрутизации для tasks.add станут:

    {'exchange': 'cpubound',
     'routing_key': 'tasks.add',
     'serializer': 'json'}
    

    Раньше такого не было: значения в CELERY_QUEUES имели приоритет.

  • Worker аварийно завершал работу, если значение CELERY_TASK_ERROR_WHITELIST не было итерабельным

  • apply(): Убедитесь, что kwargs[„task_id“] всегда установлен.

  • AsyncResult.traceback: Теперь возвращает None, вместо того, чтобы поднимать KeyError, если traceback отсутствует.

  • inspect: Ответы работали некорректно, если не было указано место назначения.

  • Теперь можно хранить результаты/мета-данные для пользовательских состояний.

  • Рабочий: Теперь при неудачной отправке сообщений об ошибках задания выдается предупреждение.

  • celeryev: Монитор курсоров больше не аварийно завершается при изменении размера окна терминала.

    Смотрите выпуск #160.

  • Рабочий: На macOS невозможно запустить os.exec* в потоковом процессе.

    Это нарушает работу обработчика перезапуска SIGHUP, и теперь он отключен на macOS, выдавая вместо этого предупреждение.

    Смотрите выпуск #152.

  • celery.execute.trace: Правильно обрабатывайте raise(str), который все еще разрешен в Python 2.4.

    Смотрите выпуск #175.

  • Использование urllib2 в периодической задаче на macOS привело к сбою из-за автоматического определения прокси, используемого в macOS.

    Теперь это исправлено с помощью обходного пути. См. выпуск #143.

  • Debian init-scripts: Команды не должны выполняться во вложенной оболочке

    Смотрите выпуск #163.

  • Debian init-scripts: Использовать абсолютный путь программы celeryd для разрешения stat

    Смотрите выпуск #162.

Документация

  • getting-started/broker-installation: Исправлена опечатка

    set_permissions «»` -> set_permissions «.*».

  • Руководство пользователя по задачам: Добавлен раздел о транзакциях с базой данных.

    Смотрите выпуск #169.

  • Руководство пользователя по маршрутизации: Исправлена опечатка «feed»: -> {«queue»: «feeds»}.

    Смотрите выпуск #169.

  • Документированы значения по умолчанию для параметров CELERYD_CONCURRENCY и << 1 >>>.

  • Руководство пользователя по задачам: Исправлены опечатки в примере подзадачи

  • celery.signals: Документированный worker_process_init.

  • Поваренная книга по демонизации: Необходимо экспортировать DJANGO_SETTINGS_MODULE в /etc/default/celeryd.

  • Добавлено еще несколько часто задаваемых вопросов из stack overflow

  • Поваренная книга по демонизации: Исправлена опечатка CELERYD_LOGFILE/CELERYD_PIDFILE.

    в CELERYD_LOG_FILE / CELERYD_PID_FILE

    Также добавлен раздел по устранению неполадок для init-скриптов.

2.0.2

дата выхода:

2010-07-22 11:31 a.m. CEST

релиз на:

Спросите Солема

  • Маршруты: При использовании синтаксиса маршрута dict обмен для задачи может исчезнуть, сделав задачу немаршрутизируемой.

    Смотрите выпуск #158.

  • Набор тестов теперь работает на Python 2.4

  • Больше не нужно набирать PYTHONPATH=., чтобы использовать celeryconfig в текущем каталоге.

    Это достигается тем, что загрузчик по умолчанию при загрузке модуля config убеждается, что текущий каталог находится в sys.path. После загрузки sys.path возвращается в исходное состояние.

    Добавление текущего рабочего каталога в sys.path без ведома пользователя может быть проблемой безопасности, поскольку это означает, что кто-то может закинуть модуль Python в каталог пользователя, выполняющий произвольные команды. Это была первоначальная причина не делать этого, но если это делается только при загрузке модуля конфигурации, это означает, что поведение будет применяться только к модулям, импортированным в модуле конфигурации, что я считаю хорошим компромиссом (конечно, лучше, чем просто явно установить PYTHONPATH=. в любом случае).

  • Добавлен экспериментальный бэкенд Cassandra.

  • Рабочий: Обработчик SIGHUP случайно распространялся на процессы рабочего пула.

    В сочетании с GitHub SHA@7a7c44e39344789f11b5346e9cc8340f5fe4846c это заставит каждый дочерний процесс начинать новый рабочий экземпляр при закрытии окна терминала :/.

  • Рабочий: Не устанавливайте обработчик SIGHUP при запуске из терминала.

    Это устраняет проблему, когда рабочий запускается в фоновом режиме при закрытии терминала.

  • Рабочий: Теперь объединяет потоки при выключении.

    Смотрите выпуск #152.

  • Тестовое завершение работы: Не используйте atexit, вместо этого используйте функциональность nose teardown().

    Смотрите выпуск #154.

  • Debian worker init-script: Остановка теперь работает правильно.

  • Регистратор задач: добавлен метод warn (синоним warning)

  • Теперь можно определить белый список ошибок для отправки сообщений об ошибках.

    Пример:

    CELERY_TASK_ERROR_WHITELIST = ('myapp.MalformedInputError',)
    

    Смотрите выпуск #153.

  • Рабочий: Теперь обрабатывает исключения переполнения в time.mktime при разборе поля ETA.

  • LoggerWrapper: Попытка обнаружить логгеры, ведущие журнал в stderr/stdout, что приводит к бесконечному циклу.

  • Добавлено celery.task.control.inspect: Проверяет работающего рабочего.

    Примеры:

    # Inspect a single worker
    >>> i = inspect('myworker.example.com')
    
    # Inspect several workers
    >>> i = inspect(['myworker.example.com', 'myworker2.example.com'])
    
    # Inspect all workers consuming on this vhost.
    >>> i = inspect()
    
    ### Methods
    
    # Get currently executing tasks
    >>> i.active()
    
    # Get currently reserved tasks
    >>> i.reserved()
    
    # Get the current ETA schedule
    >>> i.scheduled()
    
    # Worker statistics and info
    >>> i.stats()
    
    # List of currently revoked tasks
    >>> i.revoked()
    
    # List of registered tasks
    >>> i.registered_tasks()
    
  • Команды удаленного управления dump_active/dump_reserved/dump_schedule теперь отвечают с подробными запросами задач.

    Содержит исходные аргументы и поля запрашиваемого задания.

    Кроме того, была добавлена команда удаленного управления set_loglevel, которая изменяет уровень журнала только для основного процесса.

  • Выполнение команд управления рабочим процессом теперь перехватывает ошибки и возвращает их строковое представление в ответе.

  • Добавлен набор функциональных тестов

    celery.tests.functional.case содержит утилиты для запуска и остановки встроенного рабочего процесса, для использования в функциональном тестировании.

2.0.1

дата выхода:

2010-07-09 03:02 p.m. CEST

релиз на:

Спросите Солема

  • multiprocessing.pool: Теперь обрабатывает ошибки кодирования, так что обработка ошибок не приводит к аварийному завершению рабочих процессов.

  • Ответы команды удаленного управления не работали при более строгой проверке эквивалентности в RabbitMQ 1.8.0.

    Если вы уже столкнулись с этой проблемой, возможно, вам придется удалить декларацию:

    $ camqadm exchange.delete celerycrq
    

    или:

    $ python manage.py camqadm exchange.delete celerycrq
    
  • В планировщик ETA закралась ошибка, из-за которой он мог выполнять только одну задачу в секунду(!).

    Планировщик спит между итерациями, чтобы не потреблять слишком много процессора. Он хранит список запланированных элементов, отсортированных по времени, и на каждой итерации он спит в течение оставшегося времени элемента с ближайшим сроком выполнения. Если нет задач с ETA, он будет спать минимальное количество времени, по умолчанию одну секунду.

    Сюда закралась ошибка, из-за которой программа засыпала на одну секунду для каждой запланированной задачи. Это было исправлено, так что теперь он должен перемещать задачи как горячий нож по маслу.

    Кроме того, была добавлена новая настройка для управления минимальным интервалом сна; CELERYD_ETA_SCHEDULER_PRECISION. Хорошим значением для этого параметра будет плавающая величина от 0 до 1, в зависимости от требуемой точности. Значение 0.8 означает, что при достижении ETA задачи потребуется не более 0.8 секунды, чтобы задача была перемещена в очередь готовности.

  • Бассейн: Супервизор не освободил семафор.

    Это привело бы к тупиковой ситуации, если бы все работники прекратили работу досрочно.

  • Добавлены классификаторы Trove версий Python: 2.4, 2.5, 2.6 и 2.7

  • Тесты теперь проходят на Python 2.7.

  • Task.__reduce__: Задачи, созданные с помощью декоратора задач, теперь можно мариновать.

  • setup.py: nose добавлено в tests_require.

  • Pickle теперь должен работать с SQLAlchemy 0.5.x

  • Новый дизайн домашней страницы от Яна Хенрика Хельмерса: http://celeryproject.org

  • Новая тема Sphinx от Армина Ронахера: http://docs.celeryproject.org/

  • Исправлены ошибки «pending_xref», отображаемые в HTML-рендеринге документации. Очевидно, это было вызвано новыми изменениями в Sphinx 1.0b2.

  • Классы маршрутизаторов в CELERY_ROUTES теперь импортируются лениво.

    Импортирование класса маршрутизатора в модуль, который также загружает среду Celery, привело бы к возникновению круговой зависимости. Это решается импортом по мере необходимости после установки среды.

  • CELERY_ROUTES нарушался, если был установлен на один дикт.

    Теперь этот пример в документации должен снова работать:

    CELERY_ROUTES = {'feed.tasks.import_feed': 'feeds'}
    
  • CREATE_MISSING_QUEUES не соблюдался apply_async.

  • Новая команда дистанционного управления: stats.

    Выгружает информацию о рабочем, например, идентификаторы процессов пула и общее количество выполненных задач по типам.

    Пример ответа:

    [{'worker.local':
         'total': {'tasks.sleeptask': 6},
         'pool': {'timeouts': [None, None],
                  'processes': [60376, 60377],
                  'max-concurrency': 2,
                  'max-tasks-per-child': None,
                  'put-guarded-by-semaphore': True}}]
    
  • Новая команда удаленного управления: dump_active

    Получает список задач, выполняемых в данный момент рабочим. По умолчанию аргументы передаются через repr на случай, если есть аргументы, которые не могут быть закодированы в JSON. Если вы знаете, что аргументы безопасны для JSON, вы можете передать аргумент safe=True.

    Пример ответа:

    >>> broadcast('dump_active', arguments={'safe': False}, reply=True)
    [{'worker.local': [
        {'args': '(1,)',
         'time_start': 1278580542.6300001,
         'name': 'tasks.sleeptask',
         'delivery_info': {
             'consumer_tag': '30',
             'routing_key': 'celery',
             'exchange': 'celery'},
         'hostname': 'casper.local',
         'acknowledged': True,
         'kwargs': '{}',
         'id': '802e93e9-e470-47ed-b913-06de8510aca2',
        }
    ]}]
    
  • Добавлена экспериментальная поддержка постоянных отзывов.

    Используйте аргумент -S|–statedb для рабочего, чтобы включить его:

    $ celeryd --statedb=/var/run/celeryd
    

    При этом будет использоваться файл: /var/run/celeryd.db, так как модуль shelve автоматически добавляет суффикс .db.

2.0.0

дата выхода:

2010-07-02 02:30 p.m. CEST

релиз на:

Спросите Солема

Предисловие

Celery 2.0 содержит обратно несовместимые изменения, самое важное из которых заключается в том, что зависимость от Django была удалена, поэтому Celery больше не поддерживает Django из коробки, а только как дополнительный пакет под названием django-celery.

Нам очень жаль нарушать обратную совместимость, но есть также много новых и интересных функций, которые компенсируют время, потерянное на обновление, поэтому обязательно прочитайте раздел News.

Довольно много потенциальных пользователей были расстроены зависимостью от Django, так что, возможно, это шанс получить более широкое принятие со стороны сообщества Python.

Большое спасибо всем участникам, тестерам и пользователям!

Обновление для Django-пользователей

Интеграция Django была перенесена в отдельный пакет: django-celery.

  • Для обновления необходимо установить модуль django-celery и изменить:

    INSTALLED_APPS = 'celery'
    

    к:

    INSTALLED_APPS = 'djcelery'
    
  • Если вы используете mod_wsgi, вам необходимо добавить следующую строку в ваш файл .wsgi:

    import os
    os.environ['CELERY_LOADER'] = 'django'
    
  • Следующие модули были перемещены в django-celery:

    Имя модуля

    Заменить на

    celery.models

    djcelery.models

    celery.managers

    djcelery.managers

    celery.views

    djcelery.views

    celery.urls

    djcelery.urls

    celery.management

    djcelery.management

    celery.loaders.djangoapp

    djcelery.loaders

    celery.backends.database

    djcelery.backends.database

    celery.backends.cache

    djcelery.backends.cache

Импорт djcelery автоматически настроит Celery на использование Django loader. loader. Это делается путем установки переменной окружения CELERY_LOADER в значение «django» (она не изменит его, если загрузчик уже установлен).

При использовании загрузчика Django псевдонимы бэкендов результатов «database» и «cache» будут указывать на бэкенды djcelery вместо встроенных бэкендов, а конфигурация будет считываться из настроек Django.

Модернизация для других

Бэкенд результатов базы данных

Бэкенд результатов базы данных теперь использует SQLAlchemy вместо Django ORM, смотрите Supported Databases для таблицы поддерживаемых баз данных.

DATABASE_* settings has been replaced by a single setting: CELERY_RESULT_DBURI. The value here should be an SQLAlchemy Connection String, некоторые примеры включают:

# sqlite (filename)
CELERY_RESULT_DBURI = 'sqlite:///celerydb.sqlite'

# mysql
CELERY_RESULT_DBURI = 'mysql://scott:tiger@localhost/foo'

# postgresql
CELERY_RESULT_DBURI = 'postgresql://scott:tiger@localhost/mydatabase'

# oracle
CELERY_RESULT_DBURI = 'oracle://scott:tiger@127.0.0.1:1521/sidname'

Дополнительную информацию о строках подключения см. в SQLAlchemy Connection Strings.

Для указания дополнительных опций движка базы данных SQLAlchemy можно использовать параметр CELERY_RESULT_ENGINE_OPTIONS:

# echo enables verbose logging from SQLAlchemy.
CELERY_RESULT_ENGINE_OPTIONS = {'echo': True}

Бэкенд результатов кэширования

Бэкенд результатов кэширования больше не использует фреймворк Django cache, но поддерживает в основном тот же синтаксис конфигурации:

CELERY_CACHE_BACKEND = 'memcached://A.example.com:11211;B.example.com'

Для использования кэш-бэкенда у вас должна быть установлена либо библиотека pylibmc, либо << 1 >>>, из которых первая считается лучшим выбором.

Поддерживаемые типы бэкендов - memcached:// и memory://, мы не чувствовали необходимости поддерживать какие-либо другие бэкенды, предоставляемые Django.

Обратные несовместимые изменения

  • Загрузчик по умолчанию (python) теперь печатает предупреждение об отсутствии celeryconfig.py вместо того, чтобы поднимать ImportError.

    Если конфигурация не настроена, рабочий выдает ImproperlyConfigured. Это позволяет использовать –help и т.д., не имея рабочей конфигурации.

    Также это позволяет использовать клиентскую часть Celery без настройки:

    >>> from carrot.connection import BrokerConnection
    >>> conn = BrokerConnection('localhost', 'guest', 'guest', '/')
    >>> from celery.execute import send_task
    >>> r = send_task('celery.ping', args=(), kwargs={}, connection=conn)
    >>> from celery.backends.amqp import AMQPBackend
    >>> r.backend = AMQPBackend(connection=conn)
    >>> r.get()
    'pong'
    
  • Следующие устаревшие параметры были удалены (в соответствии с планом Временная линия устаревания Celery):

    Имя установки

    Заменить на

    CELERY_AMQP_CONSUMER_QUEUES

    CELERY_QUEUES

    CELERY_AMQP_EXCHANGE

    CELERY_DEFAULT_EXCHANGE

    CELERY_AMQP_EXCHANGE_TYPE

    CELERY_DEFAULT_EXCHANGE_TYPE

    CELERY_AMQP_CONSUMER_ROUTING_KEY

    CELERY_QUEUES

    CELERY_AMQP_PUBLISHER_ROUTING_KEY

    CELERY_DEFAULT_ROUTING_KEY

  • Модуль celery.task.rest был удален, вместо него используйте celery.task.http (согласно расписанию Временная линия устаревания Celery).

  • Больше не разрешается пропускать имя класса в именах загрузчиков. (как запланировано Временная линия устаревания Celery):

    Использование неявного имени класса Loader больше не поддерживается, например, если вы используете:

    CELERY_LOADER = 'myapp.loaders'
    

    Вы должны включить имя класса загрузчика, как показано ниже:

    CELERY_LOADER = 'myapp.loaders.Loader'
    
  • CELERY_TASK_RESULT_EXPIRES теперь по умолчанию составляет 1 день.

    Предыдущая настройка по умолчанию предусматривала истечение срока действия через 5 дней.

  • Бэкенд AMQP: Не используйте различные значения для auto_delete.

    Эта ошибка стала заметна в версии RabbitMQ 1.8.0, которая больше не допускает конфликтующих объявлений для параметров auto_delete и durable.

    Если вы уже использовали Celery с этим бэкендом, скорее всего, вам придется удалить предыдущую декларацию:

    $ camqadm exchange.delete celeryresults
    
  • Теперь используется pickle вместо cPickle на версиях Python <= 2.5

    cPickle не работает в Python <= 2.5.

    Он небезопасно и некорректно использует относительный, а не абсолютный импорт, поэтому, например:

    exceptions.KeyError
    

    становится:

    celery.exceptions.KeyError
    

    Лучшим выбором будет обновление до Python 2.6, поскольку, хотя чистая версия pickle имеет худшую производительность, это единственный безопасный вариант для старых версий Python.

Новости

  • celeryev: Проклинает Celery Monitor и Event Viewer.

    Это простой монитор, позволяющий видеть, какие задания выполняются в режиме реального времени, а также исследовать отслеживание и результаты готовых заданий. Он также позволяет устанавливать новые ограничения скорости и отзывать задания.

    Скриншот:

    ../../_images/celeryevshotsm.jpg

    Если вы запустите celeryev с ключом -d, он будет работать как дампер событий, просто сбрасывая полученные события в стандартный формат:

    $ celeryev -d
    -> celeryev: starting capture...
    casper.local [2010-06-04 10:42:07.020000] heartbeat
    casper.local [2010-06-04 10:42:14.750000] task received:
        tasks.add(61a68756-27f4-4879-b816-3cf815672b0e) args=[2, 2] kwargs={}
        eta=2010-06-04T10:42:16.669290, retries=0
    casper.local [2010-06-04 10:42:17.230000] task started
        tasks.add(61a68756-27f4-4879-b816-3cf815672b0e) args=[2, 2] kwargs={}
    casper.local [2010-06-04 10:42:17.960000] task succeeded:
        tasks.add(61a68756-27f4-4879-b816-3cf815672b0e)
        args=[2, 2] kwargs={} result=4, runtime=0.782663106918
    
    The fields here are, in order: *sender hostname*, *timestamp*, *event type* and
    *additional event fields*.
    
  • Бэкенд результатов AMQP: Теперь поддерживает .ready(), .successful(), .result, .status, и даже реагирует на изменения в состоянии задачи

  • Новые руководства пользователя:

  • Рабочий: Стандартный выход/ошибка теперь перенаправляется в файл журнала.

  • billiard был перемещен обратно в репозиторий Celery.

    Имя модуля

    эквивалент сельдерея

    бильярд.пул

    celery.concurrency.processes.pool

    billiard.serialization

    celery.serialization

    billiard.utils.functional

    celery.utils.functional

    Распределение billiard может быть сохранено, в зависимости от интереса.

  • теперь зависит от carrot >= 0.10.5

  • теперь зависит от pyparsing

  • Рабочий: Добавлено –purge в качестве псевдонима для –discard.

  • Рабочий: Control-c (SIGINT) один раз приводит к теплому выключению, нажатие Control-c дважды приводит к завершению работы.

  • Добавлена поддержка использования сложных Crontab-выражений в периодических задачах. Например, теперь вы можете использовать:

    >>> crontab(minute='*/15')
    

    или даже:

    >>> crontab(minute='*/30', hour='8-17,1-2', day_of_week='thu-fri')
    

    См. Периодические задачи.

  • Рабочий: Теперь ожидает доступных процессов пула перед применением новых задач к пулу.

    Это означает, что ему не нужно ждать завершения десятков задач при выключении, потому что он применил предварительную выборку задач, не имея доступных процессов пула, чтобы немедленно принять их.

    Смотрите выпуск #122.

  • Новый встроенный способ выполнения обратных вызовов задач с использованием subtask.

    Дополнительную информацию см. в разделе Холст: Проектирование рабочих потоков.

  • Наборы задач теперь могут содержать несколько типов задач.

    TaskSet был переработан для использования нового синтаксиса, более подробную информацию смотрите в Холст: Проектирование рабочих потоков.

    Предыдущий синтаксис все еще поддерживается, но будет устаревшим в версии 1.4.

  • Результат TaskSet failed() оказался неверным.

    Смотрите выпуск #132.

  • Теперь создаются различные регистраторы для каждого класса задач.

    Смотрите выпуск #129.

  • Отсутствующие определения очередей теперь создаются автоматически.

    Это можно отключить с помощью параметра CELERY_CREATE_MISSING_QUEUES.

    Отсутствующие очереди создаются со следующими параметрами:

    CELERY_QUEUES[name] = {'exchange': name,
                           'exchange_type': 'direct',
                           'routing_key': 'name}
    

    Эта функция добавлена для простой настройки маршрутизации с помощью опции -Q для рабочего:

    $ celeryd -Q video, image
    

    Более подробную информацию см. в разделе «Новая маршрутизация» Руководства пользователя: Задачи маршрутизации.

  • Новая опция задачи: Task.queue.

    Если установлено, параметры сообщения будут взяты из соответствующей записи в CELERY_QUEUES. exchange, exchange_type и routing_key игнорируются.

  • Добавлена поддержка мягких и жестких ограничений времени выполнения заданий.

    Добавлены новые настройки:

    • CELERYD_TASK_TIME_LIMIT

      Жесткий лимит времени. При превышении этого лимита рабочий, обрабатывающий задание, будет убит и заменен новым.

    • CELERYD_TASK_SOFT_TIME_LIMIT

      Мягкий лимит времени. При его превышении будет вызвано исключение SoftTimeLimitExceeded. Задача может перехватить его, чтобы, например, очистить до наступления жесткого лимита времени.

    Добавлены новые аргументы командной строки к celeryd: –time-limit и –soft-time-limit.

    Что осталось?

    Это не будет работать на платформах, пока не поддерживающих сигналы (и в частности сигнал SIGUSR1). Поэтому в качестве альтернативы должна быть реализована возможность полного отключения этой функции на несоответствующих платформах.

    Также при превышении жесткого лимита времени результатом выполнения задачи должно быть исключение TimeLimitExceeded.

  • Тестовый набор теперь проходит без запущенного брокера, используя бэкенд carrot in-memory.

  • Вывод журнала теперь доступен в цветах.

    Уровень журнала

    Цвет

    DEBUG

    Голубой

    ПРЕДУПРЕЖДЕНИЕ

    Желтый

    КРИТИЧЕСКИЙ

    Magenta

    ERROR

    Красный

    Это включено только в том случае, если выход журнала является tty. Вы можете явно включить/выключить эту функцию с помощью параметра CELERYD_LOG_COLOR.

  • Добавлена поддержка классов маршрутизаторов задач (подобно маршрутизаторам django multi-db)

    • Новая настройка: CELERY_ROUTES

    Это один или список маршрутизаторов, которые необходимо обойти при отправке заданий. Словари в этом списке преобразуются в экземпляр celery.routes.MapRoute.

    Примеры:

    >>> CELERY_ROUTES = {'celery.ping': 'default',
                         'mytasks.add': 'cpu-bound',
                         'video.encode': {
                             'queue': 'video',
                             'exchange': 'media'
                             'routing_key': 'media.video.encode'}}
    
    >>> CELERY_ROUTES = ('myapp.tasks.Router',
                         {'celery.ping': 'default'})
    

    Где может быть myapp.tasks.Router:

    class Router(object):
    
        def route_for_task(self, task, args=None, kwargs=None):
            if task == 'celery.ping':
                return 'default'
    

    route_for_task может возвращать строку или dict. Строка означает, что это имя очереди в CELERY_QUEUES, а dict означает, что это пользовательский маршрут.

    При отправке заданий маршрутизаторы просматриваются по порядку. Первый маршрутизатор, который не возвращает None, является маршрутом для использования. Затем параметры сообщения объединяются с настройками найденного маршрута, при этом настройки маршрутизаторов имеют приоритет.

    Пример, если apply_async() имеет такие аргументы:

    >>> Task.apply_async(immediate=False, exchange='video',
    ...                  routing_key='video.compress')
    

    и маршрутизатор возвращается:

    {'immediate': True,
     'exchange': 'urgent'}
    

    окончательные варианты сообщений будут такими:

    >>> task.apply_async(
    ...    immediate=True,
    ...    exchange='urgent',
    ...    routing_key='video.compress',
    ... )
    

    (и любые параметры сообщения по умолчанию, определенные в классе Task)

  • Новый обработчик задачи, вызываемый после возврата задачи: after_return().

  • ExceptionInfo теперь передается в

    on_retry()/ on_failure() как аргумент ключевого слова einfo.

  • Рабочий: Добавлено CELERYD_MAX_TASKS_PER_CHILD / celery worker --maxtasksperchild.

    Определяет максимальное количество задач, которые может обработать рабочий пул, прежде чем процесс будет завершен и заменен новым.

  • Отозванные задания теперь помечаются состоянием REVOKED, а result.get() теперь будет поднимать TaskRevokedError.

  • celery.task.control.ping() теперь работает как ожидалось.

  • apply(throw=True) / CELERY_EAGER_PROPAGATES_EXCEPTIONS: Заставляет нетерпеливое выполнение повторно выдавать ошибки задачи.

  • Новый сигнал: ~celery.signals.worker_process_init: Отправлен внутри рабочего процесса пула при инициализации.

  • Рабочий: celery worker -Q опция: Возможность указать список очередей для использования, отключая другие настроенные очереди.

    Например, если CELERY_QUEUES определяет четыре очереди: image, video, data и default, то следующая команда заставит рабочего потреблять только из очередей image и video:

    $ celeryd -Q image,video
    
  • Рабочий: Новое возвращаемое значение для команды управления revoke:

    Теперь возвращается:

    {'ok': 'task $id revoked'}
    

    вместо True.

  • Рабочий: Теперь можно включать/выключать события с помощью дистанционного управления

    Пример использования:

    >>> from celery.task.control import broadcast
    >>> broadcast('enable_events')
    >>> broadcast('disable_events')
    
  • Удален каталог тестов верхнего уровня. Конфигурация тестов теперь находится в celery.tests.config

    Это означает, что запуск юнит-тестов не требует специальной настройки. celery/tests/__init__ теперь настраивает переменные окружения CELERY_CONFIG_MODULE и << 1 >>>, так что когда nosetests импортирует это, окружение юнит-тестов уже готово.

    Перед запуском тестов необходимо установить требования к тестам:

    $ pip install -r requirements/test.txt
    

    Выполнение всех тестов:

    $ nosetests
    

    Указание тестов для запуска:

    $ nosetests celery.tests.test_task
    

    Создание покрытия HTML:

    $ nosetests --with-coverage3
    

    Вывод покрытия находится в celery/tests/cover/index.html.

  • Рабочий: Новая опция –version: Выгрузка информации о версии и выход.

  • celeryd-multi: Инструмент для сценариев командной оболочки для запуска нескольких рабочих.

    Некоторые примеры:

    • Расширенный пример с 10 работниками:

      • Три работника обрабатывают изображения и видео очередь

      • Два рабочих обрабатывают очередь данных с уровнем журнала DEBUG

      • остальные обрабатывают очередь по умолчанию.

      $ celeryd-multi start 10 -l INFO -Q:1-3 images,video -Q:4,5:data -Q default -L:4,5 DEBUG
      
    • Получите команды для запуска 10 рабочих, с 3 процессами каждый

      $ celeryd-multi start 3 -c 3
      celeryd -n celeryd1.myhost -c 3
      celeryd -n celeryd2.myhost -c 3
      celeryd -n celeryd3.myhost -c 3
      
    • Запустить 3 названных рабочих

      $ celeryd-multi start image video data -c 3
      celeryd -n image.myhost -c 3
      celeryd -n video.myhost -c 3
      celeryd -n data.myhost -c 3
      
    • Укажите пользовательское имя хоста

      $ celeryd-multi start 2 -n worker.example.com -c 3
      celeryd -n celeryd1.worker.example.com -c 3
      celeryd -n celeryd2.worker.example.com -c 3
      

      Дополнительные параметры добавляются к каждому celeryd, но вы также можете изменить параметры для диапазонов или отдельных рабочих

    • 3 рабочих: Два с 3 процессами, и один с 10 процессами.

      $ celeryd-multi start 3 -c 3 -c:1 10
      celeryd -n celeryd1.myhost -c 10
      celeryd -n celeryd2.myhost -c 3
      celeryd -n celeryd3.myhost -c 3
      
    • Можно также указать параметры для именованных работников

      $ celeryd-multi start image video data -c 3 -c:image 10
      celeryd -n image.myhost -c 10
      celeryd -n video.myhost -c 3
      celeryd -n data.myhost -c 3
      
    • Также допускаются диапазоны и списки рабочих в опциях: (-c:1-3 можно также записать как -c:1,2,3)

      $ celeryd-multi start 5 -c 3  -c:1-3 10
      celeryd-multi -n celeryd1.myhost -c 10
      celeryd-multi -n celeryd2.myhost -c 10
      celeryd-multi -n celeryd3.myhost -c 10
      celeryd-multi -n celeryd4.myhost -c 3
      celeryd-multi -n celeryd5.myhost -c 3
      
    • Списки также работают с именованными работниками:

      $ celeryd-multi start foo bar baz xuzzy -c 3 -c:foo,bar,baz 10
      celeryd-multi -n foo.myhost -c 10
      celeryd-multi -n bar.myhost -c 10
      celeryd-multi -n baz.myhost -c 10
      celeryd-multi -n xuzzy.myhost -c 3
      
  • Теперь рабочий вызывает метод бэкенда результата process_cleanup после выполнения задачи, а не до.

  • Бэкенд результатов AMQP теперь поддерживает Pika.

Вернуться на верх