Приложение

Библиотека Celery должна быть инстанцирована перед использованием, этот экземпляр называется приложением (или app для краткости).

Приложение является потокобезопасным, поэтому несколько приложений Celery с различными конфигурациями, компонентами и задачами могут сосуществовать в одном пространстве процессов.

Давайте создадим его сейчас:

>>> from celery import Celery
>>> app = Celery()
>>> app
<Celery __main__:0x100469fd0>

Последняя строка показывает текстовое представление приложения: включая имя класса app (Celery), имя текущего главного модуля (__main__) и адрес памяти объекта (0x100469fd0).

Основное название

Только одно из них важно, и это имя главного модуля. Давайте рассмотрим, почему это так.

Когда вы посылаете сообщение о задаче в Celery, это сообщение не содержит исходного кода, а только имя задачи, которую вы хотите выполнить. Это работает подобно тому, как работают имена хостов в интернете: каждый рабочий поддерживает отображение имен задач на их фактические функции, называемое реестром задач.

Всякий раз, когда вы определяете задачу, эта задача также добавляется в локальный реестр:

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.name
__main__.add

>>> app.tasks['__main__.add']
<@task: __main__.add>

и здесь вы снова видите __main__; всякий раз, когда Celery не может определить, к какому модулю принадлежит функция, он использует имя главного модуля для генерации начала имени задачи.

Это является проблемой только в ограниченном наборе случаев использования:

  1. Если модуль, в котором определена задача, выполняется как программа.

  2. Если приложение создается в оболочке Python (REPL).

Например, здесь, где модуль задач также используется для запуска рабочего с app.worker_main():

tasks.py:

from celery import Celery
app = Celery()

@app.task
def add(x, y): return x + y

if __name__ == '__main__':
    app.worker_main()

При выполнении этого модуля задачи будут называться, начиная с «__main__», но когда модуль импортируется другим процессом, скажем, для вызова задачи, задачи будут называться, начиная с «tasks». (настоящее имя модуля):

>>> from tasks import add
>>> add.name
tasks.add

Вы можете задать другое имя для главного модуля:

>>> app = Celery('tasks')
>>> app.main
'tasks'

>>> @app.task
... def add(x, y):
...     return x + y

>>> add.name
tasks.add

См.также

Имена

Конфигурация

Существует несколько опций, которые могут изменить работу Celery. Эти параметры можно установить непосредственно на экземпляре приложения или использовать специальный модуль конфигурации.

Конфигурация доступна в виде app.conf:

>>> app.conf.timezone
'Europe/London'

где вы также можете напрямую задать значения конфигурации:

>>> app.conf.enable_utc = True

или обновить несколько ключей одновременно, используя метод update:

>>> app.conf.update(
...     enable_utc=True,
...     timezone='Europe/London',
...)

Объект конфигурации состоит из нескольких словарей, которые просматриваются по порядку:

  1. Изменения, внесенные во время выполнения.

  2. Модуль конфигурации (при наличии)

  3. Конфигурация по умолчанию (celery.app.defaults).

Вы можете даже добавить новые источники по умолчанию, используя метод app.add_defaults().

См.также

Перейдите в раздел Configuration reference для получения полного списка всех доступных настроек и их значений по умолчанию.

config_from_object

Метод app.config_from_object() загружает конфигурацию из объекта конфигурации.

Это может быть модуль конфигурации или любой объект с атрибутами конфигурации.

Обратите внимание, что любая ранее заданная конфигурация будет сброшена при вызове config_from_object(). Если вы хотите установить дополнительную конфигурацию, вы должны сделать это после.

Пример 1: Использование имени модуля

Метод app.config_from_object() может принимать полное имя модуля Python или даже имя атрибута Python, например: "celeryconfig", "myproj.config.celery" или "myproj.config:CeleryConfig":

from celery import Celery

app = Celery()
app.config_from_object('celeryconfig')

Модуль celeryconfig может выглядеть следующим образом:

celeryconfig.py:

enable_utc = True
timezone = 'Europe/London'

и приложение сможет использовать его до тех пор, пока import celeryconfig возможно.

Пример 2: Передача фактического объекта модуля

Вы также можете передать уже импортированный объект модуля, но это не всегда рекомендуется.

Совет

Рекомендуется использовать имя модуля, так как это означает, что модуль не нужно сериализовать при использовании пула prefork. Если у вас возникли проблемы с конфигурацией или ошибки pickle, попробуйте использовать имя модуля.

import celeryconfig

from celery import Celery

app = Celery()
app.config_from_object(celeryconfig)

Пример 3: Использование класса/объекта конфигурации

from celery import Celery

app = Celery()

class Config:
    enable_utc = True
    timezone = 'Europe/London'

app.config_from_object(Config)
# or using the fully qualified name of the object:
#   app.config_from_object('module:Config')

config_from_envvar

app.config_from_envvar() берет имя модуля конфигурации из переменной окружения

Например – для загрузки конфигурации из модуля, указанного в переменной окружения с именем CELERY_CONFIG_MODULE:

import os
from celery import Celery

#: Set default configuration module name
os.environ.setdefault('CELERY_CONFIG_MODULE', 'celeryconfig')

app = Celery()
app.config_from_envvar('CELERY_CONFIG_MODULE')

Затем вы можете указать модуль конфигурации для использования через среду:

$ CELERY_CONFIG_MODULE="celeryconfig.prod" celery worker -l INFO

Конфигурация с цензурой

Если вы когда-нибудь захотите распечатать конфигурацию в качестве отладочной информации или чего-то подобного, вы также можете захотеть отфильтровать конфиденциальную информацию, такую как пароли и ключи API.

Celery поставляется с несколькими утилитами, полезными для представления конфигурации, одна из них - humanize():

>>> app.conf.humanize(with_defaults=False, censored=True)

Этот метод возвращает конфигурацию в виде табулированной строки. По умолчанию она будет содержать только изменения конфигурации, но вы можете включить встроенные ключи и значения по умолчанию, включив аргумент with_defaults.

Если вместо этого вы хотите работать с конфигурацией как со словарем, вы можете использовать метод table():

>>> app.conf.table(with_defaults=False, censored=True)

Обратите внимание, что Celery не сможет удалить всю конфиденциальную информацию, поскольку он просто использует регулярное выражение для поиска ключей с общим названием. Если вы добавляете пользовательские настройки, содержащие конфиденциальную информацию, вам следует назвать ключи именами, которые Celery идентифицирует как секретные.

Настройка конфигурации будет подвергнута цензуре, если имя содержит любую из этих подстрок:

API, TOKEN, KEY, SECRET, PASS, SIGNATURE, DATABASE

Лень

Экземпляр приложения является «ленивым», то есть он не будет оцениваться до тех пор, пока он действительно не понадобится.

Создание экземпляра Celery приведет только к следующему:

  1. Создайте логический экземпляр часов, используемый для событий.

  2. Создайте реестр задач.

  3. Установить себя в качестве текущего приложения (но не в том случае, если аргумент set_as_current был отключен)

  4. Вызовите обратный вызов app.on_init() (по умолчанию ничего не делает).

Декораторы app.task() не создают задачи в момент определения задачи, вместо этого они откладывают создание задачи либо на момент использования задачи, либо после финализации приложения,

Этот пример показывает, что задача не создается до тех пор, пока вы не используете задачу или не получите доступ к атрибуту (в данном случае repr()):

>>> @app.task
>>> def add(x, y):
...    return x + y

>>> type(add)
<class 'celery.local.PromiseProxy'>

>>> add.__evaluated__()
False

>>> add        # <-- causes repr(add) to happen
<@task: __main__.add>

>>> add.__evaluated__()
True

Финализация приложения происходит либо явно путем вызова app.finalize() – либо неявно путем обращения к атрибуту app.tasks.

Доработка объекта будет:

  1. Копирование задач, которые должны быть разделены между приложениями

    По умолчанию задачи являются общими, но если аргумент shared в декораторе задачи отключен, то задача будет приватной для приложения, к которому она привязана.

  2. Оцените все находящиеся на рассмотрении декораторы задач.

  3. Убедитесь, что все задачи привязаны к текущему приложению.

    Задачи привязываются к приложению, чтобы они могли считывать значения по умолчанию из конфигурации.

Разрыв цепи

Хотя можно зависеть от установки текущего приложения, лучшей практикой является постоянная передача экземпляра приложения всему, что в нем нуждается.

Я называю это «цепочкой приложений», поскольку она создает цепочку экземпляров в зависимости от передаваемого приложения.

Следующий пример считается плохой практикой:

from celery import current_app

class Scheduler:

    def run(self):
        app = current_app

Вместо этого он должен принимать в качестве аргумента app:

class Scheduler:

    def __init__(self, app):
        self.app = app

Внутри Celery использует функцию celery.app.app_or_default(), чтобы все работало и в API совместимости на основе модулей

from celery.app import app_or_default

class Scheduler:
    def __init__(self, app=None):
        self.app = app_or_default(app)

В процессе разработки вы можете установить переменную окружения CELERY_TRACE_APP, чтобы вызвать исключение при разрыве цепочки приложений:

$ CELERY_TRACE_APP=1 celery worker -l INFO

Абстрактные задачи

Все задачи, созданные с помощью декоратора task(), будут наследоваться от базового класса приложения Task.

Вы можете указать другой базовый класс, используя аргумент base:

@app.task(base=OtherTask):
def add(x, y):
    return x + y

Для создания пользовательского класса задачи необходимо наследоваться от нейтрального базового класса: celery.Task.

from celery import Task

class DebugTask(Task):

    def __call__(self, *args, **kwargs):
        print('TASK STARTING: {0.name}[{0.request.id}]'.format(self))
        return self.run(*args, **kwargs)

Совет

Если вы переопределите метод __call__ задачи, то очень важно, чтобы вы также вызвали self.run для выполнения тела задачи. Не вызывайте super().__call__. Метод __call__ нейтрального базового класса celery.Task присутствует только для справки. Для оптимизации он был развернут в celery.app.trace.build_tracer.trace_task, который вызывает run непосредственно на пользовательском классе задачи, если не определен метод __call__.

Нейтральный базовый класс является особенным, поскольку он еще не привязан к какому-либо конкретному приложению. Когда задача будет привязана к приложению, она будет считывать конфигурацию, чтобы установить значения по умолчанию, и так далее.

Для реализации базового класса необходимо создать задачу, используя декоратор app.task():

@app.task(base=DebugTask)
def add(x, y):
    return x + y

Можно даже изменить базовый класс по умолчанию для приложения, изменив его атрибут app.Task():

>>> from celery import Celery, Task

>>> app = Celery()

>>> class MyBaseTask(Task):
...    queue = 'hipri'

>>> app.Task = MyBaseTask
>>> app.Task
<unbound MyBaseTask>

>>> @app.task
... def add(x, y):
...     return x + y

>>> add
<@task: __main__.add>

>>> add.__class__.mro()
[<class add of <Celery __main__:0x1012b4410>>,
 <unbound MyBaseTask>,
 <unbound Task>,
 <type 'object'>]
Вернуться на верх