Асинхронная отправка сообщений с помощью каналов Django по мере их создания

У меня есть процесс, который получает данные асинхронно с помощью httpx, и я хотел бы отправить эти данные сразу после их получения. Вместо этого Channels отправляет их только после получения всех данных. Не уверен, что я не понял чего-то основного, но я бы подумал, что это должно быть функциональностью хлеб и масло для асинхронного фреймворка.

Я использую каналы в слое памяти, и у меня нет возможности установить сервер Redis, кроме того, основываясь на других вопросах на stackoverflow - я не уверен, поможет ли это в любом случае.

Минимальный пример:

class AsyncSend(AsyncJsonWebsocketConsumer):

    async def connect(self):
        # Join group
        self.group_name = f"test"

        await self.channel_layer.group_add(
            self.group_name,
            self.channel_name
        )

        await self.accept()

        #Replicate non blocking call
        async for result in self.non_block():
            await self.send_task(result)

    async def non_block(self):
        for integer in range(4):
            await asyncio.sleep(1)
            curTime = datetime.now().strftime("%H:%M:%S")
            print(f"{curTime} - {integer}")
            yield integer

    async def send_task(self, result):
        curTime = datetime.now().strftime("%H:%M:%S")
        print(f"{curTime} - Send task called")
        await self.channel_layer.group_send(
            self.group_name, {
                'type': "transmit",
                'message': result,
            }
        )
        print("Finished sending")

    async def transmit(self, event):
        print(f"Received event to transmit...")

    async def disconnect(self, close_code):
        # Leave group
        await self.channel_layer.group_discard(
        self.group_name,
        self.channel_name
    )

Выход:

10:23:21 - 0
10:23:21 - Send task called
Finished sending
10:23:22 - 1
10:23:22 - Send task called
Finished sending
10:23:23 - 2
10:23:23 - Send task called
Finished sending
10:23:24 - 3
10:23:24 - Send task called
Finished sending
Received event to transmit...
Received event to transmit...
Received event to transmit...
Received event to transmit...

К сожалению, запуск потребителя в потоковом режиме, как было предложено в: How to multithread AsyncConsumer with Django Channels тоже не сработало.

Я пробовал несколько реализаций с использованием run_in_executor и threadpoolexecutors, но я не могу заставить его работать. Фактически, он запускает их последовательно, а не асинхронно.

Соответствующие поправки:

class AsyncSend(AsyncJsonWebsocketConsumer):

    def __init__(self):
        super().__init__()
        self.executor = ThreadPoolExecutor(max_workers=5)

    async def connect(self):
        ...
        with ThreadPoolExecutor(max_workers=5) as pool:
            loop = asyncio.get_running_loop()
            futures = []
            async for result in self.non_block():
                #await loop.create_task(self.send_task(result)) #No improvement over initial method
                future = await loop.run_in_executor(pool, self.send_task, result) #Same behaviour if pool is replaced with self.executor
                futures.append(future)
            try:
                results = await asyncio.gather(*futures) 
            except Exception as ex:
                print("Caught error executing task", ex)
                raise
12:43:39 - 0
12:43:40 - 1
12:43:41 - 2
12:43:42 - 3
12:43:42 - Send task called
Finished sending
12:43:42 - Send task called
Finished sending
12:43:42 - Send task called
Finished sending
12:43:42 - Send task called
Finished sending
Received event to transmit...
Received event to transmit...
Received event to transmit...
Received event to transmit...

Я также пробовал использовать uvicorn, но каждый раз одно и то же поведение.

Вернуться на верх