Очереди

Исходный код: Lib/asyncio/queues.py


очереди asyncio спроектированы так, чтобы быть похожими на классы модуля queue. Хотя очереди asyncio не являются потокобезопасными, они предназначены специально для использования в коде async/await.

Обратите внимание, что методы очередей asyncio не имеют параметра timeout; используйте функцию asyncio.wait_for() для выполнения операций с очередями с тайм-аутом.

Смотрите также раздел Examples ниже.

Очередь

class asyncio.Queue(maxsize=0)

Очередь «первый вошел - первый вышел» (FIFO).

Если значение maxsize меньше или равно нулю, то размер очереди бесконечен. Если это целое число больше 0, то await put() блокируется, когда очередь достигает максимального размера, до тех пор, пока элемент не будет удален с помощью get().

В отличие от стандартной многопоточности библиотеки queue, размер очереди всегда известен и может быть возвращен вызовом метода qsize().

Изменено в версии 3.10: Удален параметр loop.

Этот класс имеет значение not thread safe.

maxsize

Количество элементов, разрешенных в очереди.

empty()

Верните True, если очередь пуста, False в противном случае.

full()

Возвращает True, если в очереди есть maxsize элементов.

Если очередь была инициализирована с помощью maxsize=0 (значение по умолчанию), то full() никогда не возвращает True.

coroutine get()

Удалите и верните элемент из очереди. Если очередь пуста, подождите, пока элемент не станет доступен.

get_nowait()

Верните товар, если он сразу доступен, в противном случае поднимите QueueEmpty.

coroutine join()

Блокируйте до тех пор, пока все элементы в очереди не будут получены и обработаны.

Количество незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь. Количество уменьшается всякий раз, когда пользовательская сопрограмма вызывает task_done(), чтобы указать, что элемент был извлечен и вся работа с ним завершена. Когда количество незавершенных задач упадет до нуля, join() разблокируется.

coroutine put(item)

Поместите товар в очередь. Если очередь заполнена, дождитесь, пока освободится место, прежде чем добавлять товар.

put_nowait(item)

Поместите элемент в очередь без блокировки.

Если в данный момент свободных мест нет, поднимите ставку QueueFull.

qsize()

Возвращает количество элементов в очереди.

task_done()

Указывает на то, что ранее поставленная в очередь задача завершена.

Используется пользователями очереди. Для каждого get(), используемого для получения задачи, последующий вызов task_done() сообщает очереди, что обработка задачи завершена.

Если join() в данный момент заблокирован, он возобновится, когда все элементы будут обработаны (это означает, что для каждого элемента, который был помещен put() в очередь, был получен вызов task_done()).

Вызывает ValueError, если вызывается больше раз, чем было элементов, помещенных в очередь.

Приоритетная очередь

class asyncio.PriorityQueue

Вариант Queue; извлекает записи в порядке приоритета (самая младшая из них первая).

Записи обычно представляют собой кортежи вида (priority_number, data).

Очередь FIFO

class asyncio.LifoQueue

Вариант Queue, который сначала извлекает самые последние добавленные записи (последний вход, первый выход).

Исключения

exception asyncio.QueueEmpty

Это исключение возникает, когда метод get_nowait() вызывается в пустой очереди.

exception asyncio.QueueFull

Исключение возникает при вызове метода put_nowait() в очереди, которая достигла своего максимального размера.

Примеры

Очереди могут использоваться для распределения рабочей нагрузки между несколькими параллельными задачами:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())
Вернуться на верх