queue
— Класс синхронизированной очереди¶
Исходный код: Lib/queue.py.
Модуль queue
реализует очереди с несколькими производителями и потребителями. Он особенно полезен в потоковом программировании, когда необходимо безопасно обмениваться информацией между несколькими потоками. Класс Queue
в этом модуле реализует всю необходимую семантику блокировки.
Модуль реализует три типа очередей, которые отличаются только порядком получения записей. В очереди FIFO первыми извлекаются первые добавленные задачи. В очереди LIFO первой извлекается самая последняя добавленная запись (работает как стек). В очереди с приоритетом записи сортируются (с помощью модуля heapq
), и первой извлекается запись с наименьшим значением.
Внутри эти три типа очередей используют блокировки для временного блокирования конкурирующих потоков; однако они не предназначены для обработки реентерабельности внутри потока.
Кроме того, модуль реализует «простой» тип очереди FIFO, SimpleQueue
, специфическая реализация которого обеспечивает дополнительные гарантии в обмен на меньшую функциональность.
Модуль queue
определяет следующие классы и исключения:
-
class
queue.
Queue
(maxsize=0)¶ Конструктор для очереди FIFO. maxsize - целое число, задающее верхний предел количества элементов, которые могут быть помещены в очередь. При достижении этого размера вставка будет блокироваться до тех пор, пока элементы очереди не будут израсходованы. Если maxsize меньше или равно нулю, размер очереди бесконечен.
-
class
queue.
LifoQueue
(maxsize=0)¶ Конструктор для очереди LIFO. maxsize - целое число, задающее верхний предел количества элементов, которые могут быть помещены в очередь. При достижении этого размера вставка будет блокироваться до тех пор, пока элементы очереди не будут израсходованы. Если maxsize меньше или равно нулю, размер очереди бесконечен.
-
class
queue.
PriorityQueue
(maxsize=0)¶ Конструктор для приоритетной очереди. maxsize - целое число, задающее верхний предел количества элементов, которые могут быть помещены в очередь. При достижении этого размера вставка будет блокироваться до тех пор, пока элементы очереди не будут израсходованы. Если maxsize меньше или равно нулю, размер очереди бесконечен.
Сначала извлекаются записи с наименьшим значением (запись с наименьшим значением - это та, которую возвращает
sorted(list(entries))[0]
). Типичным шаблоном для записей является кортеж в форме:(priority_number, data)
.Если элементы данных не сравниваются, данные можно обернуть в класс, который игнорирует элемент данных и сравнивает только номер приоритета:
from dataclasses import dataclass, field from typing import Any @dataclass(order=True) class PrioritizedItem: priority: int item: Any=field(compare=False)
-
class
queue.
SimpleQueue
¶ Конструктор для беспредельной очереди FIFO. Простым очередям не хватает расширенной функциональности, такой как отслеживание задач.
Добавлено в версии 3.7.
-
exception
queue.
Empty
¶ Исключение, возникающее, когда неблокирующий
get()
(илиget_nowait()
) вызывается на пустом объектеQueue
.
-
exception
queue.
Full
¶ Исключение, возникающее при вызове неблокирующего
put()
(илиput_nowait()
) объектаQueue
, который переполнен.
Объекты очереди¶
Объекты очереди (Queue
, LifoQueue
или PriorityQueue
) предоставляют публичные методы, описанные ниже.
-
Queue.
qsize
()¶ Возвращает приблизительный размер очереди. Обратите внимание, qsize() > 0 не гарантирует, что последующая get() не заблокируется, также как qsize() < maxsize не гарантирует, что put() не заблокируется.
-
Queue.
empty
()¶ Возвращает
True
, если очередь пуста,False
в противном случае. Если empty() возвращаетTrue
, это не гарантирует, что последующий вызов put() не заблокируется. Аналогично, если empty() возвращаетFalse
, это не гарантирует, что последующий вызов get() не заблокируется.
-
Queue.
full
()¶ Возвращает
True
, если очередь заполнена,False
в противном случае. Если full() возвращаетTrue
, это не гарантирует, что последующий вызов get() не заблокируется. Аналогично, если full() возвращаетFalse
, это не гарантирует, что последующий вызов put() не заблокируется.
-
Queue.
put
(item, block=True, timeout=None)¶ Поместить элемент в очередь. Если опциональный args block равен true и timeout равен
None
(по умолчанию), то при необходимости блокируется до тех пор, пока не появится свободный слот. Если timeout - положительное число, то блокируется не более timeout секунд и вызывает исключениеFull
, если в течение этого времени свободный слот не был доступен. Иначе (block равно false), помещает элемент в очередь, если свободный слот доступен немедленно, иначе вызывает исключениеFull
(timeout в этом случае игнорируется).
-
Queue.
put_nowait
(item)¶ Эквивалентно
put(item, block=False)
.
-
Queue.
get
(block=True, timeout=None)¶ Удалить и вернуть элемент из очереди. Если опциональный args block равен true и timeout равен
None
(по умолчанию), то при необходимости блокируется, пока элемент не будет доступен. Если timeout - положительное число, то блокируется не более timeout секунд и вызывает исключениеEmpty
, если в течение этого времени элемент не был доступен. В противном случае (block равно false), возвращает элемент, если он доступен немедленно, иначе вызывает исключениеEmpty
(timeout в этом случае игнорируется).До версии 3.0 на POSIX-системах и во всех версиях на Windows, если block равен true и timeout равен
None
, эта операция переходит в режим непрерывного ожидания на базовой блокировке. Это означает, что не может произойти никаких исключений, в частности, SIGINT не вызоветKeyboardInterrupt
.
-
Queue.
get_nowait
()¶ Эквивалентно
get(False)
.
Для отслеживания того, были ли поставленные в очередь задачи полностью обработаны потребительскими потоками демона, предлагаются два метода.
-
Queue.
task_done
()¶ Указывает на то, что ранее поставленная в очередь задача завершена. Используется потоками-потребителями очереди. Для каждого
get()
, использованного для получения задачи, последующий вызовtask_done()
сообщает очереди, что обработка задачи завершена.Если
join()
в настоящее время блокируется, он возобновится, когда все элементы будут обработаны (это означает, что вызовtask_done()
был получен для каждого элемента, который былput()
в очереди).Вызывает ошибку
ValueError
, если вызывается большее количество раз, чем было помещено элементов в очередь.
-
Queue.
join
()¶ Блокируется до тех пор, пока все элементы в очереди не будут получены и обработаны.
Счетчик незавершенных задач увеличивается всякий раз, когда в очередь добавляется элемент. Счетчик уменьшается всякий раз, когда поток-потребитель вызывает
task_done()
, чтобы указать, что элемент был получен и вся работа над ним завершена. Когда счетчик незавершенных задач падает до нуля,join()
разблокируется.
Пример ожидания завершения поставленных в очередь задач:
import threading
import queue
q = queue.Queue()
def worker():
while True:
item = q.get()
print(f'Working on {item}')
print(f'Finished {item}')
q.task_done()
# Turn-on the worker thread.
threading.Thread(target=worker, daemon=True).start()
# Send thirty task requests to the worker.
for item in range(30):
q.put(item)
# Block until all tasks are done.
q.join()
print('All work completed')
Объекты SimpleQueue¶
Объекты SimpleQueue
предоставляют публичные методы, описанные ниже.
-
SimpleQueue.
qsize
()¶ Возвращает приблизительный размер очереди. Обратите внимание, что qsize() > 0 не гарантирует, что последующий get() не заблокируется.
-
SimpleQueue.
empty
()¶ Возвращает
True
, если очередь пуста,False
в противном случае. Если empty() возвращаетFalse
, это не гарантирует, что последующий вызов get() не заблокируется.
-
SimpleQueue.
put
(item, block=True, timeout=None)¶ Поместить элемент в очередь. Метод никогда не блокируется и всегда завершается успешно (за исключением потенциальных низкоуровневых ошибок, таких как невозможность выделения памяти). Необязательные аргументы block и timeout игнорируются и предоставляются только для совместимости с
Queue.put()
.CPython implementation detail: This method has a C implementation which is reentrant. That is, a
put()
orget()
call can be interrupted by anotherput()
call in the same thread without deadlocking or corrupting internal state inside the queue. This makes it appropriate for use in destructors such as__del__
methods orweakref
callbacks.
-
SimpleQueue.
put_nowait
(item)¶ Эквивалент
put(item, block=False)
, предоставляется для совместимости сQueue.put_nowait()
.
-
SimpleQueue.
get
(block=True, timeout=None)¶ Удалить и вернуть элемент из очереди. Если опциональный args block равен true и timeout равен
None
(по умолчанию), то при необходимости блокируется, пока элемент не будет доступен. Если timeout - положительное число, то блокируется не более timeout секунд и вызывает исключениеEmpty
, если в течение этого времени элемент не был доступен. В противном случае (block равно false), возвращает элемент, если он доступен немедленно, иначе вызывает исключениеEmpty
(timeout в этом случае игнорируется).
-
SimpleQueue.
get_nowait
()¶ Эквивалентно
get(False)
.
См.также
- Класс
multiprocessing.Queue
Класс очереди для использования в многопроцессорном (а не многопоточном) контексте.
collections.deque
- это альтернативная реализация неограниченных очередей с быстрыми атомарными операциями append()
и popleft()
, которые не требуют блокировки, а также поддерживают индексацию.