Асинхронная отправка сообщений с помощью каналов Django по мере их создания
У меня есть процесс, который получает данные асинхронно с помощью httpx, и я хотел бы отправить эти данные сразу после их получения. Вместо этого Channels отправляет их только после получения всех данных. Не уверен, что я не понял чего-то основного, но я бы подумал, что это должно быть функциональностью хлеб и масло для асинхронного фреймворка.
Я использую каналы в слое памяти, и у меня нет возможности установить сервер Redis, кроме того, основываясь на других вопросах на stackoverflow - я не уверен, поможет ли это в любом случае.
Минимальный пример:
class AsyncSend(AsyncJsonWebsocketConsumer):
async def connect(self):
# Join group
self.group_name = f"test"
await self.channel_layer.group_add(
self.group_name,
self.channel_name
)
await self.accept()
#Replicate non blocking call
async for result in self.non_block():
await self.send_task(result)
async def non_block(self):
for integer in range(4):
await asyncio.sleep(1)
curTime = datetime.now().strftime("%H:%M:%S")
print(f"{curTime} - {integer}")
yield integer
async def send_task(self, result):
curTime = datetime.now().strftime("%H:%M:%S")
print(f"{curTime} - Send task called")
await self.channel_layer.group_send(
self.group_name, {
'type': "transmit",
'message': result,
}
)
print("Finished sending")
async def transmit(self, event):
print(f"Received event to transmit...")
async def disconnect(self, close_code):
# Leave group
await self.channel_layer.group_discard(
self.group_name,
self.channel_name
)
Выход:
10:23:21 - 0
10:23:21 - Send task called
Finished sending
10:23:22 - 1
10:23:22 - Send task called
Finished sending
10:23:23 - 2
10:23:23 - Send task called
Finished sending
10:23:24 - 3
10:23:24 - Send task called
Finished sending
Received event to transmit...
Received event to transmit...
Received event to transmit...
Received event to transmit...
К сожалению, запуск потребителя в потоковом режиме, как было предложено в: How to multithread AsyncConsumer with Django Channels тоже не сработало.
Я пробовал несколько реализаций с использованием run_in_executor и threadpoolexecutors, но я не могу заставить его работать. Фактически, он запускает их последовательно, а не асинхронно.
Соответствующие поправки:
class AsyncSend(AsyncJsonWebsocketConsumer):
def __init__(self):
super().__init__()
self.executor = ThreadPoolExecutor(max_workers=5)
async def connect(self):
...
with ThreadPoolExecutor(max_workers=5) as pool:
loop = asyncio.get_running_loop()
futures = []
async for result in self.non_block():
#await loop.create_task(self.send_task(result)) #No improvement over initial method
future = await loop.run_in_executor(pool, self.send_task, result) #Same behaviour if pool is replaced with self.executor
futures.append(future)
try:
results = await asyncio.gather(*futures)
except Exception as ex:
print("Caught error executing task", ex)
raise
12:43:39 - 0
12:43:40 - 1
12:43:41 - 2
12:43:42 - 3
12:43:42 - Send task called
Finished sending
12:43:42 - Send task called
Finished sending
12:43:42 - Send task called
Finished sending
12:43:42 - Send task called
Finished sending
Received event to transmit...
Received event to transmit...
Received event to transmit...
Received event to transmit...
Я также пробовал использовать uvicorn, но каждый раз одно и то же поведение.