Очереди

Исходный код: Lib/asyncio/queues.py.


Очереди asyncio разработаны так, чтобы быть похожими на классы модуля queue. Хотя очереди asyncio не являются потокобезопасными, они разработаны специально для использования в коде async/await.

Обратите внимание, что методы очередей asyncio не имеют параметра timeout; используйте функцию asyncio.wait_for() для выполнения операций с очередью с таймаутом.

См. также раздел Examples ниже.

Очередь

class asyncio.Queue(maxsize=0)

Очередь «первый вошел - первый вышел» (FIFO).

Если maxsize меньше или равно нулю, то размер очереди бесконечен. Если это целое число больше 0, то await put() блокирует очередь при достижении maxsize, пока элемент не будет удален get().

В отличие от потоковой работы стандартной библиотеки queue, размер очереди всегда известен и может быть возвращен вызовом метода qsize().

Изменено в версии 3.10: Удален параметр loop.

Этот класс является not thread safe.

maxsize

Количество элементов, разрешенных в очереди.

empty()

Возвращает True, если очередь пуста, False в противном случае.

full()

Возвращает True, если в очереди есть maxsize элементов.

Если очередь была инициализирована с maxsize=0 (по умолчанию), то full() никогда не возвращает True.

coroutine get()

Удалить и вернуть элемент из очереди. Если очередь пуста, подождите, пока элемент не станет доступен.

get_nowait()

Возвращает элемент, если он немедленно доступен, в противном случае поднимает QueueEmpty.

coroutine join()

Блокировать до тех пор, пока все элементы в очереди не будут получены и обработаны.

Счетчик незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь. Счетчик уменьшается всякий раз, когда потребительская корутина вызывает task_done(), чтобы указать, что элемент был получен и вся работа над ним завершена. Когда счетчик незавершенных задач падает до нуля, join() разблокируется.

coroutine put(item)

Поместите элемент в очередь. Если очередь переполнена, подождите, пока освободится свободный слот, прежде чем добавить элемент.

put_nowait(item)

Поместите элемент в очередь без блокировки.

Если свободный слот не доступен, поднимите QueueFull.

qsize()

Возвращает количество элементов в очереди.

task_done()

Указывает на то, что ранее записанная задача завершена.

Используется потребителями очереди. Для каждого get(), используемого для получения задачи, последующий вызов task_done() сообщает очереди, что обработка задачи завершена.

Если join() в настоящее время блокируется, он возобновится, когда все элементы будут обработаны (это означает, что вызов task_done() был получен для каждого элемента, который был put() в очереди).

Вызывает ValueError, если вызывается большее количество раз, чем было помещено элементов в очередь.

Очередь приоритетов

class asyncio.PriorityQueue

Вариант Queue; извлекает записи в порядке приоритета (сначала наименьшая).

Записи обычно представляют собой кортежи вида (priority_number, data).

Очередь LIFO

class asyncio.LifoQueue

Вариант Queue, при котором сначала извлекаются последние добавленные записи (последние входящие, первые исходящие).

Исключения

exception asyncio.QueueEmpty

Это исключение возникает, когда метод get_nowait() вызывается на пустой очереди.

exception asyncio.QueueFull

Исключение, возникающее при вызове метода put_nowait() на очереди, достигшей своего maxsize.

Примеры

Очереди можно использовать для распределения рабочей нагрузки между несколькими одновременными задачами:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())
Вернуться на верх