Очереди¶
Исходный код: Lib/asyncio/queues.py
очереди asyncio спроектированы так, чтобы быть похожими на классы модуля queue. Хотя очереди asyncio не являются потокобезопасными, они предназначены специально для использования в коде async/await.
Обратите внимание, что методы очередей asyncio не имеют параметра timeout; используйте функцию asyncio.wait_for() для выполнения операций с очередями с тайм-аутом.
Смотрите также раздел Examples ниже.
Очередь¶
- class asyncio.Queue(maxsize=0)¶
Очередь «первый вошел - первый вышел» (FIFO).
Если значение maxsize меньше или равно нулю, то размер очереди бесконечен. Если это целое число больше
0, тоawait put()блокируется, когда очередь достигает максимального размера, до тех пор, пока элемент не будет удален с помощьюget().В отличие от стандартной многопоточности библиотеки
queue, размер очереди всегда известен и может быть возвращен вызовом методаqsize().Изменено в версии 3.10: Удален параметр loop.
Этот класс имеет значение not thread safe.
- maxsize¶
Количество элементов, разрешенных в очереди.
- empty()¶
Верните
True, если очередь пуста,Falseв противном случае.
- full()¶
Возвращает
True, если в очереди естьmaxsizeэлементов.Если очередь была инициализирована с помощью
maxsize=0(значение по умолчанию), тоfull()никогда не возвращаетTrue.
- coroutine get()¶
Удалите и верните элемент из очереди. Если очередь пуста, подождите, пока элемент не станет доступен.
- get_nowait()¶
Верните товар, если он сразу доступен, в противном случае поднимите
QueueEmpty.
- coroutine join()¶
Блокируйте до тех пор, пока все элементы в очереди не будут получены и обработаны.
Количество незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь. Количество уменьшается всякий раз, когда пользовательская сопрограмма вызывает
task_done(), чтобы указать, что элемент был извлечен и вся работа с ним завершена. Когда количество незавершенных задач упадет до нуля,join()разблокируется.
- coroutine put(item)¶
Поместите товар в очередь. Если очередь заполнена, дождитесь, пока освободится место, прежде чем добавлять товар.
- put_nowait(item)¶
Поместите элемент в очередь без блокировки.
Если в данный момент свободных мест нет, поднимите ставку
QueueFull.
- qsize()¶
Возвращает количество элементов в очереди.
- task_done()¶
Указывает на то, что ранее поставленная в очередь задача завершена.
Используется пользователями очереди. Для каждого
get(), используемого для получения задачи, последующий вызовtask_done()сообщает очереди, что обработка задачи завершена.Если
join()в данный момент заблокирован, он возобновится, когда все элементы будут обработаны (это означает, что для каждого элемента, который был помещенput()в очередь, был получен вызовtask_done()).Вызывает
ValueError, если вызывается больше раз, чем было элементов, помещенных в очередь.
Приоритетная очередь¶
Очередь FIFO¶
Исключения¶
- exception asyncio.QueueEmpty¶
Это исключение возникает, когда метод
get_nowait()вызывается в пустой очереди.
- exception asyncio.QueueFull¶
Исключение возникает при вызове метода
put_nowait()в очереди, которая достигла своего максимального размера.
Примеры¶
Очереди могут использоваться для распределения рабочей нагрузки между несколькими параллельными задачами:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())