Очереди¶
Исходный код: Lib/asyncio/queues.py
очереди asyncio спроектированы так, чтобы быть похожими на классы модуля queue
. Хотя очереди asyncio не являются потокобезопасными, они предназначены специально для использования в коде async/await.
Обратите внимание, что методы очередей asyncio не имеют параметра timeout; используйте функцию asyncio.wait_for()
для выполнения операций с очередями с тайм-аутом.
Смотрите также раздел Examples ниже.
Очередь¶
- class asyncio.Queue(maxsize=0)¶
Очередь «первый вошел - первый вышел» (FIFO).
Если значение maxsize меньше или равно нулю, то размер очереди бесконечен. Если это целое число больше
0
, тоawait put()
блокируется, когда очередь достигает максимального размера, до тех пор, пока элемент не будет удален с помощьюget()
.В отличие от стандартной многопоточности библиотеки
queue
, размер очереди всегда известен и может быть возвращен вызовом методаqsize()
.Изменено в версии 3.10: Удален параметр loop.
Этот класс имеет значение not thread safe.
- maxsize¶
Количество элементов, разрешенных в очереди.
- empty()¶
Верните
True
, если очередь пуста,False
в противном случае.
- full()¶
Возвращает
True
, если в очереди естьmaxsize
элементов.Если очередь была инициализирована с помощью
maxsize=0
(значение по умолчанию), тоfull()
никогда не возвращаетTrue
.
- coroutine get()¶
Удалите и верните элемент из очереди. Если очередь пуста, подождите, пока элемент не станет доступен.
- get_nowait()¶
Верните товар, если он сразу доступен, в противном случае поднимите
QueueEmpty
.
- coroutine join()¶
Блокируйте до тех пор, пока все элементы в очереди не будут получены и обработаны.
Количество незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь. Количество уменьшается всякий раз, когда пользовательская сопрограмма вызывает
task_done()
, чтобы указать, что элемент был извлечен и вся работа с ним завершена. Когда количество незавершенных задач упадет до нуля,join()
разблокируется.
- coroutine put(item)¶
Поместите товар в очередь. Если очередь заполнена, дождитесь, пока освободится место, прежде чем добавлять товар.
- put_nowait(item)¶
Поместите элемент в очередь без блокировки.
Если в данный момент свободных мест нет, поднимите ставку
QueueFull
.
- qsize()¶
Возвращает количество элементов в очереди.
- task_done()¶
Указывает на то, что ранее поставленная в очередь задача завершена.
Используется пользователями очереди. Для каждого
get()
, используемого для получения задачи, последующий вызовtask_done()
сообщает очереди, что обработка задачи завершена.Если
join()
в данный момент заблокирован, он возобновится, когда все элементы будут обработаны (это означает, что для каждого элемента, который был помещенput()
в очередь, был получен вызовtask_done()
).Вызывает
ValueError
, если вызывается больше раз, чем было элементов, помещенных в очередь.
Приоритетная очередь¶
Очередь FIFO¶
Исключения¶
- exception asyncio.QueueEmpty¶
Это исключение возникает, когда метод
get_nowait()
вызывается в пустой очереди.
- exception asyncio.QueueFull¶
Исключение возникает при вызове метода
put_nowait()
в очереди, которая достигла своего максимального размера.
Примеры¶
Очереди могут использоваться для распределения рабочей нагрузки между несколькими параллельными задачами:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())