Как использовать Celery для планирования задач

Существует множество способов планирования задач в вашем приложении Django, но есть некоторые преимущества использования Celery. Он поддерживается, хорошо масштабируется и отлично работает с Django. Учитывая его широкое применение, существует множество ресурсов, позволяющих узнать о нем больше, а полученные знания могут пригодиться в других проектах.

Celery Version 5.0.x

Эта документация относится к Celery 5.0.x. Более ранние или более поздние версии Celery могут вести себя по-другому. Кроме того, ссылки на документацию Celery могут перестать работать, если в новых версиях Celery произойдет реорганизация документации, а такое случается.

Введение в Celery

Цель Celery - позволить вам выполнять код по расписанию. Почему это может быть полезно? Вот несколько распространенных случаев:

Случай 1: Предположим, что поступил веб-запрос от пользователя, который ждет завершения запроса, чтобы новая страница могла загрузиться в его браузере. Исходя из запроса, вам нужно выполнить некоторый код, который займет некоторое время (большее, чем человек хотел бы ждать веб-страницу), но вам не нужно выполнять этот код до ответа на веб-запрос. Вы можете использовать Celery, чтобы ваш долго выполняющийся код был вызван позже, и сразу же ответить на веб-запрос.

Это часто случается, если для обработки запроса вам необходимо обратиться к удаленному серверу. Ваше приложение не может контролировать, сколько времени потребуется удаленному серверу для ответа, или удаленный сервер может быть отключен.

Случай 2: Другая распространенная ситуация - необходимость регулярного выполнения кода. Например, возможно, каждый час вы хотите просматривать последний прогноз погоды и сохранять данные. Вы можете написать задачу для выполнения этой работы, а затем попросить Celery запускать ее каждый час. Задача выполнится и поместит данные в базу данных, после чего ваше веб-приложение получит доступ к последнему прогнозу погоды.

Некоторая терминология сельдерея:

задача — это просто функция Python. . Вы можете думать о планировании задачи как о вызове функции с задержкой по времени. Например, вы можете попросить Celery вызвать вашу функцию task1 с аргументами (1, 3, 3) через пять минут. Или вы можете вызывать свою функцию batchjob каждую ночь в полночь.

Когда задача готова к выполнению, Celery помещает ее в очередь queue, список задач, готовых к выполнению. У вас может быть много очередей, но для простоты мы будем считать, что здесь одна очередь.

Помещение задачи в очередь, так сказать, просто добавляет ее в список дел. Для выполнения задачи какой-то другой процесс, называемый worker, должен наблюдать за этой очередью задач. Когда он увидит задачи в очереди, он вытащит первую и выполнит ее, а затем вернется, чтобы ждать еще. У вас может быть много рабочих процессов, возможно, на разных серверах, но пока мы предположим одного рабочего.

Позже мы поговорим об очереди, рабочих и еще одном важном процессе, о котором мы еще не упоминали, но пока этого достаточно.

Сложная часть

Для того чтобы все это работало, процессы Django и Celery должны согласовать большую часть своей конфигурации, а процессы Celery должны выполнить достаточно настроек Django, чтобы наши задачи могли получить доступ к базе данных и так далее. Это немного сложно, потому что Django и Celery имеют совершенно разный стартовый код.

(Примечание: Это было намного проще несколько основных релизов Celery назад, когда мы могли просто запускать приложения Celery как команды управления Django. Ах, это были времена...)

Вот некоторые ключевые моменты:

  • DJANGO_SETTINGS_MODULE должен быть установлен в окружении перед запуском процесса Celery. Его наличие в окружении запускает внутреннюю магию в Celery для запуска установки Django в нужное время.
  • "Приложение" Celery должно быть создано и настроено во время запуска как Django, так и Celery.
  • Все задачи Celery должны быть импортированы во время запуска Django и Celery.

Установка Celery локально

Мы собираемся использовать Redis вместе с Celery. Поэтому вам потребуется установить Redis. Я укажу на документацию Redis install documentation, чтобы не воспроизводить ее здесь. Затем установите Celery и все зависимости, необходимые для использования Redis, одной командой, запущенной в нашем виртуальном окружении:

$ pip install celery[redis]

Настройка Django для Celery

Нам просто нужно настроить Celery для использования с runserver. Для Celery broker, который позволяет Django и Celery workers общаться, мы будем использовать Redis. Redis хорош тем, что его легко настроить и он подходит для многих производственных сред.

Предупреждение: Прежде чем использовать Redis дальше, чем просто играть в локальной системе, прочитайте о безопасности Redis и спланируйте защиту вашего сервера Redis. Он не предназначен для обеспечения безопасности "из коробки".

В ваш файл настроек Django добавьте:

CELERY_BROKER_URL = "redis://localhost:6379/0"
CELERY_RESULT_BACKEND = "redis://localhost:6379/1"

Примечание: Значение broker является единственным наиболее важным значением конфигурации, поскольку оно указывает Django и Celery, как общаться. Если у них нет одинакового значения для этой настройки, ни одно задание не будет запущено.

Создание приложения Celery

Нам нужен небольшой Python-файл, который будет инициализировать Celery так, как мы хотим, независимо от того, запущен ли он в процессе Django или Celery.

Заманчиво просто создать файл celery.py на верхнем уровне нашего проекта, но это именно то имя, которое мы не можем использовать, поскольку Celery владеет файлом celery пространство имен пакета. Вместо этого я создам celery.py внутри одного из существующих пакетов. В этих примерах это будет myapp/celery.py.

Вот код, который вам нужно добавить:

from celery import Celery

# Create default Celery app
app = Celery()

# namespace='CELERY' means all celery-related configuration keys
# should be uppercased and have a `CELERY_` prefix in Django settings.
# https://docs.celeryproject.org/en/stable/userguide/configuration.html
app.config_from_object("django.conf:settings", namespace="CELERY")

# When we use the following in Django, it loads all the <appname>.tasks
# files and registers any tasks it finds in them. We can import the
# tasks files some other way if we prefer.
app.autodiscover_tasks()

На самом деле не имеет значения, какой переменной вы присваиваете объект Celery(). Celery найдет ее, если она находится на верхнем уровне модуля.

config_from_object важен, поэтому мы можем поместить настройки CELERY_* в наши настройки Django, и Celery будет использовать эти значения. Все, что мы хотим настроить в Celery, мы просто находим правильную конфигурацию настройки, измените его на все заглавные буквы, поставьте CELERY_ впереди и установите его в наших настройках Django.

Например, существует настройка Celery timezone. Если бы мы хотели установить его, мы бы поместили что-то вроде этого в настройки Django:

CELERY_TIMEZONE = "America/New_York"

Очень важно, чтобы этот файл был импортирован на поздней стадии установки Django, после того, как все ваши приложения Django были зарегистрированы и модели загружены. Я рекомендую импортировать его внутри метода ready() любого приложения Django.

from django.apps import AppConfig


class MyAppConfig(AppConfig):
    # ...

    def ready(self):
        # Import celery app now that Django is mostly ready.
        # This initializes Celery and autodiscovers tasks
        import myapp.celery
        ...

Если вы используете Django до версии 3.2, чтобы убедиться, что это будет использоваться, добавьте это в myapp/__init__.py:

myapp/__init__.py

default_app_config = 'myapp.MyAppConfig’

Вы увидите ниже, что мы скажем процессам Celery загрузить это с помощью опции командной строки.

Написание задачи

Как упоминалось ранее, задача может быть просто функцией Python. Однако Сельдерей должен знать об этом. Это довольно просто при использовании Celery с Django. Просто добавьте в приложение файл tasks.py, поместите свои задачи в этот файл и украсьте их с помощью @shared_task(). Вот простой tasks.py:

from celery import shared_task

@shared_task()
def add(x, y):
    return x + y

Пометка функции как задачи не препятствует ее нормальному вызову. Вы все еще можете назвать его: z = add(1, 2), и он будет работать точно так же, как и раньше. Отметив ее как задачу, вы получите дополнительные способы ее вызова.

При импорте Celery зарегистрирует этот метод как задачу для нашего приложения. Или вызов app.autodiscover_tasks() загрузит задачи во все ваши файлы <appname>/tasks.py.

Все задачи должны быть импортированы во время запуска Django и Celery, чтобы Celery знал о них. Если мы поместим их в файлы <appname>/tasks.py и вызовем app.autodiscover_tasks(), это сработает. Или мы могли бы поместить наши задачи в файлы наших моделей, или импортировать их оттуда, или импортировать их из готовых методов приложения.

Выдача задания

Начнем с простого случая, о котором мы упоминали выше. Мы хотим запустить нашу задачу в ближайшее время. Мы просто не хотим, чтобы это задерживало нашу текущую ветку. Мы можем сделать это, добавив .delay к имени нашей задачи:

from myapp.tasks import add

add.delay(2, 2)

Celery добавит задание в свою очередь ("worker, please call myapp.tasks.add(2, 2)") и немедленно вернется. Как только незанятый рабочий увидит его во главе очереди, он удалит его из очереди, затем выполнит его, примерно так:

import myapp.tasks.add

myapp.tasks.add(2, 2)

Предупреждение об импортируемых именах

Важно, чтобы ваша задача всегда импортировалась и упоминалась с использованием то же имя пакета. Например, в зависимости от того, как настроен ваш путь Python, вы можете ссылаться на него как на myproject.myapp.tasks.add или myapp.tasks.add. . Или из myapp.views вы можете импортировать его как .tasks.add. Но Сельдерей не может знать, что это одна и та же задача.

Тестирование

Как уже упоминалось, для выполнения задач Celery должен быть запущен отдельный процесс, worker. Вот как мы можем запустить рабочий процесс для наших нужд разработки.

Сначала откройте новую оболочку или окно. В этой оболочке настройте ту же среду разработки Django — активируйте виртуальную среду или добавьте что-то в свой путь Python, что бы вы ни делали, чтобы вы могли использовать runserver для запуска ваш проект.

Кроме того, даже если в противном случае вы бы этого не сделали, вы должны установить DJANGO_SETTINGS_MODULE в вашем окружении, иначе Celery не распознает, что он работает с Django.

Теперь вы можете запустить рабочего в этой оболочке:

$ celery -A myapp.celery worker --loglevel=info

Рабочий будет выполняться в этом окне и отправлять вывод туда.

"Опция" командной строки -A на самом деле не является необязательной. Celery импортирует этот модуль и будет искать там наш объект приложения Celery.

Кстати, здесь мы можем быть более конкретными, например. -A myapp.celery:app, чтобы сообщить Celery, что приложение, которое мы хотим использовать, находится в переменной верхнего уровня app в модуле. Но вам не нужно было бы делать это, если бы у вас не было нескольких приложений Celery в модуле, и нет причин делать это для большинства проектов Django.

Выполнить задание

Вернитесь в первое окно, запустите оболочку Django и запустите вашу задачу:

 $ python manage.py shell
 >>> from myapp.tasks import add
 >>> add.delay(2, 2)
<AsyncResult: 80abe5c2-0f4f-4b93-b924-2ebad70b44b7>
>>>

Вы должны увидеть вывод в окне рабочего, указывающий на то, что рабочий выполнил задание:

[2013-01-21 08:47:08,076: INFO/MainProcess] Got task from broker: myapp.tasks.add[e080e047-b2a2-43a7-af74-d7d9d98b02fc]
[2013-01-21 08:47:08,299: INFO/MainProcess] Task myapp.tasks.add[e080e047-b2a2-43a7-af74-d7d9d98b02fc] succeeded in 0.183349132538s: 4

Если вместо этого все зависает, вернитесь назад и убедитесь, что в приложении Django внесены предложенные выше изменения в настройки, myapp/__init__.py и myapp/apps.py.

Пример

Ранее мы упоминали об использовании Celery, чтобы избежать задержки ответа на веб-запрос. Вот упрощенное представление Django, использующее эту технику:

# views.py

def view(request):
    form = SomeForm(request.POST)
    if form.is_valid():
        data = form.cleaned_data
        # Schedule a task to process the data later
        do_something_with_form_data.delay(data)
    return render_to_response(...)

# tasks.py

@shared_task
def do_something_with_form_data(data):
    call_slow_web_service(data['user'], data['text'], ...)

Устранение неполадок

Задачи Celery могут быть трудновыполнимыми, потому что несколько частей должны присутствовать и правильно взаимодействовать друг с другом. Многие из обычных советов по-прежнему применимы:

  • Get the simplest possible configuration working first.
  • Use the python debugger and print statements to see what's going on.
  • Turn up logging levels (e.g. --loglevel debug on the worker) to get more insight.

Есть также некоторые инструменты, которые являются уникальными для Celery.

Вежливое планирование

В настройках Django вы можете добавить:

CELERY_ALWAYS_EAGER = True

и Celery будет обходить весь механизм планирования и вызывать ваш код напрямую.

Другими словами, при CELERY_ALWAYS_EAGER = True эти два утверждения выполняются одинаково:

add.delay(2, 2)
add(2, 2)

Вы можете использовать это, чтобы заставить работать вашу основную логику, прежде чем вводить усложнения в планировании Celery.

Проверьте результаты

Каждый раз, когда вы планируете выполнение задачи, Celery возвращает объект AsyncResult. Вы можете сохранить этот объект, а затем использовать его позже, чтобы узнать, была ли выполнена задача, была ли она успешной, и каков был результат.

result = add.delay(2, 2)
...
if result.ready():
    print("Task has run")
    if result.successful():
        print("Result was: %s" % result.result)
    else:
        if isinstance(result.result, Exception):
            print("Task failed due to raising an exception")
            raise result.result
        else:
            print("Task failed without raising exception"
 else:
     print("Task has not yet run")

Периодическое планирование

Другой распространенный случай - выполнение задачи по регулярному расписанию. Celery реализует это с помощью другого процесса, celery beat. Celery beat работает непрерывно, и всякий раз, когда наступает время для выполнения запланированной задачи, celery beat ставит ее в очередь на выполнение.

По очевидным причинам, должен быть запущен только один процесс celery beat (в отличие от рабочих, где вы можете запускать столько, сколько хотите и нужно).

Запуск celery beat аналогичен запуску worker. Запустите другое окно, настройте ваше окружение Django, затем:

$ celery -A myapp.celery beat

Примечание: Если вы запускаете celery beat где-то, где у него не будет постоянной файловой системы при всех вызовах, например, в контейнере, то проигнорируйте следующие инструкции и обратитесь к другой статье моего блога: Как планировать задачи с помощью Celery Beat в контейнере.

Чтобы организовать выполнение задачи "add" из пакета "myapp.tasks" каждые 30 секунд с аргументами (16, 16), добавьте это в настройки Django:

CELERY_BEAT_SCHEDULE = {
      'add-every-30-seconds': {
        'task': 'myapp.tasks.add',
        'schedule': 30.0,
        'args': (16, 16),
        'options': {
            'expires': 15.0,
        },
    },
}

Для безопасности опция expires говорит Celery, что если он не сможет выполнить эту задачу в течение 15 секунд, то просто отменит ее. Мы знаем, что в любом случае поставим в очередь еще одну задачу каждые 30 секунд.

Подсказки и советы

Не передавайте объекты модели задачам. Поскольку задачи не выполняются немедленно, к тому времени, когда задача выполнится и посмотрит на объект модели, который был ей передан, соответствующая запись в базе данных может измениться. Если задача затем что-то сделает с объектом модели и сохранит его, эти изменения в базе данных будут перезаписаны более старыми данными.

Почти всегда безопаснее сохранить объект, передать ключ записи и снова искать объект в задаче:

myobject.save()
mytask.delay(myobject.pk)

...


@task
def mytask(pk):
    myobject = MyModel.objects.get(pk=pk)
    ...

Планируйте задачи в других задачах. Совершенно правильно планировать одну задачу во время выполнения другой. Это хороший способ убедиться, что вторая задача не будет запущена, пока первая не выполнит необходимую работу.

Не ждите одну задачу в другой. Если задача ждет другую задачу, рабочий первой задачи блокируется и не может выполнять больше никакой работы, пока ожидание не закончится. Это может привести к тупику, рано или поздно.

Если вы находитесь в задаче A и хотите запланировать задачу B, а после завершения задачи B выполнить еще какую-то работу, лучше создать задачу C для выполнения этой работы, а задаче B запланировать задачу C, когда она будет выполнена.

Следующие шаги

После того, как вы поймете основы, некоторые части руководства пользователя Celery станут хорошим чтением. Я рекомендую начать со следующих глав; остальные либо не актуальны для пользователей Django, либо являются более продвинутыми:

Использование Celery в производстве

Для получения информации по этой теме ознакомьтесь с другой статьей моего блога, "Сельдерей в производстве".

Вот и все! Мы надеемся, что вам понравится изучать новые способы составления расписаний в Celery. Ждем ваших комментариев!

Вернуться на верх