queue
— Класс синхронизированной очереди¶
Исходный код: Lib/queue.py
Модуль queue
реализует очереди с несколькими производителями и несколькими потребителями. Это особенно полезно в многопоточном программировании, когда необходимо безопасно обмениваться информацией между несколькими потоками. Класс Queue
в этом модуле реализует всю необходимую семантику блокировки.
Модуль реализует три типа очередей, которые отличаются только порядком извлечения записей. В очереди FIFO первыми добавляются задачи, которые извлекаются первыми. В очереди LIFO запись, добавленная последней, извлекается первой (работает как стек). При приоритетной очереди записи сортируются (с использованием модуля heapq
), и запись с наименьшим значением извлекается первой.
Внутренне эти три типа очередей используют блокировки для временной блокировки конкурирующих потоков; однако они не предназначены для обработки повторного входа внутри потока.
Кроме того, модуль реализует «простой» тип очереди FIFO, SimpleQueue
, конкретная реализация которого обеспечивает дополнительные гарантии в обмен на меньшую функциональность.
Модуль queue
определяет следующие классы и исключения:
- class queue.Queue(maxsize=0)¶
Конструктор для очереди FIFO. maxsize - это целое число, устанавливающее верхний предел количества элементов, которые могут быть помещены в очередь. Вставка будет заблокирована, как только будет достигнут этот размер, до тех пор, пока элементы очереди не будут использованы. Если значение maxsize меньше или равно нулю, то размер очереди бесконечен.
- class queue.LifoQueue(maxsize=0)¶
Конструктор для очереди LIFO. maxsize - это целое число, устанавливающее верхний предел количества элементов, которые могут быть помещены в очередь. Вставка будет заблокирована, как только будет достигнут этот размер, до тех пор, пока элементы очереди не будут использованы. Если значение maxsize меньше или равно нулю, то размер очереди бесконечен.
- class queue.PriorityQueue(maxsize=0)¶
Конструктор для приоритетной очереди. maxsize - это целое число, устанавливающее верхний предел количества элементов, которые могут быть помещены в очередь. Как только будет достигнут этот размер, вставка будет заблокирована до тех пор, пока элементы очереди не будут использованы. Если значение maxsize меньше или равно нулю, то размер очереди бесконечен.
Сначала извлекаются записи с наименьшим значением (запись с наименьшим значением - это та, которая была бы возвращена с помощью
min(entries)
). Типичным шаблоном для записей является кортеж в виде:(priority_number, data)
.Если элементы data несопоставимы, данные могут быть заключены в класс, который игнорирует элемент данных и сравнивает только номер приоритета:
from dataclasses import dataclass, field from typing import Any @dataclass(order=True) class PrioritizedItem: priority: int item: Any=field(compare=False)
- class queue.SimpleQueue¶
Конструктор для неограниченной очереди FIFO. В простых очередях отсутствуют расширенные функциональные возможности, такие как отслеживание задач.
Добавлено в версии 3.7.
- exception queue.Empty¶
Исключение возникает при вызове неблокирующего
get()
(илиget_nowait()
) для объектаQueue
, который является пустым.
- exception queue.Full¶
Исключение возникает при вызове неблокирующего
put()
(илиput_nowait()
) для объектаQueue
, который заполнен.
Объекты очереди¶
Объекты очереди (Queue
, LifoQueue
, или PriorityQueue
) предоставляют общедоступные методы, описанные ниже.
- Queue.qsize()¶
Возвращает приблизительный размер очереди. Обратите внимание, что qsize() > 0 не гарантирует, что последующая функция get() не будет заблокирована, а qsize() < maxsize не гарантирует, что функция put() не будет заблокирована.
- Queue.empty()¶
Возвращает
True
, если очередь пуста,False
в противном случае. Если функция empty() возвращаетTrue
, это не гарантирует, что последующий вызов функции put() не будет заблокирован. Аналогично, если функция empty() возвращаетFalse
, это не гарантирует, что последующий вызов функции get() не будет заблокирован.
- Queue.full()¶
Возвращает
True
, если очередь заполнена,False
в противном случае. Если функция full() возвращаетTrue
, это не гарантирует, что последующий вызов функции get() не будет заблокирован. Аналогично, если функция full() возвращаетFalse
, это не гарантирует, что последующий вызов функции put() не будет заблокирован.
- Queue.put(item, block=True, timeout=None)¶
Поместите item в очередь. Если необязательный аргумент block имеет значение true, а timeout -
None
(по умолчанию), при необходимости заблокируйте, пока не освободится свободное место. Если timeout является положительным числом, он блокирует не более timeout секунд и вызывает исключениеFull
, если в течение этого времени не было доступно ни одного свободного места. В противном случае (block имеет значение false), поместите элемент в очередь, если свободное место доступно немедленно, иначе вызовите исключениеFull
(timeout в этом случае игнорируется).
- Queue.put_nowait(item)¶
Эквивалентно
put(item, block=False)
.
- Queue.get(block=True, timeout=None)¶
Удалите и верните элемент из очереди. Если необязательный аргумент block имеет значение true, а timeout -
None
(по умолчанию), при необходимости заблокируйте, пока элемент не станет доступен. Если timeout является положительным числом, он блокирует не более timeout секунд и вызывает исключениеEmpty
, если в течение этого времени не было доступно ни одного элемента. В противном случае (block имеет значение false) верните элемент, если он доступен немедленно, иначе вызовите исключениеEmpty
(timeout в этом случае игнорируется).До версии 3.0 в системах POSIX и для всех версий Windows, если значение block равно true, а значение timeout равно
None
, эта операция переходит в режим непрерывного ожидания базовой блокировки. Это означает, что никаких исключений возникнуть не может, и, в частности, SIGINT не вызоветKeyboardInterrupt
.
- Queue.get_nowait()¶
Эквивалентно
get(False)
.
Предлагаются два метода для поддержки отслеживания того, были ли поставленные в очередь задачи полностью обработаны потоками-потребителями демона.
- Queue.task_done()¶
Указывает, что ранее поставленная в очередь задача завершена. Используется потоками-потребителями очереди. Для каждого
get()
, используемого для получения задачи, последующий вызовtask_done()
сообщает очереди, что обработка задачи завершена.Если
join()
в данный момент заблокирован, он возобновится, когда все элементы будут обработаны (это означает, что для каждого элемента, который был помещенput()
в очередь, был получен вызовtask_done()
).Вызывает
ValueError
, если вызывается больше раз, чем было элементов, помещенных в очередь.
- Queue.join()¶
Блокируется до тех пор, пока все элементы в очереди не будут получены и обработаны.
Количество незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь. Количество уменьшается всякий раз, когда поток-потребитель вызывает
task_done()
, чтобы указать, что элемент был извлечен и вся работа с ним завершена. Когда количество незавершенных задач упадет до нуля,join()
разблокируется.
Пример того, как дождаться завершения поставленных в очередь задач:
import threading
import queue
q = queue.Queue()
def worker():
while True:
item = q.get()
print(f'Working on {item}')
print(f'Finished {item}')
q.task_done()
# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()
# Send thirty task requests to the worker.
for item in range(30):
q.put(item)
# Block until all tasks are done.
q.join()
print('All work completed')
Простые объекты очереди¶
SimpleQueue
объекты предоставляют общедоступные методы, описанные ниже.
- SimpleQueue.qsize()¶
Возвращает приблизительный размер очереди. Обратите внимание, что size() > 0 не гарантирует, что последующий get() не будет заблокирован.
- SimpleQueue.empty()¶
Возвращает
True
, если очередь пуста,False
в противном случае. Если функция empty() возвращаетFalse
, это не гарантирует, что последующий вызов функции get() не будет заблокирован.
- SimpleQueue.put(item, block=True, timeout=None)¶
Поместите item в очередь. Метод никогда не блокируется и всегда выполняется успешно (за исключением потенциальных ошибок низкого уровня, таких как сбой в выделении памяти). Необязательные аргументы block и timeout игнорируются и предоставляются только для совместимости с
Queue.put()
.Детали реализации CPython: Этот метод имеет реализацию на языке Си, которая является реентерабельной. То есть вызов
put()
илиget()
может быть прерван другим вызовомput()
в том же потоке без взаимоблокировки или повреждения внутреннего состояния внутри очереди. Это делает его подходящим для использования в деструкторах, таких как методы__del__
или обратные вызовыweakref
.
- SimpleQueue.put_nowait(item)¶
Эквивалентно
put(item, block=False)
, предусмотрено для совместимости сQueue.put_nowait()
.
- SimpleQueue.get(block=True, timeout=None)¶
Удалите и верните элемент из очереди. Если необязательный аргумент block имеет значение true, а timeout -
None
(по умолчанию), при необходимости заблокируйте, пока элемент не станет доступен. Если timeout является положительным числом, он блокирует не более timeout секунд и вызывает исключениеEmpty
, если в течение этого времени не было доступно ни одного элемента. В противном случае (block имеет значение false) верните элемент, если он доступен немедленно, иначе вызовите исключениеEmpty
(timeout в этом случае игнорируется).
- SimpleQueue.get_nowait()¶
Эквивалентно
get(False)
.
См.также
- Класс
multiprocessing.Queue
Класс очереди для использования в многопроцессорном (а не многопоточном) контексте.
collections.deque
- это альтернативная реализация неограниченных очередей с быстрыми атомарными append()
и popleft()
операциями, которые не требуют блокировки, а также поддерживают индексацию.