Очереди¶
Исходный код: Lib/asyncio/queues.py.
Очереди asyncio разработаны так, чтобы быть похожими на классы модуля queue
. Хотя очереди asyncio не являются потокобезопасными, они разработаны специально для использования в коде async/await.
Обратите внимание, что методы очередей asyncio не имеют параметра timeout; используйте функцию asyncio.wait_for()
для выполнения операций с очередью с таймаутом.
См. также раздел Examples ниже.
Очередь¶
-
class
asyncio.
Queue
(maxsize=0)¶ Очередь «первый вошел - первый вышел» (FIFO).
Если maxsize меньше или равно нулю, то размер очереди бесконечен. Если это целое число больше
0
, тоawait put()
блокирует очередь при достижении maxsize, пока элемент не будет удаленget()
.В отличие от потоковой работы стандартной библиотеки
queue
, размер очереди всегда известен и может быть возвращен вызовом методаqsize()
.Изменено в версии 3.10: Удален параметр loop.
Этот класс является not thread safe.
-
maxsize
¶ Количество элементов, разрешенных в очереди.
-
empty
()¶ Возвращает
True
, если очередь пуста,False
в противном случае.
-
full
()¶ Возвращает
True
, если в очереди естьmaxsize
элементов.Если очередь была инициализирована с
maxsize=0
(по умолчанию), тоfull()
никогда не возвращаетTrue
.
-
coroutine
get
()¶ Удалить и вернуть элемент из очереди. Если очередь пуста, подождите, пока элемент не станет доступен.
-
get_nowait
()¶ Возвращает элемент, если он немедленно доступен, в противном случае поднимает
QueueEmpty
.
-
coroutine
join
()¶ Блокировать до тех пор, пока все элементы в очереди не будут получены и обработаны.
Счетчик незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь. Счетчик уменьшается всякий раз, когда потребительская корутина вызывает
task_done()
, чтобы указать, что элемент был получен и вся работа над ним завершена. Когда счетчик незавершенных задач падает до нуля,join()
разблокируется.
-
coroutine
put
(item)¶ Поместите элемент в очередь. Если очередь переполнена, подождите, пока освободится свободный слот, прежде чем добавить элемент.
-
put_nowait
(item)¶ Поместите элемент в очередь без блокировки.
Если свободный слот не доступен, поднимите
QueueFull
.
-
qsize
()¶ Возвращает количество элементов в очереди.
-
task_done
()¶ Указывает на то, что ранее записанная задача завершена.
Используется потребителями очереди. Для каждого
get()
, используемого для получения задачи, последующий вызовtask_done()
сообщает очереди, что обработка задачи завершена.Если
join()
в настоящее время блокируется, он возобновится, когда все элементы будут обработаны (это означает, что вызовtask_done()
был получен для каждого элемента, который былput()
в очереди).Вызывает
ValueError
, если вызывается большее количество раз, чем было помещено элементов в очередь.
-
Очередь приоритетов¶
Очередь LIFO¶
Исключения¶
-
exception
asyncio.
QueueEmpty
¶ Это исключение возникает, когда метод
get_nowait()
вызывается на пустой очереди.
-
exception
asyncio.
QueueFull
¶ Исключение, возникающее при вызове метода
put_nowait()
на очереди, достигшей своего maxsize.
Примеры¶
Очереди можно использовать для распределения рабочей нагрузки между несколькими одновременными задачами:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())