Задачи

Задачи - это строительные блоки приложений Celery.

Задача - это класс, который может быть создан из любого вызываемого объекта. Он выполняет двойную роль, определяя, что происходит, когда задача вызывается (посылает сообщение), и что происходит, когда рабочий получает это сообщение.

Каждый класс задач имеет уникальное имя, и на это имя ссылаются в сообщениях, чтобы рабочий мог найти нужную функцию для выполнения.

Сообщение задачи не удаляется из очереди, пока это сообщение не будет acknowledged рабочим. Рабочий может резервировать много сообщений заранее, и даже если рабочий будет убит - из-за сбоя питания или по другой причине - сообщение будет повторно доставлено другому рабочему.

В идеале функции задач должны быть idempotent: это означает, что функция не вызовет непредвиденных эффектов даже при многократном вызове с одними и теми же аргументами. Поскольку рабочий не может определить, являются ли ваши задачи идемпотентными, поведение по умолчанию заключается в подтверждении сообщения заранее, непосредственно перед его выполнением, так что вызов задачи, которая уже началась, никогда не будет выполнен снова.

Если ваша задача идемпотентна, вы можете установить опцию acks_late, чтобы рабочий подтвердил сообщение после возвращения задачи. См. также статью FAQ Следует ли мне использовать retry или acks_late?.

Обратите внимание, что рабочий подтвердит сообщение, если дочерний процесс, выполняющий задание, будет завершен (либо вызовом задания sys.exit(), либо сигналом), даже если включено acks_late. Такое поведение является преднамеренным, поскольку…

  1. Мы не хотим повторно выполнять задачи, которые заставляют ядро посылать процессу SIGSEGV (segmentation fault) или подобные сигналы.

  2. Мы предполагаем, что системный администратор, намеренно убивающий задачу, не хочет, чтобы она автоматически перезапускалась.

  3. Задача, выделяющая слишком много памяти, рискует спровоцировать OOM-киллер ядра, что может повториться.

  4. Задача, которая всегда терпит неудачу при повторной доставке, может вызвать высокочастотный цикл сообщений, выводящий систему из строя.

Если вы действительно хотите, чтобы задание повторно доставлялось в этих сценариях, вам следует включить параметр task_reject_on_worker_lost.

Предупреждение

Задача, которая блокируется на неопределенное время, может в конечном итоге помешать рабочему экземпляру выполнять любую другую работу.

Если ваша задача выполняет ввод-вывод, то убедитесь, что вы добавили таймауты к этим операциям, например, добавив таймаут к веб-запросу с помощью библиотеки requests:

connect_timeout, read_timeout = 5.0, 30.0
response = requests.get(URL, timeout=(connect_timeout, read_timeout))

Time limits удобны для обеспечения своевременного возврата всех задач, но событие ограничения времени фактически приведет к принудительному завершению процесса, поэтому используйте их только для выявления случаев, когда вы еще не использовали ручные таймауты.

Планировщик пула prefork по умолчанию не очень дружелюбен к длительно выполняющимся задачам, поэтому если у вас есть задачи, которые выполняются в течение минут/часов, убедитесь, что вы включили аргумент командной строки -Ofair в celery worker. Более подробную информацию смотрите в Пределы предварительной выборки, а для достижения наилучшей производительности направляйте длительно выполняющиеся и коротко выполняющиеся задачи на выделенных рабочих (Автоматическая маршрутизация).

Если ваш рабочий зависает, пожалуйста, выясните, какие задачи запущены, прежде чем отправлять проблему, так как, скорее всего, зависание вызвано тем, что одна или несколько задач зависли на сетевой операции.

В этой главе вы узнаете все об определении задач, а это оглавление:

Основы

Вы можете легко создать задачу из любого вызываемого объекта с помощью декоратора task():

from .models import User

@app.task
def create_user(username, password):
    User.objects.create(username=username, password=password)

Существует также множество options, которые могут быть установлены для задачи, их можно указать в качестве аргументов декоратора:

@app.task(serializer='json')
def create_user(username, password):
    User.objects.create(username=username, password=password)

Связанные задачи

Привязка задачи означает, что первым аргументом задачи всегда будет экземпляр задачи (self), как и в привязанных методах Python:

logger = get_task_logger(__name__)

@app.task(bind=True)
def add(self, x, y):
    logger.info(self.request.id)

Связанные задачи нужны для повторных попыток (с помощью app.Task.retry()), для доступа к информации о текущем запросе задачи и для любой дополнительной функциональности, которую вы добавляете в базовые классы пользовательских задач.

Наследование задач

Аргумент base декоратора задачи задает базовый класс задачи:

import celery

class MyTask(celery.Task):

    def on_failure(self, exc, task_id, args, kwargs, einfo):
        print('{0!r} failed: {1!r}'.format(task_id, exc))

@app.task(base=MyTask)
def add(x, y):
    raise KeyError()

Имена

Каждая задача должна иметь уникальное имя.

Если явное имя не указано, декоратор задач сгенерирует его для вас, и это имя будет основано на 1) модуле, в котором определена задача, и 2) имени функции задачи.

Пример установки явного имени:

>>> @app.task(name='sum-of-two-numbers')
>>> def add(x, y):
...     return x + y

>>> add.name
'sum-of-two-numbers'

Лучшей практикой является использование имени модуля в качестве пространства имен, так имена не будут сталкиваться, если задача с таким именем уже определена в другом модуле.

>>> @app.task(name='tasks.add')
>>> def add(x, y):
...     return x + y

Вы можете узнать имя задачи, исследовав ее атрибут .name:

>>> add.name
'tasks.add'

Имя, которое мы указали здесь (tasks.add), является именно тем именем, которое было бы автоматически сгенерировано для нас, если бы задача была определена в модуле с именем tasks.py:

tasks.py:

@app.task
def add(x, y):
    return x + y
>>> from tasks import add
>>> add.name
'tasks.add'

Автоматическое именование и относительный импорт

Относительный импорт и автоматическая генерация имен не очень хорошо сочетаются, поэтому если вы используете относительный импорт, вам следует задать имя явно.

Например, если клиент импортирует модуль "myapp.tasks" как ".tasks", а рабочий импортирует модуль как "myapp.tasks", сгенерированные имена не совпадут, и рабочий выдаст ошибку NotRegistered.

Это также происходит при использовании Django и использовании именования в стиле project.myapp в INSTALLED_APPS:

INSTALLED_APPS = ['project.myapp']

Если вы установите приложение под именем project.myapp, то модуль задач будет импортирован как project.myapp.tasks, поэтому вы должны убедиться, что всегда импортируете задачи, используя одно и то же имя:

>>> from project.myapp.tasks import mytask   # << GOOD

>>> from myapp.tasks import mytask    # << BAD!!!

Во втором примере задача будет называться по-разному, поскольку рабочий и клиент импортируют модули под разными именами:

>>> from project.myapp.tasks import mytask
>>> mytask.name
'project.myapp.tasks.mytask'

>>> from myapp.tasks import mytask
>>> mytask.name
'myapp.tasks.mytask'

По этой причине вы должны быть последовательны в том, как вы импортируете модули, и это также является лучшей практикой Python.

Аналогично, не следует использовать относительный импорт старого стиля:

from module import foo   # BAD!

from proj.module import foo  # GOOD!

Относительные импорты нового стиля в порядке и могут быть использованы:

from .module import foo  # GOOD!

Если вы хотите использовать Celery в проекте, уже активно использующем эти паттерны, и у вас нет времени на рефакторинг существующего кода, вы можете рассмотреть возможность явного указания имен вместо того, чтобы полагаться на автоматическое именование:

@app.task(name='proj.tasks.add')
def add(x, y):
    return x + y

Изменение поведения автоматического именования

Добавлено в версии 4.0.

Есть некоторые случаи, когда автоматическое именование по умолчанию не подходит. Рассмотрим множество задач в разных модулях:

project/
       /__init__.py
       /celery.py
       /moduleA/
               /__init__.py
               /tasks.py
       /moduleB/
               /__init__.py
               /tasks.py

При использовании автоматического именования по умолчанию каждая задача будет иметь сгенерированное имя типа moduleA.tasks.taskA, moduleA.tasks.taskB, moduleB.tasks.test и так далее. Возможно, вы захотите избавиться от присутствия tasks в именах всех задач. Как указывалось выше, вы можете явно задать имена для всех задач, или изменить поведение автоматического именования, переопределив app.gen_task_name(). Продолжая пример, celery.py может содержать:

from celery import Celery

class MyCelery(Celery):

    def gen_task_name(self, name, module):
        if module.endswith('.tasks'):
            module = module[:-6]
        return super().gen_task_name(name, module)

app = MyCelery('main')

Таким образом, каждая задача будет иметь имя типа moduleA.taskA, moduleA.taskB и moduleB.test.

Предупреждение

Убедитесь, что ваша app.gen_task_name() является чистой функцией: это означает, что для одного и того же входа она всегда должна возвращать один и тот же выход.

Запрос задания

app.Task.request содержит информацию и состояние, связанные с текущей выполняемой задачей.

Запрос определяет следующие атрибуты:

id:

Уникальный идентификатор выполняемого задания.

группа:

Уникальный идентификатор group, если эта задача является членом.

аккорд:

Уникальный идентификатор аккорда, к которому принадлежит эта задача (если задача является частью заголовка).

идентификатор корреляции:

Пользовательский идентификатор, используемый для таких вещей, как дедупликация.

args:

Позиционные аргументы.

kwargs:

Аргументы с ключевыми словами.

происхождение:

Имя хоста, отправившего это задание.

повторные попытки:

Сколько раз текущая задача была повторно запущена. Целое число, начинающееся с 0.

нетерпеливый:

Устанавливается в True, если задание выполняется локально на клиенте, а не рабочим.

eta:

Первоначальное время прибытия задания (если таковое имеется). Это время в UTC (в зависимости от настройки enable_utc).

истекает:

Первоначальное время истечения срока действия задания (если таковое имеется). Указывается в UTC (в зависимости от настройки enable_utc).

имя хоста:

Имя узла рабочего экземпляра, выполняющего задание.

информация о доставке:

Дополнительная информация о доставке сообщения. Это отображение, содержащее ключ обмена и маршрутизации, использованный для доставки этой задачи. Используется, например, app.Task.retry() для повторной отправки задания в ту же очередь назначения. Наличие ключей в этом дикте зависит от используемого брокера сообщений.

ответ на:

Имя очереди, в которую будут отправляться ответы (используется, например, в бэкенде результатов RPC).

вызванный_прямо:

Этот флаг устанавливается в true, если задание не было выполнено рабочим.

временной лимит:

Кортеж текущих (soft, hard) временных ограничений, активных для этой задачи (если таковые имеются).

обратные вызовы:

Список сигнатур, которые будут вызваны в случае успешного завершения этой задачи.

errback:

Список сигнатур, которые будут вызваны в случае неудачи этой задачи.

utc:

Устанавливается в true, если у вызывающего абонента включен UTC (enable_utc).

Добавлено в версии 3.1.

заголовки:

Сопоставление заголовков сообщений, отправленных с этим сообщением задачи (может быть None).

ответ_на:

Куда отправить ответ (имя очереди).

идентификатор корреляции:

Обычно совпадает с идентификатором задачи, часто используется в amqp для отслеживания того, для чего был получен ответ.

Добавлено в версии 4.0.

root_id:

Уникальный идентификатор первой задачи в рабочем процессе, частью которого является эта задача (если таковые имеются).

родительский_ид:

Уникальный идентификатор задачи, которая вызвала эту задачу (если таковая имеется).

цепь:

Обратный список задач, образующих цепочку (если таковые имеются). Последний элемент в этом списке будет следующей задачей, сменяющей текущую задачу. При использовании первой версии протокола задач задачи цепочки будут находиться в request.callbacks.

Пример

Примером задания для доступа к информации в контексте является:

@app.task(bind=True)
def dump_context(self, x, y):
    print('Executing task id {0.id}, args: {0.args!r} kwargs: {0.kwargs!r}'.format(
            self.request))

Аргумент bind означает, что функция будет «связанным методом», так что вы сможете получить доступ к атрибутам и методам экземпляра типа задачи.

Ведение журнала

Рабочий автоматически настроит ведение журнала для вас, или вы можете настроить ведение журнала вручную.

Существует специальный логгер с именем «celery.task», вы можете наследоваться от этого логгера, чтобы автоматически получать имя задачи и уникальный идентификатор как часть логов.

Лучшей практикой является создание общего регистратора для всех ваших задач в верхней части вашего модуля:

from celery.utils.log import get_task_logger

logger = get_task_logger(__name__)

@app.task
def add(x, y):
    logger.info('Adding {0} + {1}'.format(x, y))
    return x + y

Celery использует стандартную библиотеку Python logger, а документацию можно найти here.

Вы также можете использовать print(), поскольку все, что записывается в стандартный out/-err, будет перенаправлено в систему протоколирования (вы можете отключить это, см. worker_redirect_stdouts).

Примечание

Рабочий не будет обновлять перенаправление, если вы создадите экземпляр регистратора где-нибудь в задаче или модуле задачи.

Если вы хотите перенаправить sys.stdout и sys.stderr в пользовательский логгер, вы должны включить это вручную, например:

import sys

logger = get_task_logger(__name__)

@app.task(bind=True)
def add(self, x, y):
    old_outs = sys.stdout, sys.stderr
    rlevel = self.app.conf.worker_redirect_stdouts_level
    try:
        self.app.log.redirect_stdouts_to_logger(logger, rlevel)
        print('Adding {0} + {1}'.format(x, y))
        return x + y
    finally:
        sys.stdout, sys.stderr = old_outs

Примечание

Если определенный логгер Celery, который вам нужен, не выдает журналы, вам следует проверить, правильно ли он распространяется. В этом примере «celery.app.trace» включен, чтобы выводились журналы «succeeded in»:

import celery
import logging

@celery.signals.after_setup_logger.connect
def on_after_setup_logger(**kwargs):
    logger = logging.getLogger('celery')
    logger.propagate = True
    logger = logging.getLogger('celery.app.trace')
    logger.propagate = True

Примечание

Если вы хотите полностью отключить настройку Celery logging, используйте сигнал setup_logging:

import celery

@celery.signals.setup_logging.connect
def on_setup_logging(**kwargs):
    pass

Проверка аргументов

Добавлено в версии 4.0.

Celery проверит переданные аргументы при вызове задачи, как это делает Python при вызове обычной функции:

>>> @app.task
... def add(x, y):
...     return x + y

# Calling the task with two arguments works:
>>> add.delay(8, 8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>

# Calling the task with only one argument fails:
>>> add.delay(8)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "celery/app/task.py", line 376, in delay
    return self.apply_async(args, kwargs)
  File "celery/app/task.py", line 485, in apply_async
    check_arguments(*(args or ()), **(kwargs or {}))
TypeError: add() takes exactly 2 arguments (1 given)

Вы можете отключить проверку аргументов для любой задачи, установив ее атрибут typing в значение False:

>>> @app.task(typing=False)
... def add(x, y):
...     return x + y

# Works locally, but the worker receiving the task will raise an error.
>>> add.delay(8)
<AsyncResult: f59d71ca-1549-43e0-be41-4e8821a83c0c>

Сокрытие конфиденциальной информации в аргументах

Добавлено в версии 4.0.

При использовании task_protocol 2 и выше (по умолчанию с версии 4.0) вы можете переопределить способ представления позиционных аргументов и ключевых слов в журналах и событиях мониторинга с помощью вызывающих аргументов argsrepr и << 2 >>>:

>>> add.apply_async((2, 3), argsrepr='(<secret-x>, <secret-y>)')

>>> charge.s(account, card='1234 5678 1234 5678').set(
...     kwargsrepr=repr({'card': '**** **** **** 5678'})
... ).delay()

Предупреждение

Чувствительная информация все равно будет доступна любому, кто сможет прочитать ваше сообщение с заданием от брокера или иным образом перехватить его.

По этой причине вам, вероятно, следует зашифровать ваше сообщение, если оно содержит конфиденциальную информацию, или в данном примере с номером кредитной карты фактический номер может храниться в зашифрованном виде в безопасном хранилище, которое вы получаете и расшифровываете в самой задаче.

Повторная попытка

app.Task.retry() может использоваться для повторного выполнения задания, например, в случае восстанавливаемых ошибок.

Когда вы вызовете retry, он отправит новое сообщение, используя тот же идентификатор задачи, и позаботится о том, чтобы сообщение было доставлено в ту же очередь, что и исходная задача.

При повторной попытке выполнения задачи это также записывается как состояние задачи, так что вы можете отслеживать ход выполнения задачи с помощью экземпляра результата (см. Государства).

Вот пример с использованием retry:

@app.task(bind=True)
def send_twitter_status(self, oauth, tweet):
    try:
        twitter = Twitter(oauth)
        twitter.update_status(tweet)
    except (Twitter.FailWhaleError, Twitter.LoginError) as exc:
        raise self.retry(exc=exc)

Примечание

Вызов app.Task.retry() вызовет исключение, поэтому любой код после повторной попытки не будет достигнут. Это исключение Retry, оно обрабатывается не как ошибка, а как полупредикат, сигнализирующий рабочему, что задание должно быть повторено, чтобы он мог сохранить правильное состояние при включении бэкенда результата.

Это нормальная операция и происходит всегда, если только аргумент throw для повторной попытки не установлен на False.

Аргумент bind декоратора задачи предоставляет доступ к self (экземпляр типа задачи).

Аргумент exc используется для передачи информации об исключении, которая используется в журналах и при хранении результатов задачи. И исключение, и обратная трассировка будут доступны в состоянии задачи (если включен бэкенд результатов).

Если задача имеет значение max_retries, текущее исключение будет поднято повторно, если превышено максимальное количество повторных попыток, но этого не произойдет, если:

  • Не был указан аргумент exc.

    В этом случае будет вызвано исключение MaxRetriesExceededError.

  • В настоящее время нет исключений

    Если нет исходного исключения для повторного повышения, то вместо него будет использован аргумент exc, таким образом:

    self.retry(exc=Twitter.LoginError())
    

    поднимет заданный аргумент exc.

Использование пользовательской задержки повторной попытки

Когда задача должна быть повторно запущена, она может подождать определенное количество времени, и задержка по умолчанию определяется атрибутом default_retry_delay. По умолчанию она установлена на 3 минуты. Обратите внимание, что единица измерения задержки - секунды (int или float).

Вы также можете указать аргумент countdown в retry(), чтобы отменить это значение по умолчанию.

@app.task(bind=True, default_retry_delay=30 * 60)  # retry in 30 minutes.
def add(self, x, y):
    try:
        something_raising()
    except Exception as exc:
        # overrides the default delay to retry after 1 minute
        raise self.retry(exc=exc, countdown=60)

Автоматические повторные попытки для известных исключений

Добавлено в версии 4.0.

Иногда вы просто хотите повторить выполнение задачи при возникновении определенного исключения.

К счастью, вы можете указать Celery на автоматическое повторение задачи, используя аргумент autoretry_for в декораторе task():

from twitter.exceptions import FailWhaleError

@app.task(autoretry_for=(FailWhaleError,))
def refresh_timeline(user):
    return twitter.refresh_timeline(user)

Если вы хотите указать пользовательские аргументы для внутреннего вызова retry(), передайте декоратору task() аргумент retry_kwargs:

@app.task(autoretry_for=(FailWhaleError,),
          retry_kwargs={'max_retries': 5})
def refresh_timeline(user):
    return twitter.refresh_timeline(user)

Это предоставляется как альтернатива ручной обработке исключений, и в приведенном выше примере будет сделано то же самое, что и при обертывании тела задачи в tryexcept:

@app.task
def refresh_timeline(user):
    try:
        twitter.refresh_timeline(user)
    except FailWhaleError as exc:
        raise div.retry(exc=exc, max_retries=5)

Если вы хотите автоматически повторять попытку при любой ошибке, просто используйте:

@app.task(autoretry_for=(Exception,))
def x():
    ...

Добавлено в версии 4.2.

Если ваши задачи зависят от другого сервиса, например, от запроса к API, то целесообразно использовать exponential backoff, чтобы не перегружать сервис своими запросами. К счастью, поддержка Celery автоматического повтора позволяет сделать это легко. Просто укажите аргумент retry_backoff, например, так:

from requests.exceptions import RequestException

@app.task(autoretry_for=(RequestException,), retry_backoff=True)
def x():
    ...

По умолчанию экспоненциальный откат также вводит случайный джиттер, чтобы избежать выполнения всех задач в один и тот же момент. Кроме того, максимальная задержка отката ограничена 10 минутами. Все эти параметры можно настроить с помощью опций, описанных ниже.

Добавлено в версии 4.4.

Вы также можете установить параметры autoretry_for, max_retries, retry_backoff, retry_backoff_max и retry_jitter в заданиях на основе классов:

class BaseTaskWithRetry(Task):
    autoretry_for = (TypeError,)
    max_retries = 5
    retry_backoff = True
    retry_backoff_max = 700
    retry_jitter = False
Task.autoretry_for

Список/кортеж классов исключений. Если во время выполнения задачи возникнет любое из этих исключений, задача будет автоматически повторена. По умолчанию ни одно исключение не будет автоматически перепроведено.

Task.max_retries

Число. Максимальное количество повторных попыток перед сдачей. Значение None означает, что задача будет повторять попытки вечно. По умолчанию этот параметр имеет значение 3.

Task.retry_backoff

Булево значение или число. Если этот параметр установлен в True, автоповторы будут отложены по правилам exponential backoff. Первая попытка будет иметь задержку в 1 секунду, вторая - в 2 секунды, третья - в 4 секунды, четвертая - в 8 секунд и так далее. (Однако это значение задержки изменяется параметром retry_jitter, если он включен.) Если этот параметр установлен в число, оно используется в качестве коэффициента задержки. Например, если эта опция установлена в 3, первая повторная попытка задержится на 3 секунды, вторая - на 6 секунд, третья - на 12 секунд, четвертая - на 24 секунды и так далее. По умолчанию эта опция установлена в False, и автоповторы не задерживаются.

Task.retry_backoff_max

Число. Если включена опция retry_backoff, то она устанавливает максимальную задержку в секундах между авторежимами задач. По умолчанию эта опция установлена на 600, что составляет 10 минут.

Task.retry_jitter

Булево значение. Jitter используется для внесения случайности в экспоненциальные задержки обратного хода, чтобы предотвратить одновременное выполнение всех задач в очереди. Если эта опция установлена в True, значение задержки, вычисленное retry_backoff, рассматривается как максимальное, а фактическое значение задержки будет случайным числом между нулем и этим максимумом. По умолчанию эта опция установлена в True.

Список опций

Декоратор задачи может принимать ряд опций, которые изменяют поведение задачи, например, вы можете установить предел скорости для задачи с помощью опции rate_limit.

Любой аргумент ключевого слова, переданный декоратору задач, будет фактически установлен как атрибут результирующего класса задач, а это список встроенных атрибутов.

Общий

Task.name

Имя, под которым зарегистрировано задание.

Вы можете задать это имя вручную, или имя будет сгенерировано автоматически, используя имя модуля и класса.

См. также Имена.

Task.request

Если задача выполняется, то здесь будет содержаться информация о текущем запросе. Используется локальное хранилище Thread.

См. Запрос задания.

Task.max_retries

Применяется, только если задача вызывает self.retry или если задача украшена аргументом autoretry_for.

Максимальное количество повторных попыток перед тем, как сдаться. Если число повторных попыток превышает это значение, будет вызвано исключение MaxRetriesExceededError.

Примечание

Вы должны вызвать retry() вручную, так как он не будет автоматически повторять попытку при исключении…

По умолчанию установлено значение 3. Значение None отключает лимит повторных попыток, и задача будет повторять попытки до тех пор, пока не добьется успеха.

Task.throws

Необязательный кортеж ожидаемых классов ошибок, которые не должны рассматриваться как фактическая ошибка.

Ошибки из этого списка будут сообщены бэкенду результатов как сбой, но рабочий не будет регистрировать событие как ошибку, и трассировка не будет включена.

Пример:

@task(throws=(KeyError, HttpNotFound)):
def get_foo():
    something()

Типы ошибок:

  • Ожидаемые ошибки (в Task.throws)

    Зарегистрирован с серьезностью INFO, отслеживание исключено.

  • Неожиданные ошибки

    Зафиксировано с серьезностью ERROR, с включенным отслеживанием.

Task.default_retry_delay

Время по умолчанию в секундах до повторного выполнения задания. Может быть либо int, либо float. По умолчанию это трехминутная задержка.

Task.rate_limit

Установите лимит скорости для данного типа задач (ограничивает количество задач, которые могут быть запущены за определенный промежуток времени). Задачи все равно будут завершаться, когда действует ограничение скорости, но может пройти некоторое время, прежде чем будет разрешен их запуск.

Если это значение None, ограничение скорости не действует. Если это целое число или float, то оно интерпретируется как «задач в секунду».

Ограничения скорости могут быть указаны в секундах, минутах или часах путем добавления к значению «/s», «/m» или «/h». Задания будут равномерно распределены по указанному временному интервалу.

Пример: «100/m» (сто задач в минуту). Это обеспечит минимальную задержку в 600 мс между запуском двух задач на одном и том же экземпляре рабочего.

По умолчанию установлено значение task_default_rate_limit: если не указано, ограничение скорости для задач по умолчанию отключено.

Обратите внимание, что это ограничение скорости для каждого рабочего экземпляра, а не глобальное ограничение скорости. Чтобы обеспечить глобальное ограничение скорости (например, для API с максимальным количеством запросов в секунду), вы должны ограничиться определенной очередью.

Task.time_limit

Жесткий лимит времени, в секундах, для этого задания. Если не задано, используется значение по умолчанию для рабочих.

Task.soft_time_limit

Мягкий лимит времени для этого задания. Если не задано, используется значение по умолчанию.

Task.ignore_result

Не храните состояние задачи. Обратите внимание, что это означает, что вы не можете использовать AsyncResult для проверки готовности задачи или получения ее возвращаемого значения.

Task.store_errors_even_if_ignored

Если True, ошибки будут сохранены, даже если задача настроена на игнорирование результатов.

Task.serializer

Строка, определяющая используемый по умолчанию метод сериализации. По умолчанию используется значение task_serializer. Может быть pickle, json, yaml или любой пользовательский метод сериализации, который был зарегистрирован в kombu.serialization.registry.

Для получения дополнительной информации см. раздел Сериализаторы.

Task.compression

Строка, определяющая используемую по умолчанию схему сжатия.

По умолчанию используется значение task_compression. Может быть gzip, или bzip2, или любая пользовательская схема сжатия, зарегистрированная в реестре kombu.compression.

Для получения дополнительной информации см. раздел Компрессия.

Task.backend

Бэкенд хранилища результатов, который будет использоваться для этой задачи. Экземпляр одного из классов бэкендов в celery.backends. По умолчанию используется app.backend, определяемый параметром result_backend.

Task.acks_late

Если установлено значение True сообщения для этой задачи будут подтверждены после выполнения задачи, а не только перед (поведение по умолчанию).

Примечание: Это означает, что задание может быть выполнено несколько раз, если рабочий сломается в середине выполнения. Убедитесь, что ваши задачи имеют значение idempotent.

Глобальное значение по умолчанию может быть отменено параметром task_acks_late.

Task.track_started

Если True, задача будет сообщать о своем статусе как «запущена», когда задача выполняется рабочим. Значение по умолчанию False, так как обычное поведение заключается в том, чтобы не сообщать о таком уровне детализации. Задачи либо ожидают выполнения, либо завершены, либо ожидают повторной попытки. Наличие статуса «начато» может быть полезно, когда есть долго выполняющиеся задачи и есть необходимость сообщить, какая задача выполняется в данный момент.

Имя хоста и идентификатор процесса рабочего, выполняющего задание, будут доступны в метаданных состояния (например, result.info[„pid“])

Глобальное значение по умолчанию может быть отменено параметром task_track_started.

См.также

Ссылка на API для Task.

Государства

Celery может отслеживать текущее состояние задачи. Состояние также содержит результат успешного выполнения задачи или исключение и информацию об отслеживание неудачной задачи.

Существует несколько результативных бэкендов на выбор, и все они имеют различные сильные и слабые стороны (см. Бэкенды результатов).

В течение своей жизни задача будет переходить через несколько возможных состояний, и каждое состояние может иметь произвольные метаданные, прикрепленные к нему. Когда задача переходит в новое состояние, предыдущее состояние забывается, но некоторые переходы могут быть выведены (например, задача, находящаяся сейчас в состоянии FAILED, подразумевается, что в какой-то момент она была в состоянии STARTED).

Существуют также наборы состояний, например, набор FAILURE_STATES, и набор READY_STATES.

Клиент использует принадлежность этих наборов, чтобы решить, следует ли повторно поднять исключение (PROPAGATE_STATES), или можно ли кэшировать состояние (можно, если задача готова).

Вы также можете определить Нестандартные состояния.

Бэкенды результатов

Если вы хотите отслеживать задачи или вам нужны возвращаемые значения, то Celery должен хранить или отправлять состояния куда-то, чтобы их можно было получить позже. Есть несколько встроенных бэкендов результатов на выбор: SQLAlchemy/Django ORM, Memcached, RabbitMQ/QPid (rpc) и Redis - или вы можете определить свой собственный.

Ни один бэкенд не подходит для каждого случая использования. Вам следует ознакомиться с сильными и слабыми сторонами каждого бэкенда и выбрать наиболее подходящий для ваших нужд.

Предупреждение

Бэкенды используют ресурсы для хранения и передачи результатов. Чтобы гарантировать освобождение ресурсов, вы должны в конечном итоге вызвать get() или forget() на КАЖДОМ экземпляре AsyncResult, возвращенном после вызова задачи.

RPC Result Backend (RabbitMQ/QPid)

Бэкэнд результатов RPC (rpc://) является особенным, поскольку он не хранит состояния, а отправляет их в виде сообщений. Это важное отличие, поскольку оно означает, что результат может быть получен только один раз, и только клиентом, который инициировал задачу. Два разных процесса не могут ждать одного и того же результата.

Даже с учетом этого ограничения, это отличный выбор, если вам нужно получать изменения состояния в режиме реального времени. Использование обмена сообщениями означает, что клиенту не нужно опрашивать о новых состояниях.

По умолчанию сообщения являются переходными (непостоянными), поэтому результаты исчезнут, если брокер перезапустится. Вы можете настроить бэкенд результатов на отправку постоянных сообщений с помощью параметра result_persistent.

Бэкенд результатов базы данных

Хранение состояния в базе данных может быть удобным для многих, особенно для веб-приложений с уже существующей базой данных, но оно также имеет свои ограничения.

  • Опрос базы данных на предмет новых состояний стоит дорого, поэтому следует увеличить интервалы опроса операций, таких как result.get().

  • Некоторые базы данных используют уровень изоляции транзакций по умолчанию, который не подходит для опроса таблиц на предмет изменений.

    В MySQL уровень изоляции транзакций по умолчанию - REPEATABLE-READ: это означает, что транзакция не увидит изменений, сделанных другими транзакциями, пока текущая транзакция не будет зафиксирована.

    Рекомендуется изменить его на уровень изоляции READ-COMMITTED.

Встроенные государства

ПОДРОБНЕЕ

Задача ожидает выполнения или неизвестна. Любой неизвестный идентификатор задачи считается находящимся в состоянии ожидания.

НАЧАЛО

Задание было запущено. По умолчанию не сообщается, для включения смотрите app.Task.track_started.

мета-данные:

pid и hostname рабочего процесса, выполняющего задание.

УСПЕХ

Задание успешно выполнено.

мета-данные:

result содержит возвращаемое значение задачи.

распространяет:

Да

готово:

Да

НЕИСПРАВНОСТЬ

Выполнение задания завершилось неудачей.

мета-данные:

result содержит возникшее исключение, а traceback содержит обратную трассировку стека в момент, когда возникло исключение.

распространяет:

Да

RETRY

Задание повторно выполняется.

мета-данные:

result содержит исключение, вызвавшее повторную попытку, а traceback содержит обратную трассировку стека в момент, когда было вызвано исключение.

распространяет:

Нет

ОТМЕНЕН

Задание было отменено.

распространяет:

Да

Нестандартные состояния

Вы можете легко определить свои собственные состояния, все, что вам нужно - это уникальное имя. Имя состояния обычно представляет собой строку в верхнем регистре. В качестве примера вы можете посмотреть на abortable tasks, которое определяет пользовательское состояние ABORTED.

Используйте update_state() для обновления состояния задачи:.

@app.task(bind=True)
def upload_files(self, filenames):
    for i, file in enumerate(filenames):
        if not self.request.called_directly:
            self.update_state(state='PROGRESS',
                meta={'current': i, 'total': len(filenames)})

Здесь я создал состояние «PROGRESS», сообщающее любому приложению, знающему об этом состоянии, что задача в данный момент находится в процессе выполнения, а также о том, на каком этапе процесса она находится, благодаря наличию подсчетов current и total как части мета-данных состояния. Это может быть использовано, например, для создания индикаторов выполнения.

Создание выбираемых исключений

Редко известный факт Python заключается в том, что исключения должны соответствовать некоторым простым правилам, чтобы их можно было сериализовать с помощью модуля pickle.

Задачи, вызывающие исключения, которые не являются pickleable, не будут работать должным образом, когда Pickle используется в качестве сериализатора.

Чтобы убедиться в том, что ваши исключения могут быть pickleable, исключение * ДОЛЖНО* предоставлять исходные аргументы, с которыми оно было создано, в своем атрибуте .args. Самый простой способ обеспечить это - сделать так, чтобы исключение вызывало Exception.__init__.

Давайте рассмотрим несколько примеров, которые работают, и один, который не работает:

# OK:
class HttpError(Exception):
    pass

# BAD:
class HttpError(Exception):

    def __init__(self, status_code):
        self.status_code = status_code

# OK:
class HttpError(Exception):

    def __init__(self, status_code):
        self.status_code = status_code
        Exception.__init__(self, status_code)  # <-- REQUIRED

Поэтому правило таково: Для любого исключения, поддерживающего пользовательские аргументы *args, необходимо использовать Exception.__init__(self, *args).

Специальной поддержки для ключевых аргументов не существует, поэтому если вы хотите сохранить ключевые аргументы при распаковке исключения, вы должны передавать их как обычные args:

class HttpError(Exception):

    def __init__(self, status_code, headers=None, body=None):
        self.status_code = status_code
        self.headers = headers
        self.body = body

        super(HttpError, self).__init__(status_code, headers, body)

Полупредикаты

Рабочий обертывает задачу в функцию трассировки, которая записывает конечное состояние задачи. Существует ряд исключений, которые могут быть использованы для сигнализации этой функции об изменении того, как она обрабатывает возврат задачи.

Игнорировать

Задача может поднять Ignore, чтобы заставить рабочего игнорировать задачу. Это означает, что для задачи не будет записано состояние, но сообщение будет подтверждено (удалено из очереди).

Это можно использовать, если вы хотите реализовать пользовательскую функциональность, подобную отзыву, или вручную сохранить результат задачи.

Пример хранения отозванных задач в наборе Redis:

from celery.exceptions import Ignore

@app.task(bind=True)
def some_task(self):
    if redis.ismember('tasks.revoked', self.request.id):
        raise Ignore()

Пример, в котором результаты сохраняются вручную:

from celery import states
from celery.exceptions import Ignore

@app.task(bind=True)
def get_tweets(self, user):
    timeline = twitter.get_timeline(user)
    if not self.request.called_directly:
        self.update_state(state=states.SUCCESS, meta=timeline)
    raise Ignore()

Отклонить

Задача может поднять Reject, чтобы отклонить сообщение задачи, используя метод AMQPs basic_reject. Это не будет иметь никакого эффекта, если не включено Task.acks_late.

Отклонение сообщения имеет тот же эффект, что и подтверждение, но некоторые брокеры могут реализовать дополнительную функциональность, которую можно использовать. Например, RabbitMQ поддерживает концепцию Dead Letter Exchanges, где очередь может быть настроена на использование обмена мертвыми буквами, на которые повторно доставляются отклоненные сообщения.

Reject также может быть использован для повторной очереди сообщений, но, пожалуйста, будьте очень осторожны при его использовании, так как это может легко привести к бесконечному циклу сообщений.

Пример использования reject, когда задача вызывает состояние out of memory:

import errno
from celery.exceptions import Reject

@app.task(bind=True, acks_late=True)
def render_scene(self, path):
    file = get_file(path)
    try:
        renderer.render_scene(file)

    # if the file is too big to fit in memory
    # we reject it so that it's redelivered to the dead letter exchange
    # and we can manually inspect the situation.
    except MemoryError as exc:
        raise Reject(exc, requeue=False)
    except OSError as exc:
        if exc.errno == errno.ENOMEM:
            raise Reject(exc, requeue=False)

    # For any other error we retry after 10 seconds.
    except Exception as exc:
        raise self.retry(exc, countdown=10)

Пример повторной регистрации сообщения:

from celery.exceptions import Reject

@app.task(bind=True, acks_late=True)
def requeues(self):
    if not self.request.delivery_info['redelivered']:
        raise Reject('no reason', requeue=True)
    print('received two times')

Для получения более подробной информации о методе basic_reject обратитесь к документации вашего брокера.

Повторная попытка

Исключение Retry поднимается методом Task.retry, чтобы сообщить работнику о повторной попытке выполнения задания.

Пользовательские классы задач

Все задачи наследуются от класса app.Task. Метод run() становится телом задачи.

В качестве примера можно привести следующий код,

@app.task
def add(x, y):
    return x + y

будет делать примерно то же самое за кулисами:

class _AddTask(app.Task):

    def run(self, x, y):
        return x + y
add = app.tasks[_AddTask.name]

Instantiation

Задача не создается для каждого запроса, но регистрируется в реестре задач как глобальный экземпляр.

Это означает, что конструктор __init__ будет вызываться только один раз для каждого процесса, и что класс задачи семантически ближе к Actor.

Если у вас есть задача,

from celery import Task

class NaiveAuthenticateServer(Task):

    def __init__(self):
        self.users = {'george': 'password'}

    def run(self, username, password):
        try:
            return self.users[username] == password
        except KeyError:
            return False

Если вы направляете каждый запрос в один и тот же процесс, то он будет сохранять состояние между запросами.

Это также может быть полезно для кэширования ресурсов, например, базовый класс Task, который кэширует соединение с базой данных:

from celery import Task

class DatabaseTask(Task):
    _db = None

    @property
    def db(self):
        if self._db is None:
            self._db = Database.connect()
        return self._db

Использование для каждой задачи

Вышеперечисленное можно добавить к каждой задаче следующим образом:

@app.task(base=DatabaseTask)
def process_rows():
    for row in process_rows.db.table.all():
        process_row(row)

Тогда атрибут db задачи process_rows будет всегда оставаться одним и тем же в каждом процессе.

Использование в масштабах всего приложения

Вы также можете использовать ваш пользовательский класс во всем приложении Celery, передав его в качестве аргумента task_cls при инстанцировании приложения. Этот аргумент должен быть либо строкой, указывающей путь в python к вашему классу Task, либо самим классом:

from celery import Celery

app = Celery('tasks', task_cls='your.module.path:DatabaseTask')

В результате все ваши задачи, объявленные с использованием синтаксиса декоратора в вашем приложении, будут использовать ваш класс DatabaseTask и будут иметь атрибут << 1 >>>.

Значение по умолчанию - класс, предоставляемый Celery: 'celery.app.task:Task'.

Обработчики

after_return(self, status, retval, task_id, args, kwargs, einfo)

Обработчик, вызываемый после возврата задачи.

Parameters:
  • status – Текущее состояние задачи.

  • retval – Возвращаемое значение/исключение задачи.

  • task_id – Уникальный идентификатор задания.

  • args – Исходные аргументы для задачи, которая вернулась.

  • kwargs – Исходные аргументы ключевых слов для задачи, которая вернулась.

Именованные аргументы:

einfoExceptionInfo экземпляр, содержащий обратную трассировку (если таковая имеется).

Возвращаемое значение этого обработчика игнорируется.

on_failure(self, exc, task_id, args, kwargs, einfo)

Он запускается рабочим, когда задание не выполнено.

Parameters:
  • exc – Исключение, вызванное задачей.

  • task_id – Уникальный идентификатор неудачного задания.

  • args – Исходные аргументы для задания, которое не удалось выполнить.

  • kwargs – Исходные аргументы ключевых слов для задания, которое не удалось выполнить.

Именованные аргументы:

einfoExceptionInfo экземпляр, содержащий отслеживание.

Возвращаемое значение этого обработчика игнорируется.

on_retry(self, exc, task_id, args, kwargs, einfo)

Он запускается рабочим, когда задание должно быть повторно выполнено.

Parameters:
  • exc – Исключение, отправленное в retry().

  • task_id – Уникальный идентификатор повторного задания.

  • args – Исходные аргументы для повторно выполняемого задания.

  • kwargs – Исходные аргументы ключевых слов для повторно выполняемого задания.

Именованные аргументы:

einfoExceptionInfo экземпляр, содержащий отслеживание.

Возвращаемое значение этого обработчика игнорируется.

on_success(self, retval, task_id, args, kwargs)

Выполняется рабочим, если задание выполнено успешно.

Parameters:
  • retval – Возвращаемое значение задачи.

  • task_id – Уникальный идентификатор выполненного задания.

  • args – Исходные аргументы для выполняемого задания.

  • kwargs – Исходные аргументы ключевых слов для выполняемого задания.

Возвращаемое значение этого обработчика игнорируется.

Запросы и пользовательские запросы

Получив сообщение о необходимости выполнения задания, рабочий создает запрос для представления такого требования.

Пользовательские классы задач могут переопределить, какой класс запроса использовать, изменив атрибут celery.app.task.Task.Task.Request. Вы можете назначить либо сам класс пользовательского запроса, либо его полное имя.

У запроса есть несколько обязанностей. Пользовательские классы запросов должны покрывать их все - они отвечают за фактический запуск и отслеживание задания. Мы настоятельно рекомендуем наследоваться от celery.worker.request.Request.

При использовании pre-forking worker, методы on_timeout() и on_failure() выполняются в основном процессе worker. Приложение может использовать это средство для обнаружения сбоев, которые не были обнаружены с помощью celery.app.task.Task.on_failure().

В качестве примера, следующий пользовательский запрос обнаруживает и регистрирует жесткие временные ограничения и другие сбои.

import logging
from celery import Task
from celery.worker.request import Request

logger = logging.getLogger('my.package')

class MyRequest(Request):
    'A minimal custom request to log failures and hard time limits.'

    def on_timeout(self, soft, timeout):
        super(MyRequest, self).on_timeout(soft, timeout)
        if not soft:
           logger.warning(
               'A hard timeout was enforced for task %s',
               self.task.name
           )

    def on_failure(self, exc_info, send_failed_event=True, return_ok=False):
        super().on_failure(
            exc_info,
            send_failed_event=send_failed_event,
            return_ok=return_ok
        )
        logger.warning(
            'Failure detected for task %s',
            self.task.name
        )

class MyTask(Task):
    Request = MyRequest  # you can use a FQN 'my.package:MyRequest'

@app.task(base=MyTask)
def some_longrunning_task():
    # use your imagination

Как это работает

А вот и технические подробности. Эта часть не является тем, что вам нужно знать, но вам может быть интересно.

Все определенные задачи перечислены в реестре. Реестр содержит список имен задач и их классов. Вы можете самостоятельно исследовать этот реестр:

>>> from proj.celery import app
>>> app.tasks
{'celery.chord_unlock':
    <@task: celery.chord_unlock>,
 'celery.backend_cleanup':
    <@task: celery.backend_cleanup>,
 'celery.chord':
    <@task: celery.chord>}

Это список задач, встроенных в Celery. Обратите внимание, что задачи будут зарегистрированы только при импорте модуля, в котором они определены.

Загрузчик по умолчанию импортирует все модули, перечисленные в параметре imports.

Декоратор app.task() отвечает за регистрацию вашей задачи в реестре задач приложений.

При отправке заданий фактический код функции не передается, только имя задания для выполнения. Когда рабочий получает сообщение, он может посмотреть имя в своем реестре задач, чтобы найти код выполнения.

Это означает, что ваши работники должны всегда обновлять то же программное обеспечение, что и клиент. Это недостаток, но альтернатива - техническая проблема, которую еще предстоит решить.

Советы и лучшие практики

Игнорируйте результаты, которые вам не нужны

Если вам не важны результаты выполнения задачи, обязательно установите опцию ignore_result, так как хранение результатов тратит время и ресурсы.

@app.task(ignore_result=True)
def mytask():
    something()

Результаты могут быть даже отключены глобально с помощью параметра task_ignore_result.

Результаты могут быть включены/выключены на основе каждого выполнения, передавая булевый параметр ignore_result при вызове apply_async или delay.

@app.task
def mytask(x, y):
    return x + y

# No result will be stored
result = mytask.apply_async(1, 2, ignore_result=True)
print result.get() # -> None

# Result will be stored
result = mytask.apply_async(1, 2, ignore_result=False)
print result.get() # -> 3

По умолчанию задачи будут не игнорировать результаты (ignore_result=False), если настроен бэкенд результатов.

Порядок приоритета опций следующий:

  1. Глобальный task_ignore_result

  2. ignore_result опция

  3. Вариант выполнения задания ignore_result

Другие советы по оптимизации

Дополнительные советы по оптимизации вы найдете в разделе Optimizing Guide.

Избегайте запуска синхронных подзадач

Задача, ожидающая результата другой задачи, действительно неэффективна и может даже вызвать тупик, если пул рабочих исчерпан.

Вместо этого сделайте свою разработку асинхронной, например, используя обратные вызовы.

Плохо:

@app.task
def update_page_info(url):
    page = fetch_page.delay(url).get()
    info = parse_page.delay(url, page).get()
    store_page_info.delay(url, info)

@app.task
def fetch_page(url):
    return myhttplib.get(url)

@app.task
def parse_page(page):
    return myparser.parse_document(page)

@app.task
def store_page_info(url, info):
    return PageInfo.objects.create(url, info)

Хорошо:

def update_page_info(url):
    # fetch_page -> parse_page -> store_page
    chain = fetch_page.s(url) | parse_page.s() | store_page_info.s(url)
    chain()

@app.task()
def fetch_page(url):
    return myhttplib.get(url)

@app.task()
def parse_page(page):
    return myparser.parse_document(page)

@app.task(ignore_result=True)
def store_page_info(info, url):
    PageInfo.objects.create(url=url, info=info)

Здесь я вместо этого создал цепочку задач, соединив вместе различные signature()“ы. Вы можете прочитать о цепочках и других мощных конструкциях в Canvas: Проектирование рабочих потоков.

По умолчанию Celery не позволяет синхронно запускать подзадачи внутри задачи, но в редких или экстремальных случаях вам может понадобиться это сделать. ПРЕДУПРЕЖДЕНИЕ: не рекомендуется разрешать подзадачам работать синхронно!

@app.task
def update_page_info(url):
    page = fetch_page.delay(url).get(disable_sync_subtasks=False)
    info = parse_page.delay(url, page).get(disable_sync_subtasks=False)
    store_page_info.delay(url, info)

@app.task
def fetch_page(url):
    return myhttplib.get(url)

@app.task
def parse_page(url, page):
    return myparser.parse_document(page)

@app.task
def store_page_info(url, info):
    return PageInfo.objects.create(url, info)

Производительность и стратегии

Зернистость

Гранулярность задачи - это объем вычислений, необходимый для каждой подзадачи. В общем случае лучше разбить проблему на множество мелких задач, чем иметь несколько долго выполняющихся задач.

С меньшими заданиями вы можете обрабатывать больше заданий параллельно, и задания не будут выполняться достаточно долго, чтобы заблокировать рабочего от обработки других ожидающих заданий.

Однако выполнение задачи сопряжено с накладными расходами. Необходимо отправить сообщение, данные могут быть не локальными и т.д. Поэтому, если задачи слишком мелкозернистые, добавляемые накладные расходы, вероятно, сводят на нет все преимущества.

См.также

В книге Art of Concurrency есть раздел, посвященный теме гранулярности задач [AOC1].

[AOC1]

Бреширс, Клэй. Раздел 2.2.1, «Искусство параллелизма». O’Reilly Media, Inc. May 15, 2009. ISBN-13 978-0-596-52153-0.

Местонахождение данных

Работник, выполняющий задание, должен находиться как можно ближе к данным. Лучшим вариантом будет наличие копии в памяти, худшим - полный перенос с другого континента.

Если данные находятся далеко, можно попытаться запустить другого работника, или, если это невозможно, кэшировать часто используемые данные, или предварительно загрузить данные, которые, как вы знаете, будут использоваться.

Самый простой способ обмена данными между рабочими - использовать распределенную систему кэширования, например memcached.

См.также

Статья Distributed Computing Economics Джима Грея является отличным введением в тему локальности данных.

Государство

Поскольку Celery является распределенной системой, вы не можете знать, какой процесс или на какой машине будет выполняться задание. Вы даже не можете знать, будет ли задание выполнено своевременно.

Древние изречения об асинхронности говорят нам, что «утверждение мира - обязанность задачи». Это означает, что картина мира могла измениться с момента запроса задачи, поэтому задача отвечает за то, чтобы убедиться, что мир таков, каким он должен быть; если у вас есть задача, которая переиндексирует поисковую систему, а поисковая система должна переиндексироваться максимум каждые 5 минут, то утверждать это должна задача, а не вызывающая сторона.

Еще одна загвоздка - объекты модели Django. Их не следует передавать в качестве аргументов задачам. Почти всегда лучше повторно получить объект из базы данных во время выполнения задачи, так как использование старых данных может привести к условиям гонки.

Представьте себе следующий сценарий: у вас есть статья и задача, которая автоматически расширяет некоторые сокращения в ней:

class Article(models.Model):
    title = models.CharField()
    body = models.TextField()

@app.task
def expand_abbreviations(article):
    article.body.replace('MyCorp', 'My Corporation')
    article.save()

Сначала автор создает статью и сохраняет ее, затем он нажимает на кнопку, которая инициирует задачу сокращения:

>>> article = Article.objects.get(id=102)
>>> expand_abbreviations.delay(article)

Сейчас очередь очень занята, поэтому задание не будет выполнено еще 2 минуты. За это время другой автор вносит изменения в статью, поэтому, когда задание наконец будет выполнено, тело статьи будет возвращено к старой версии, поскольку в аргументе задания было указано старое тело.

Исправить состояние гонки легко, просто используйте вместо этого id статьи и повторите выборку статьи в теле задачи:

@app.task
def expand_abbreviations(article_id):
    article = Article.objects.get(id=article_id)
    article.body.replace('MyCorp', 'My Corporation')
    article.save()
>>> expand_abbreviations.delay(article_id)

Такой подход может быть даже выгоден с точки зрения производительности, поскольку отправка больших сообщений может быть дорогостоящей.

Транзакции базы данных

Давайте посмотрим на другой пример:

from django.db import transaction
from django.http import HttpResponseRedirect

@transaction.atomic
def create_article(request):
    article = Article.objects.create()
    expand_abbreviations.delay(article.pk)
    return HttpResponseRedirect('/articles/')

Это представление Django, создающее объект статьи в базе данных, а затем передающее первичный ключ задаче. Он использует декоратор transaction.atomic, который зафиксирует транзакцию, когда представление вернется, или откатится назад, если представление вызовет исключение.

Если задача начнет выполняться до фиксации транзакции, возникнет состояние гонки; Объект базы данных еще не существует!

Решением является использование обратного вызова on_commit для запуска задачи Celery после успешной фиксации всех транзакций.

from django.db.transaction import on_commit

def create_article(request):
    article = Article.objects.create()
    on_commit(lambda: expand_abbreviations.delay(article.pk))

Примечание

on_commit доступен в Django 1.9 и выше, если вы используете версию до этого, то библиотека django-transaction-hooks добавляет поддержку для этого.

Пример

Возьмем пример из реального мира: блог, в котором публикуемые комментарии необходимо фильтровать на предмет спама. Когда комментарий создан, спам-фильтр работает в фоновом режиме, поэтому пользователю не нужно ждать его завершения.

У меня есть приложение Django для блога, позволяющее комментировать записи в блоге. Я опишу часть моделей/представлений и задач для этого приложения.

blog/models.py

Модель комментариев выглядит следующим образом:

from django.db import models
from django.utils.translation import ugettext_lazy as _


class Comment(models.Model):
    name = models.CharField(_('name'), max_length=64)
    email_address = models.EmailField(_('email address'))
    homepage = models.URLField(_('home page'),
                               blank=True, verify_exists=False)
    comment = models.TextField(_('comment'))
    pub_date = models.DateTimeField(_('Published date'),
                                    editable=False, auto_add_now=True)
    is_spam = models.BooleanField(_('spam?'),
                                  default=False, editable=False)

    class Meta:
        verbose_name = _('comment')
        verbose_name_plural = _('comments')

В представлении, где публикуется комментарий, я сначала записываю комментарий в базу данных, а затем запускаю задачу фильтрации спама в фоновом режиме.

blog/views.py

from django import forms
from django.http import HttpResponseRedirect
from django.template.context import RequestContext
from django.shortcuts import get_object_or_404, render_to_response

from blog import tasks
from blog.models import Comment


class CommentForm(forms.ModelForm):

    class Meta:
        model = Comment


def add_comment(request, slug, template_name='comments/create.html'):
    post = get_object_or_404(Entry, slug=slug)
    remote_addr = request.META.get('REMOTE_ADDR')

    if request.method == 'post':
        form = CommentForm(request.POST, request.FILES)
        if form.is_valid():
            comment = form.save()
            # Check spam asynchronously.
            tasks.spam_filter.delay(comment_id=comment.id,
                                    remote_addr=remote_addr)
            return HttpResponseRedirect(post.get_absolute_url())
    else:
        form = CommentForm()

    context = RequestContext(request, {'form': form})
    return render_to_response(template_name, context_instance=context)

Для фильтрации спама в комментариях я использую Akismet, сервис, используемый для фильтрации спама в комментариях, размещенных на бесплатной блог-платформе Wordpress. Akismet, является бесплатным для личного использования, но для коммерческого использования вам нужно заплатить. Вы должны зарегистрироваться на их сервисе, чтобы получить ключ API.

Для осуществления вызовов API к Akismet я использую библиотеку akismet.py, написанную Michael Foord.

blog/tasks.py

from celery import Celery

from akismet import Akismet

from django.core.exceptions import ImproperlyConfigured
from django.contrib.sites.models import Site

from blog.models import Comment


app = Celery(broker='amqp://')


@app.task
def spam_filter(comment_id, remote_addr=None):
    logger = spam_filter.get_logger()
    logger.info('Running spam filter for comment %s', comment_id)

    comment = Comment.objects.get(pk=comment_id)
    current_domain = Site.objects.get_current().domain
    akismet = Akismet(settings.AKISMET_KEY, 'http://{0}'.format(domain))
    if not akismet.verify_key():
        raise ImproperlyConfigured('Invalid AKISMET_KEY')


    is_spam = akismet.comment_check(user_ip=remote_addr,
                        comment_content=comment.comment,
                        comment_author=comment.name,
                        comment_author_email=comment.email_address)
    if is_spam:
        comment.is_spam = True
        comment.save()

    return is_spam
Вернуться на верх