Рефактор «Большого экземпляра»

Ветка app является незавершенной работой по устранению использования глобальной конфигурации в Celery.

Теперь Celery можно инстанцировать, и несколько экземпляров Celery могут существовать в одном и том же пространстве процессов. Кроме того, крупные детали можно настраивать, не прибегая к «обезьяньим» исправлениям.

Примеры

Создание экземпляра Celery:

>>> from celery import Celery
>>> app = Celery()
>>> app.config_from_object('celeryconfig')
>>> #app.config_from_envvar('CELERY_CONFIG_MODULE')

Создание задач:

@app.task
def add(x, y):
    return x + y

Создание пользовательских подклассов Task:

Task = celery.create_task_cls()

class DebugTask(Task):

    def on_failure(self, *args, **kwargs):
        import pdb
        pdb.set_trace()

@app.task(base=DebugTask)
def add(x, y):
    return x + y

Запуск рабочего:

worker = celery.Worker(loglevel='INFO')

Получение доступа к конфигурации:

celery.conf.task_always_eager = True
celery.conf['task_always_eager'] = True

Контролирующие работники:

>>> celery.control.inspect().active()
>>> celery.control.rate_limit(add.name, '100/m')
>>> celery.control.broadcast('shutdown')
>>> celery.control.discard_all()

Другие интересные атрибуты:

# Establish broker connection.
>>> celery.broker_connection()

# AMQP Specific features.
>>> celery.amqp
>>> celery.amqp.Router
>>> celery.amqp.get_queues()
>>> celery.amqp.get_task_consumer()

# Loader
>>> celery.loader

# Default backend
>>> celery.backend

Как вы, вероятно, видите, это действительно открывает новые возможности настройки.

Утративший силу

  • celery.task.ping celery.task.PingTask

    Уступает команде дистанционного управления ping. Будет удалена в Celery 2.3.

Псевдонимы (ожидает устаревания)

  • celery.task.base
  • celery.task.sets
    • .TaskSet -> {app.TaskSet}

  • celery.decorators / celery.task
    • .task -> {app.task}

  • celery.execute
    • .apply_async -> {task.apply_async}

    • .apply -> {task.apply}

    • .send_task -> {app.send_task}

    • <<< 0 >> -> нет альтернатив.

  • celery.log
    • .get_default_logger -> {app.log.get_default_logger}

    • .setup_logger -> {app.log.setup_logger}

    • .get_task_logger -> {app.log.get_task_logger}

    • .setup_task_logger -> {app.log.setup_task_logger}

    • .setup_logging_subsystem -> {app.log.setup_logging_subsystem}

    • .redirect_stdouts_to_logger -> {app.log.redirect_stdouts_to_logger}

  • celery.messaging
    • .establish_connection -> {app.broker_connection}

    • .with_connection -> {app.with_connection}

    • .get_consumer_set -> {app.amqp.get_task_consumer}

    • .TaskPublisher -> {app.amqp.TaskPublisher}

    • .TaskConsumer -> {app.amqp.TaskConsumer}

    • .ConsumerSet -> {app.amqp.ConsumerSet}

  • celery.conf.* -> {app.conf}

    ПРИМЕЧАНИЕ: Все ключи конфигурации теперь называются так же, как и в конфигурации. Таким образом, доступ к ключу task_always_eager осуществляется как:

    >>> app.conf.task_always_eager
    

    вместо:

    >>> from celery import conf
    >>> conf.always_eager
    
    • .get_queues -> {app.amqp.get_queues}

  • celery.task.control
    • .broadcast -> {app.control.broadcast}

    • .rate_limit -> {app.control.rate_limit}

    • .ping -> {app.control.ping}

    • .revoke -> {app.control.revoke}

    • .discard_all -> {app.control.discard_all}

    • .inspect -> {app.control.inspect}

  • celery.utils.info
    • .humanize_seconds -> celery.utils.time.humanize_seconds

    • .textindent -> celery.utils.textindent

    • .get_broker_info -> {app.amqp.get_broker_info}

    • .format_broker_info -> {app.amqp.format_broker_info}

    • .format_queues -> {app.amqp.format_queues}

Использование приложений по умолчанию

Для обеспечения обратной совместимости должно быть возможно использовать все классы/функции без передачи явного экземпляра приложения.

Это достигается тем, что все объекты, зависящие от приложения, используют default_app, если экземпляр приложения отсутствует.

from celery.app import app_or_default

class SomeClass:

    def __init__(self, app=None):
        self.app = app_or_default(app)

Проблема такого подхода заключается в том, что существует вероятность того, что экземпляр приложения будет потерян по пути, а все вроде бы работает нормально. Проверить утечку экземпляра приложения сложно. Можно использовать переменную окружения CELERY_TRACE_APP, при включении которой celery.app.app_or_default() будет вызывать исключение всякий раз, когда придется возвращаться к экземпляру приложения по умолчанию.

Дерево зависимостей приложений

  • {app}
    • celery.loaders.base.BaseLoader

    • celery.backends.base.BaseBackend

    • {app.TaskSet}
      • celery.task.sets.TaskSet (app.TaskSet)

    • [app.TaskSetResult]
      • celery.result.TaskSetResult (app.TaskSetResult)

  • {app.AsyncResult}
    • celery.result.BaseAsyncResult / celery.result.AsyncResult

  • celery.bin.worker.WorkerCommand
    • celery.apps.worker.Worker
      • celery.worker.WorkerController
        • celery.worker.consumer.Consumer
          • celery.worker.request.Request

          • celery.events.EventDispatcher

          • celery.worker.control.ControlDispatch
            • celery.worker.control.registry.Panel

            • celery.pidbox.BroadcastPublisher

          • celery.pidbox.BroadcastConsumer

        • celery.beat.EmbeddedService

  • celery.bin.events.EvCommand
    • celery.events.snapshot.evcam
      • celery.events.snapshot.Polaroid

      • celery.events.EventReceiver

    • celery.events.cursesmon.evtop
      • celery.events.EventReceiver

      • celery.events.cursesmon.CursesMonitor

    • celery.events.dumper
      • celery.events.EventReceiver

  • celery.bin.amqp.AMQPAdmin

  • celery.bin.beat.BeatCommand
    • celery.apps.beat.Beat
      • celery.beat.Service
        • celery.beat.Scheduler

Вернуться на верх