Рабочие и фоновые задачи

Хотя channel layers в основном предназначены для связи между различными экземплярами приложений ASGI, они также могут быть использованы для разгрузки работы на набор рабочих серверов, прослушивающих фиксированные имена каналов, в качестве простой очереди задач с очень низкой задержкой.

Примечание

Система рабочих/фоновых задач в Channels проста и очень быстра, и достигается это за счет отсутствия некоторых функций, которые вы можете счесть полезными, таких как повторные попытки или возвращаемые значения.

Мы рекомендуем использовать его для работы, которая не требует гарантий завершения (доставка at-most-once), а для работы, требующей больших гарантий, обратите внимание на отдельную выделенную очередь задач.

Эта функция не работает с канальным слоем in-memory.

Настройка фоновых задач состоит из двух частей - отправка событий, а затем настройка потребителей для получения и обработки событий.

Отправка

Чтобы отправить событие, просто отправьте его на фиксированное имя канала. Например, допустим, нам нужен фоновый процесс, который предварительно кэширует миниатюры:

# Inside a consumer
self.channel_layer.send(
    "thumbnails-generate",
    {
        "type": "generate",
        "id": 123456789,
    },
)

Обратите внимание, что отправляемое вами событие должно иметь ключ type, даже если по каналу передается только один тип сообщения, так как оно превратится в событие, которое должен обработать потребитель.

Также помните, что если вы отправляете событие из синхронной среды, вы должны использовать обертку asgiref.sync.async_to_sync, как указано в channel layers.

Получение и потребители

Каналы будут представлять вам входящие рабочие задания в виде событий внутри области видимости с type из channel, и ключом channel, соответствующим имени канала. Мы рекомендуем вам использовать ProtocolTypeRouter и ChannelNameRouter (подробнее см. Маршрутизация) для организации ваших потребителей:

application = ProtocolTypeRouter({
    ...
    "channel": ChannelNameRouter({
        "thumbnails-generate": consumers.GenerateConsumer.as_asgi(),
        "thumbnails-delete": consumers.DeleteConsumer.as_asgi(),
    }),
})

Вы сами будете указывать значения type отдельных событий при их отправке, поэтому решите, какими будут ваши имена, и напишите соответствующих потребителей. Например, вот базовый потребитель, который ожидает получить событие с type test.print, и значение text, содержащее текст для печати:

class PrintConsumer(SyncConsumer):
    def test_print(self, message):
        print("Test: " + message["text"])

После того как вы подключили потребителей, все, что вам нужно сделать, это запустить процесс, который будет их обрабатывать. Вместо сервера протоколов - поскольку здесь нет соединений - Channels предоставляет вам это с помощью команды runworker:

python manage.py runworker thumbnails-generate thumbnails-delete

Обратите внимание, что runworker будет слушать только те каналы, которые вы передадите ему в командной строке. Если вы не включите канал или забудете запустить рабочий, ваши события не будут получены и обработаны.

Вернуться на верх