Сомнения в производительности Python async
Я запускаю приложение websocket через Django Channels и с Python 3.12. У меня есть механизм ping для проверки того, что мои пользователи все еще подключены к моему приложению, где через определенный промежуток времени fronted отправляет ping сообщение на мой сервер (допустим, каждые 5 секунд) и когда я получаю это сообщение, я делаю следующее:
Я звоню await asyncio.sleep(time interval * 2) ;
После этого я проверяю, не превышает ли временная метка последнего сообщения, полученного в этом обработчике или в любом другом из активных, time_interval * 1.5
, и если это так, я отключаю пользователя, чтобы освободить ресурсы соединения, поскольку оно больше не активно;
Однако мы заметили, что этот вызов сна, который мы делаем каждые X секунд для каждого активного соединения, которое у нас есть, когда получен вызов ping, может вызывать некоторые проблемы с производительностью приложения в целом. Мы предполагаем это, потому что заметили, что каждый раз, когда мы отключаем эту логику ping, нам требуется меньше ресурсов (количество стручков и CPU) для обработки среднего количества трафика, чем когда у нас активен этот обработчик ping. С другой стороны, кажется, что все сообщения, которые мы обслуживаем через канал django, также проходят намного быстрее, когда этот пинг-обработчик отключен.
Мой обработчик:
if action == "ping":
await self.ping_handler(message_info)
async def ping_handler(self, message_info):
await asyncio.sleep(PING_INTERVAL * 2)
if (
round_half_up(
(dt.datetime.now() - self.last_message).total_seconds(), decimals=0
)
>= PING_INTERVAL * 1.5
):
await self.close()
Кроме этого обработчика пинга, мое приложение получает сообщения от клиентов, обрабатывает некоторые данные из этих сообщений и затем транслирует их в соответствующие каналы, например, так:
async def action_dispatcher(self, action, message_info):
try:
if action == "post-message":
await self.handle_message(message_info)
elif action == "post-username":
await self.change_nickname()
elif action == "post-change-privacy":
await self.handle_change_privacy(message_info)
elif action == "get-bet-status":
await self.get_bet_status(message_info)
elif action == "ping":
self.last_message = dt.datetime.now()
await self.ping_handler(message_info)
async def handle_message(message_info):
# execute business logic before these steps
encoded_message = await super().encode_socket_message(
[
"message_type",
self.nickname,
"success",
]
)
await self.channel_layer.group_send(
self.username_group,
encoded_message,
)
Может ли кто-нибудь помочь мне понять, верны ли мои предположения или нет? Я много искал в интернете и ChatGPT, но не смог найти законного ответа на мои сомнения, что если у нас есть 200-500 активных соединений в канальном приложении, которые каждые 5 секунд выполняют эту логику пинга, что это может привести к ограничениям производительности.
Цель моего приложения - получать чат-сообщения от клиентов, обрабатывать их содержимое и транслировать его остальным пользователям через канальные слои django.
Я пробовал запускать свое приложение с этой логикой обработки пинга и без нее и искал ответы на вопросы в foruns, stackoverflow и чате gpt.
То, что производительность хуже, неудивительно. Каналы Django являются очередями «первый пришел - первый ушел» (FIFO) [источник].
Программа action_dispatcher
обрабатывает сообщения последовательно, и поэтому, ожидая X секунд в очереди отправки, вы предотвращаете обработку любых других сообщений в этой очереди в течение X секунд.
Если вы хотите не блокировать обработку сообщений, то вам следует создать задачу. Задача будет выполняться параллельно с вашей action_dispatcher
.
eg.
async def action_dispatcher(self, action, message_info):
try:
if action == "post-message":
...
elif action == "ping":
self.last_message = dt.datetime.now()
task = asyncio.create_task(self.ping_handler(message_info))
# NB. Do not await this task in your action dispatcher
# NB2. Make sure to keep a reference to your task available, otherwise
# it may not run to completion. eg.
self.ping_tasks.append(task)
Если вы просто сделаете это, то можете получить предупреждения о задачах, которые не дождались своих результатов. Вы можете использовать TaskGroup
, чтобы убедиться, что эти дочерние задачи обрабатываются должным образом. То есть сохранять ссылку до их завершения и следить за тем, чтобы их результаты были собраны. Сделайте экземпляр TaskGroup
доступным для action_dispatcher()
, а затем выполните task_group.create_task(self.ping_handler(message_info))` вместо этого.