threading — Параллелизм на основе потоков

Исходный код: Lib/threading.py.


Этот модуль строит высокоуровневые потоковые интерфейсы поверх низкоуровневого модуля _thread.

Изменено в версии 3.7: Раньше этот модуль был необязательным, теперь он доступен всегда.

См.также

concurrent.futures.ThreadPoolExecutor предлагает интерфейс более высокого уровня для передачи задач в фоновый поток без блокировки выполнения вызывающего потока, при этом сохраняя возможность получить их результаты, когда это необходимо.

queue предоставляет потокобезопасный интерфейс для обмена данными между запущенными потоками.

asyncio предлагает альтернативный подход к достижению параллелизма на уровне задач, не требуя использования нескольких потоков операционной системы.

Примечание

В серии Python 2.x этот модуль содержал имена camelCase для некоторых методов и функций. Они устарели в Python 3.10, но все еще поддерживаются для совместимости с Python 2.5 и ниже.

CPython implementation detail: В CPython, благодаря Global Interpreter Lock, только один поток может выполнять код Python одновременно (хотя некоторые библиотеки, ориентированные на производительность, могут преодолеть это ограничение). Если вы хотите, чтобы ваше приложение лучше использовало вычислительные ресурсы многоядерных машин, вам рекомендуется использовать multiprocessing или concurrent.futures.ProcessPoolExecutor. Тем не менее, потоковая обработка по-прежнему является подходящей моделью, если вы хотите одновременно выполнять несколько задач, связанных с вводом-выводом.

Этот модуль определяет следующие функции:

threading.active_count()

Возвращает количество объектов Thread, существующих в данный момент. Возвращаемое количество равно длине списка, возвращаемого командой enumerate().

Функция activeCount является устаревшим псевдонимом для этой функции.

threading.current_thread()

Возвращает текущий объект Thread, соответствующий потоку управления вызывающей стороны. Если поток управления вызывающей стороны не был создан через модуль threading, возвращается фиктивный объект потока с ограниченной функциональностью.

Функция currentThread является устаревшим псевдонимом для этой функции.

threading.excepthook(args, /)

Обработать не пойманное исключение, вызванное Thread.run().

Аргумент args имеет следующие атрибуты:

  • exc_type: Тип исключения.

  • exc_value: Значение исключения, может быть None.

  • exc_traceback: Отслеживание исключения, может быть None.

  • thread: Поток, который вызвал исключение, может быть None.

Если exc_type равен SystemExit, исключение молча игнорируется. В противном случае исключение выводится на sys.stderr.

Если эта функция вызывает исключение, то для его обработки вызывается sys.excepthook().

threading.excepthook() может быть переопределена для управления тем, как обрабатываются не пойманные исключения, поднятые Thread.run().

Хранение exc_value с помощью пользовательского хука может создать цикл ссылок. Его следует очистить явным образом, чтобы разорвать цикл ссылок, когда исключение больше не нужно.

Хранение thread с помощью пользовательского хука может воскресить его, если он установлен на объект, который завершается. Избегайте хранения thread после завершения работы пользовательского хука, чтобы избежать воскрешения объектов.

См.также

sys.excepthook() обрабатывает не пойманные исключения.

Добавлено в версии 3.8.

threading.__excepthook__

Сохраняет исходное значение threading.excepthook(). Оно сохраняется для того, чтобы можно было восстановить исходное значение в случае их замены на сломанные или альтернативные объекты.

Добавлено в версии 3.10.

threading.get_ident()

Возвращает «идентификатор потока» текущего потока. Это ненулевое целое число. Его значение не имеет прямого смысла; оно предназначено для использования в качестве магического cookie, например, для индексации словаря данных, специфичных для потока. Идентификаторы потоков могут быть повторно использованы, когда поток завершается и создается другой поток.

Добавлено в версии 3.3.

threading.get_native_id()

Возвращает собственный интегральный Thread ID текущего потока, назначенный ядром. Это неотрицательное целое число. Его значение может использоваться для уникальной идентификации данного конкретного потока в масштабах всей системы (пока поток не завершится, после чего значение может быть утилизировано ОС).

Availability: Windows, FreeBSD, Linux, macOS, OpenBSD, NetBSD, AIX.

Добавлено в версии 3.8.

threading.enumerate()

Возвращает список всех объектов Thread, активных в данный момент. Список включает демонические потоки и объекты фиктивных потоков, созданные current_thread(). Он исключает завершенные потоки и потоки, которые еще не были запущены. Однако основной поток всегда является частью результата, даже если он завершен.

threading.main_thread()

Возвращает объект main Thread. В нормальных условиях основным потоком является поток, из которого был запущен интерпретатор Python.

Добавлено в версии 3.4.

threading.settrace(func)

Установите функцию трассировки для всех потоков, запущенных из модуля threading. Функция func будет передана в sys.settrace() для каждого потока, перед вызовом его метода run().

threading.gettrace()

Получить функцию трассировки, заданную командой settrace().

Добавлено в версии 3.10.

threading.setprofile(func)

Установите функцию профиля для всех потоков, запускаемых из модуля threading. Функция func будет передана в sys.setprofile() для каждого потока, перед вызовом его метода run().

threading.getprofile()

Получить функцию профилировщика, заданную командой setprofile().

Добавлено в версии 3.10.

threading.stack_size([size])

Возвращает размер стека потоков, используемый при создании новых потоков. Необязательный аргумент size определяет размер стека, который будет использоваться для последующих созданных потоков, и должен быть равен 0 (использовать значение по умолчанию платформы или конфигурации) или целочисленному положительному значению не менее 32 768 (32 KiB). Если size не указан, используется 0. Если изменение размера стека потока не поддерживается, выдается предупреждение RuntimeError. Если указанный размер стека недопустим, выдается предупреждение ValueError, и размер стека не изменяется. 32 КиБ - это минимальное поддерживаемое значение размера стека, гарантирующее достаточное пространство стека для самого интерпретатора. Обратите внимание, что некоторые платформы могут иметь особые ограничения на размер стека, например, требовать минимальный размер стека > 32 KiB или требовать выделения пространства, кратного размеру страницы системной памяти - для получения дополнительной информации следует обратиться к документации платформы (страницы размером 4 KiB являются обычными; использование кратных 4096 для размера стека является рекомендуемым подходом при отсутствии более конкретной информации).

Availability: Windows, системы с потоками POSIX.

Этот модуль также определяет следующую константу:

threading.TIMEOUT_MAX

Максимальное значение, допустимое для параметра timeout блокирующих функций (Lock.acquire(), RLock.acquire(), Condition.wait() и т.д.). Указание таймаута, превышающего это значение, вызовет ошибку OverflowError.

Добавлено в версии 3.2.

Этот модуль определяет ряд классов, которые подробно описаны в разделах ниже.

Дизайн этого модуля в значительной степени основан на модели потоков Java. Однако, если в Java блокировки и переменные состояния являются базовым поведением каждого объекта, то в Python это отдельные объекты. Класс Python Thread поддерживает подмножество поведения класса Thread языка Java; в настоящее время нет приоритетов, нет групп потоков, и потоки не могут быть уничтожены, остановлены, приостановлены, возобновлены или прерваны. Статические методы класса Java’s Thread, когда они реализованы, отображаются на функции уровня модуля.

Все описанные ниже методы выполняются атомарно.

Локальные данные

Локальные данные потока - это данные, значения которых зависят от конкретного потока. Чтобы управлять потоково-локальными данными, достаточно создать экземпляр local (или подкласс) и хранить на нем атрибуты:

mydata = threading.local()
mydata.x = 1

Значения экземпляра будут разными для отдельных потоков.

class threading.local

Класс, представляющий локальные данные потока.

Более подробную информацию и обширные примеры можно найти в строке документации модуля _threading_local.

Нитяные объекты

Класс Thread представляет деятельность, которая выполняется в отдельном потоке управления. Есть два способа задать активность: передав конструктору вызываемый объект или переопределив метод run() в подклассе. Никакие другие методы (кроме конструктора) не должны быть переопределены в подклассе. Другими словами, переопределяйте только* методы __init__() и run() этого класса.

После создания объекта потока его деятельность должна быть начата вызовом метода start() потока. Это вызывает метод run() в отдельном потоке управления.

После начала работы потока он считается «живым». Он перестает быть живым, когда его метод run() завершается - либо нормально, либо путем поднятия необработанного исключения. Метод is_alive() проверяет, жив ли поток.

Другие потоки могут вызывать метод потока join(). Это блокирует вызывающий поток до тех пор, пока поток, чей метод join() вызывается, не будет завершен.

У потока есть имя. Имя может быть передано конструктору, а также прочитано или изменено через атрибут name.

Если метод run() вызывает исключение, то для его обработки вызывается threading.excepthook(). По умолчанию threading.excepthook() молча игнорирует SystemExit.

Поток может быть помечен как «поток-демон». Значение этого флага заключается в том, что вся программа Python завершается, когда остаются только потоки-демоны. Начальное значение наследуется от создающего потока. Флаг может быть установлен через свойство daemon или аргумент конструктора daemon.

Примечание

Потоки демонов резко останавливаются при выключении. Их ресурсы (такие как открытые файлы, транзакции базы данных и т.д.) могут быть освобождены неправильно. Если вы хотите, чтобы ваши потоки останавливались изящно, сделайте их недемоническими и используйте подходящий механизм сигнализации, например, Event.

Существует объект «главный поток»; он соответствует начальному потоку управления в программе Python. Это не демонский поток.

Существует возможность создания «фиктивных потоковых объектов». Это объекты потоков, соответствующие «чужим потокам», которые являются потоками управления, запущенными вне модуля потоков, например, непосредственно из кода на языке Си. Фиктивные потоковые объекты имеют ограниченную функциональность; они всегда считаются живыми и демоническими, и не могут быть join()ed. Они никогда не удаляются, поскольку невозможно обнаружить завершение чужих потоков.

class threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Этот конструктор всегда должен вызываться с аргументами в виде ключевых слов. Аргументами являются:

group должна быть None; зарезервирована для будущего расширения, когда будет реализован класс ThreadGroup.

target - вызываемый объект, который будет вызван методом run(). По умолчанию None, что означает, что ничего не вызывается.

name - это имя потока. По умолчанию уникальное имя строится в виде «Thread-N», где N - маленькое десятичное число, или «Thread-N (target)», где «target» - target.__name__, если указан аргумент target.

args - кортеж аргументов для целевого вызова. По умолчанию имеет значение ().

kwargs - словарь аргументов ключевых слов для целевого вызова. По умолчанию имеет значение {}.

Если не None, daemon явно устанавливает, является ли поток демоническим. Если None (по умолчанию), свойство daemonic наследуется от текущего потока.

Если подкласс переопределяет конструктор, он должен обязательно вызвать конструктор базового класса (Thread.__init__()), прежде чем делать что-либо еще с потоком.

Изменено в версии 3.10: Используйте имя цели, если аргумент имя опущен.

Изменено в версии 3.3: Добавлен аргумент daemon.

start()

Начните активность потока.

Она должна вызываться не более одного раза для каждого объекта потока. Он организует вызов метода run() объекта в отдельном потоке управления.

Этот метод вызовет ошибку RuntimeError, если будет вызван более одного раза на одном и том же объекте потока.

run()

Метод, представляющий активность потока.

Вы можете переопределить этот метод в подклассе. Стандартный метод run() вызывает вызываемый объект, переданный конструктору объекта в качестве аргумента target, если таковой имеется, с позиционными и ключевыми аргументами, взятыми из аргументов args и kwargs, соответственно.

join(timeout=None)

Подождите, пока поток не завершится. Это блокирует вызывающий поток до тех пор, пока поток, чей метод join() вызывается, не завершится - либо нормально, либо через необработанное исключение - или пока не произойдет необязательный тайм-аут.

Когда аргумент timeout присутствует, а не None, он должен быть числом с плавающей точкой, задающим тайм-аут для операции в секундах (или их долях). Поскольку join() всегда возвращает None, вы должны вызвать is_alive() после join(), чтобы определить, произошел ли таймаут - если поток еще жив, вызов join() завершился.

Если аргумент timeout отсутствует или None, операция будет блокироваться до тех пор, пока поток не завершится.

Нить может быть join()ed много раз.

join() вызывает исключение RuntimeError, если предпринимается попытка присоединиться к текущему потоку, так как это приведет к тупику. Также ошибкой является join() потока до того, как он был запущен, и попытки сделать это вызывают такое же исключение.

name

Строка, используемая только для целей идентификации. Она не имеет семантики. Одно и то же имя может быть присвоено нескольким потокам. Начальное имя задается конструктором.

getName()
setName()

Устаревший API getter/setter для name; вместо этого используйте его непосредственно как свойство.

Не рекомендуется, начиная с версии 3.10.

ident

„Идентификатор потока“ этого потока или None, если поток не был запущен. Это ненулевое целое число. См. функцию get_ident(). Идентификаторы потоков могут быть повторно использованы, когда поток завершается и создается другой поток. Идентификатор доступен даже после завершения потока.

native_id

Thread ID (TID) этого потока, присвоенный ОС (ядром). Это целое неотрицательное число, или None, если поток не был запущен. См. функцию get_native_id(). Это значение может использоваться для уникальной идентификации данного потока в масштабах всей системы (пока поток не завершится, после чего значение может быть утилизировано ОС).

Примечание

Как и идентификаторы процессов, идентификаторы потоков действительны (гарантированная уникальность в масштабах всей системы) только с момента создания потока до его завершения.

Availability: Требуется функция get_native_id().

Добавлено в версии 3.8.

is_alive()

Возвращает, жив ли данный поток.

Этот метод возвращает True непосредственно перед началом работы метода run() и сразу после завершения работы метода run(). Функция модуля enumerate() возвращает список всех живых потоков.

daemon

Булево значение, указывающее, является ли этот поток демоном (True) или нет (False). Это значение должно быть установлено до вызова start(), иначе будет вызван RuntimeError. Его начальное значение наследуется от создающего потока; главный поток не является демонским, и поэтому все потоки, созданные в главном потоке, по умолчанию имеют значение daemon = False.

Вся программа Python завершается, когда не остается живых недемонских потоков.

isDaemon()
setDaemon()

Устаревший API getter/setter для daemon; вместо этого используйте его непосредственно как свойство.

Не рекомендуется, начиная с версии 3.10.

Блокировка объектов

Примитивная блокировка - это примитив синхронизации, который при блокировке не принадлежит конкретному потоку. В Python в настоящее время это примитив синхронизации самого низкого уровня, реализуемый непосредственно модулем расширения _thread.

Примитивная блокировка находится в одном из двух состояний, «заблокирована» или «разблокирована». Она создается в разблокированном состоянии. У него есть два основных метода, acquire() и release(). Когда состояние разблокировано, acquire() изменяет состояние на заблокированное и немедленно возвращается. Когда состояние заблокировано, acquire() блокируется до тех пор, пока вызов release() в другом потоке не изменит его на разблокированное, затем вызов acquire() сбрасывает его на заблокированное и возвращается. Метод release() следует вызывать только в заблокированном состоянии; он изменяет состояние на разблокированное и немедленно возвращается. Если будет предпринята попытка освободить разблокированную блокировку, будет вызвана ошибка RuntimeError.

Замки также поддерживают context management protocol.

Если более одного потока заблокированы в acquire() в ожидании перехода состояния в разблокированное, только один поток продолжает выполнение, когда вызов release() сбрасывает состояние в разблокированное; какой из ожидающих потоков продолжает выполнение, не определено, и может варьироваться в разных реализациях.

Все методы выполняются атомарно.

class threading.Lock

Класс, реализующий примитивные объекты блокировки. После того как поток получил блокировку, последующие попытки получить ее блокируются, пока она не будет освобождена; любой поток может освободить ее.

Обратите внимание, что Lock на самом деле является фабричной функцией, которая возвращает экземпляр наиболее эффективной версии конкретного класса Lock, поддерживаемого платформой.

acquire(blocking=True, timeout=- 1)

Получение блокировки, блокирующей или неблокирующей.

При вызове с аргументом blocking, установленным в True (по умолчанию), блокирует до разблокировки блокировки, затем устанавливает ее в locked и возвращает True.

При вызове с аргументом blocking, установленным в False, блокировка не производится. Если вызов с blocking, установленным в True, будет блокировать, немедленно верните False; в противном случае установите блокировку на locked и верните True.

При вызове с аргументом timeout с плавающей точкой, установленным в положительное значение, блокировка длится не более нескольких секунд, указанных в timeout, и до тех пор, пока блокировка не будет получена. Аргумент timeout, равный -1, указывает на беспредельное ожидание. Запрещено указывать timeout, когда блокировка имеет значение False.

Возвращаемое значение True если блокировка получена успешно, False если нет (например, если истек timeout).

Изменено в версии 3.2: Параметр timeout является новым.

Изменено в версии 3.2: Получение блокировки теперь может быть прервано сигналами на POSIX, если базовая реализация потоков поддерживает это.

release()

Освободить блокировку. Эта функция может быть вызвана из любого потока, а не только из потока, который получил блокировку.

Когда блокировка заблокирована, сбросьте ее на разблокированную и вернитесь. Если какие-либо другие потоки заблокированы в ожидании разблокировки блокировки, разрешите продолжить выполнение только одному из них.

При вызове на разблокированной блокировке возникает ошибка RuntimeError.

Возвращаемое значение отсутствует.

locked()

Верните True, если блокировка получена.

Объекты RLock

Рекуррентная блокировка - это примитив синхронизации, который может быть получен несколько раз одним и тем же потоком. Внутри она использует понятия «владеющий поток» и «уровень рекурсии» в дополнение к состоянию заблокирован/разблокирован, используемому примитивными блокировками. В заблокированном состоянии блокировка принадлежит какому-либо потоку, в разблокированном состоянии она не принадлежит ни одному потоку.

Чтобы заблокировать блокировку, поток вызывает свой метод acquire(), который возвращается, как только поток становится владельцем блокировки. Чтобы разблокировать блокировку, поток вызывает свой метод release(). Пары вызовов acquire()/release() могут быть вложенными; только последний release() (release() крайней пары) сбрасывает блокировку на разблокированную и позволяет другому потоку, заблокированному в acquire(), продолжить выполнение.

Реентерабельные блокировки также поддерживают context management protocol.

class threading.RLock

Этот класс реализует объекты реентерабельной блокировки. Реентерабельная блокировка должна быть освобождена потоком, который ее приобрел. После того, как поток приобрел реентерабельную блокировку, этот же поток может получить ее снова без блокировки; поток должен освободить ее один раз за каждый раз, когда он ее приобрел.

Обратите внимание, что RLock на самом деле является фабричной функцией, которая возвращает экземпляр наиболее эффективной версии конкретного класса RLock, поддерживаемого платформой.

acquire(blocking=True, timeout=- 1)

Получение блокировки, блокирующей или неблокирующей.

При вызове без аргументов: если данный поток уже владеет блокировкой, увеличиваем уровень рекурсии на единицу и немедленно возвращаемся. В противном случае, если блокировка принадлежит другому потоку, блокируйте до тех пор, пока блокировка не будет разблокирована. Если блокировка разблокирована (не принадлежит ни одному потоку), то захватите право собственности, установите уровень рекурсии на единицу и вернитесь. Если несколько потоков заблокированы в ожидании разблокировки блокировки, только один из них сможет одновременно получить право собственности на блокировку. В этом случае возвращаемое значение отсутствует.

При вызове с аргументом blocking, установленным в True, делает то же самое, что и при вызове без аргументов, и возвращает True.

При вызове с аргументом blocking, установленным в False, не блокируйте. Если вызов без аргумента будет блокировать, немедленно верните False; в противном случае сделайте то же самое, что и при вызове без аргументов, и верните True.

При вызове с аргументом timeout с плавающей точкой, установленным в положительное значение, блокирует не более чем на количество секунд, указанное timeout, и до тех пор, пока блокировка не может быть получена. Возвращается True, если блокировка была получена, False, если таймаут истек.

Изменено в версии 3.2: Параметр timeout является новым.

release()

Освободите блокировку, уменьшив уровень рекурсии. Если после декремента он равен нулю, сбросьте блокировку на разблокированную (не принадлежащую ни одному потоку), и если другие потоки заблокированы в ожидании разблокировки блокировки, разрешите продолжить выполнение только одному из них. Если после декремента уровень рекурсии все еще ненулевой, блокировка остается заблокированной и принадлежит вызывающему потоку.

Вызывайте этот метод только тогда, когда вызывающий поток владеет блокировкой. Если этот метод вызывается, когда блокировка разблокирована, возникает ошибка RuntimeError.

Возвращаемое значение отсутствует.

Объекты состояния

Переменная условия всегда связана с каким-либо замком; его можно передать или он будет создан по умолчанию. Передача блокировки полезна, когда несколько переменных условий должны использовать одну и ту же блокировку. Блокировка является частью объекта условия: вам не нужно отслеживать ее отдельно.

Переменная условия подчиняется оператору context management protocol: использование оператора with приобретает связанную блокировку на время действия вложенного блока. Методы acquire() и release() также вызывают соответствующие методы связанной блокировки.

Другие методы должны вызываться с удержанием соответствующей блокировки. Метод wait() освобождает блокировку, а затем блокирует ее до тех пор, пока другой поток не пробудит ее вызовом notify() или notify_all(). После пробуждения, wait() снова получает блокировку и возвращается. Также можно указать тайм-аут.

Метод notify() пробуждает один из потоков, ожидающих переменную условия, если таковые имеются. Метод notify_all() пробуждает все потоки, ожидающие переменную условия.

Примечание: методы notify() и notify_all() не освобождают блокировку; это означает, что пробужденный поток или потоки вернутся после вызова wait() не сразу, а только когда поток, вызвавший notify() или notify_all(), окончательно откажется от владения блокировкой.

Типичный стиль программирования с использованием переменных состояния использует блокировку для синхронизации доступа к некоторому общему состоянию; потоки, заинтересованные в конкретном изменении состояния, вызывают wait() несколько раз, пока не увидят желаемое состояние, а потоки, изменяющие состояние, вызывают notify() или notify_all(), когда они изменяют состояние таким образом, что оно может стать желаемым состоянием для одного из ожидающих. Например, следующий код представляет собой типовую ситуацию производитель-потребитель с неограниченной емкостью буфера:

# Consume one item
with cv:
    while not an_item_is_available():
        cv.wait()
    get_an_available_item()

# Produce one item
with cv:
    make_an_item_available()
    cv.notify()

Цикл while, проверяющий условие приложения, необходим, поскольку wait() может вернуться через произвольно долгое время, и условие, вызвавшее вызов notify(), может перестать выполняться. Это свойственно многопоточному программированию. Метод wait_for() может быть использован для автоматизации проверки условия и облегчает вычисление таймаутов:

# Consume an item
with cv:
    cv.wait_for(an_item_is_available)
    get_an_available_item()

Чтобы выбрать между notify() и notify_all(), подумайте, может ли одно изменение состояния быть интересным только для одного или нескольких ожидающих потоков. Например, в типичной ситуации производитель-потребитель добавление одного элемента в буфер требует пробуждения только одного потока-потребителя.

class threading.Condition(lock=None)

Этот класс реализует объекты переменных условий. Переменная условия позволяет одному или нескольким потокам ждать, пока их не уведомит другой поток.

Если указан аргумент lock, а не None, то это должен быть объект Lock или RLock, и он используется в качестве базовой блокировки. В противном случае создается новый объект RLock и используется в качестве базовой блокировки.

Изменено в версии 3.3: превратился из фабричной функции в класс.

acquire(*args)

Приобрести базовую блокировку. Этот метод вызывает соответствующий метод на базовой блокировке; возвращаемое значение - то, что возвращает этот метод.

release()

Освободить базовую блокировку. Этот метод вызывает соответствующий метод на базовой блокировке; возвращаемое значение отсутствует.

wait(timeout=None)

Ждать до получения уведомления или до наступления тайм-аута. Если вызывающий поток не получил блокировку в момент вызова этого метода, то будет вызвана ошибка RuntimeError.

Этот метод освобождает базовую блокировку, а затем блокируется, пока не будет пробужден вызовом notify() или notify_all() для той же переменной условия в другом потоке, или пока не произойдет необязательный тайм-аут. После пробуждения или истечения таймаута он снова приобретает блокировку и возвращается.

Если аргумент timeout присутствует, а не None, он должен представлять собой число с плавающей точкой, задающее тайм-аут для операции в секундах (или их долях).

Если базовой блокировкой является RLock, она не освобождается с помощью метода release(), поскольку это может не разблокировать блокировку, если она была получена несколько раз рекурсивно. Вместо этого используется внутренний интерфейс класса RLock, который действительно разблокирует его, даже если он был рекурсивно получен несколько раз. Затем используется другой внутренний интерфейс для восстановления уровня рекурсии при повторном получении блокировки.

Возвращаемое значение равно True, если не истек заданный таймаут, в этом случае оно равно False.

Изменено в версии 3.2: Ранее метод всегда возвращал None.

wait_for(predicate, timeout=None)

Подождите, пока условие не будет оценено как истина. Предикат predicate должен быть вызываемой переменной, результат которой будет интерпретирован как булево значение. Может быть задан timeout, задающий максимальное время ожидания.

Этот полезный метод может вызывать wait() многократно, пока предикат не будет удовлетворен, или пока не произойдет тайм-аут. Возвращаемое значение является последним возвращаемым значением предиката и будет иметь значение False, если метод завершился по таймауту.

Если не учитывать функцию тайм-аута, вызов этого метода примерно эквивалентен написанию:

while not predicate():
    cv.wait()

Поэтому действуют те же правила, что и для wait(): блокировка должна удерживаться при вызове и вновь приобретается при возврате. Предикат оценивается с удержанной блокировкой.

Добавлено в версии 3.2.

notify(n=1)

По умолчанию, пробуждается один поток, ожидающий этого условия, если таковой имеется. Если вызывающий поток не получил блокировку в момент вызова этого метода, то будет вызвана ошибка RuntimeError.

Этот метод пробуждает не более n потоков, ожидающих переменную условия; он не работает, если ни один поток не ожидает.

Текущая реализация пробуждает ровно n потоков, если ожидают по крайней мере n потоков. Однако полагаться на такое поведение небезопасно. Будущая оптимизированная реализация может иногда пробуждать больше, чем n потоков.

Примечание: пробудившийся поток не возвращается из своего вызова wait() до тех пор, пока не сможет снова получить блокировку. Поскольку notify() не освобождает блокировку, это должен сделать его вызывающий.

notify_all()

Пробудить все потоки, ожидающие выполнения этого условия. Этот метод действует аналогично notify(), но пробуждает все ожидающие потоки вместо одного. Если вызывающий поток не получил блокировку в момент вызова этого метода, будет вызвана ошибка RuntimeError.

Метод notifyAll является устаревшим псевдонимом для этого метода.

Объекты семафора

Это один из самых старых примитивов синхронизации в истории информатики, изобретенный голландским ученым Эдсгером В. Дейкстрой (он использовал названия P() и V() вместо acquire() и release()).

Семафор управляет внутренним счетчиком, который уменьшается при каждом вызове acquire() и увеличивается при каждом вызове release(). Счетчик никогда не может опуститься ниже нуля; когда acquire() обнаруживает, что он равен нулю, он блокируется, ожидая, пока какой-нибудь другой поток не вызовет release().

Семафоры также поддерживают context management protocol.

class threading.Semaphore(value=1)

Этот класс реализует объекты семафора. Семафор управляет атомарным счетчиком, представляющим количество вызовов release() минус количество вызовов acquire() плюс начальное значение. Метод acquire() при необходимости блокируется, пока не сможет вернуться, не сделав счетчик отрицательным. Если значение не задано, value по умолчанию равно 1.

Необязательный аргумент задает начальное значение для внутреннего счетчика; по умолчанию он равен 1. Если заданное значение меньше 0, то выдается значение ValueError.

Изменено в версии 3.3: превратился из фабричной функции в класс.

acquire(blocking=True, timeout=None)

Приобретите семафор.

При вызове без аргументов:

  • Если при входе внутренний счетчик больше нуля, уменьшите его на единицу и немедленно верните True.

  • Если при входе внутренний счетчик равен нулю, блокируйте его до тех пор, пока он не будет разбужен вызовом release(). После пробуждения (и если счетчик больше 0), уменьшите счетчик на 1 и верните True. При каждом вызове release() будет пробужден ровно один поток. Не следует полагаться на порядок пробуждения потоков.

При вызове с блокировкой, установленной в False, не блокируйте. Если вызов без аргумента заблокировал бы, немедленно верните False; в противном случае сделайте то же самое, что и при вызове без аргументов, и верните True.

При вызове с timeout, отличным от None, он будет блокироваться не более чем на timeout секунд. Если acquire не завершится успешно за этот промежуток времени, верните False. В противном случае возвращается True.

Изменено в версии 3.2: Параметр timeout является новым.

release(n=1)

Освободите семафор, увеличив внутренний счетчик на n. Если при входе он был равен нулю и другие потоки ждут, когда он снова станет больше нуля, разбудите n этих потоков.

Изменено в версии 3.9: Добавлен параметр n для одновременного освобождения нескольких ожидающих потоков.

class threading.BoundedSemaphore(value=1)

Класс, реализующий объекты ограниченного семафора. Ограниченный семафор проверяет, не превышает ли его текущее значение начальное значение. Если превышает, то вызывается сигнал ValueError. В большинстве ситуаций семафоры используются для охраны ресурсов с ограниченной емкостью. Если семафор освобождается слишком часто, это признак ошибки. Если значение не задано, value по умолчанию равно 1.

Изменено в версии 3.3: превратился из фабричной функции в класс.

Semaphore Пример

Семафоры часто используются для охраны ресурсов с ограниченной емкостью, например, сервера базы данных. В любой ситуации, когда размер ресурса фиксирован, вы должны использовать ограниченный семафор. Прежде чем порождать рабочие потоки, ваш главный поток инициализирует семафор:

maxconnections = 5
# ...
pool_sema = BoundedSemaphore(value=maxconnections)

После порождения рабочие потоки вызывают методы приобретения и освобождения семафора, когда им нужно подключиться к серверу:

with pool_sema:
    conn = connectdb()
    try:
        # ... use connection ...
    finally:
        conn.close()

Использование ограниченного семафора уменьшает вероятность того, что ошибка программирования, в результате которой семафор освобождается чаще, чем приобретается, останется незамеченной.

Объекты событий

Это один из простейших механизмов взаимодействия между потоками: один поток подает сигнал о событии, а другие потоки ждут его.

Объект события управляет внутренним флагом, который может быть установлен в true с помощью метода set() и сброшен в false с помощью метода clear(). Метод wait() блокируется до тех пор, пока флаг не станет истинным.

class threading.Event

Класс, реализующий объекты событий. Событие управляет флагом, который может быть установлен в true с помощью метода set() и сброшен в false с помощью метода clear(). Метод wait() блокируется до тех пор, пока флаг не станет истинным. Изначально флаг имеет значение false.

Изменено в версии 3.3: превратился из фабричной функции в класс.

is_set()

Возвращает True тогда и только тогда, когда внутренний флаг истинен.

Метод isSet является устаревшим псевдонимом для этого метода.

set()

Установите внутренний флаг в true. Все потоки, ожидающие, пока он станет истинным, пробуждаются. Потоки, вызывающие wait() после того, как флаг станет истинным, не будут блокироваться вообще.

clear()

Сбросить внутренний флаг в false. Впоследствии потоки, вызывающие wait(), будут блокироваться до тех пор, пока не будет вызван set(), чтобы снова установить внутренний флаг в true.

wait(timeout=None)

Блокировать до тех пор, пока внутренний флаг не станет истинным. Если при входе внутренний флаг истинен, вернитесь немедленно. В противном случае, блокируйте до тех пор, пока другой поток не вызовет set(), чтобы установить флаг в true, или пока не произойдет необязательный тайм-аут.

Если аргумент timeout присутствует, а не None, он должен представлять собой число с плавающей точкой, задающее тайм-аут для операции в секундах (или их долях).

Этот метод возвращает True тогда и только тогда, когда внутренний флаг был установлен в true, либо до вызова ожидания, либо после начала ожидания, поэтому он всегда будет возвращать True, за исключением случаев, когда задан таймаут и операция завершается.

Изменено в версии 3.1: Ранее метод всегда возвращал None.

Объекты таймера

Этот класс представляет действие, которое должно быть выполнено только по истечении определенного времени — таймера. Timer является подклассом Thread и как таковой также функционирует как пример создания пользовательских потоков.

Таймеры запускаются, как и потоки, вызовом их метода start(). Таймер может быть остановлен (до начала его действия) вызовом метода cancel(). Интервал, который таймер будет ждать перед выполнением своего действия, может не совпадать с интервалом, заданным пользователем.

Например:

def hello():
    print("hello, world")

t = Timer(30.0, hello)
t.start()  # after 30 seconds, "hello, world" will be printed
class threading.Timer(interval, function, args=None, kwargs=None)

Создайте таймер, который выполнит функцию с аргументами args и ключевыми аргументами kwargs по истечении интервала секунд. Если args равно None (по умолчанию), то будет использоваться пустой список. Если kwargs равно None (по умолчанию), то будет использоваться пустой dict.

Изменено в версии 3.3: превратился из фабричной функции в класс.

cancel()

Остановите таймер и отмените выполнение действия таймера. Это сработает только в том случае, если таймер все еще находится на стадии ожидания.

Барьерные объекты

Добавлено в версии 3.2.

Этот класс предоставляет простой примитив синхронизации для использования фиксированным числом потоков, которым необходимо ждать друг друга. Каждый из потоков пытается пройти барьер, вызывая метод wait(), и будет блокироваться до тех пор, пока все потоки не сделают свои вызовы wait(). В этот момент потоки одновременно освобождаются.

Барьер можно использовать повторно любое количество раз для одного и того же числа потоков.

В качестве примера, вот простой способ синхронизации потока клиента и сервера:

b = Barrier(2, timeout=5)

def server():
    start_server()
    b.wait()
    while True:
        connection = accept_connection()
        process_server_connection(connection)

def client():
    b.wait()
    while True:
        connection = make_connection()
        process_client_connection(connection)
class threading.Barrier(parties, action=None, timeout=None)

Создайте объект барьера для партии количества потоков. Метод action, если он указан, представляет собой вызываемый объект, который будет вызван одним из потоков, когда они будут освобождены. timeout - это значение таймаута по умолчанию, если для метода wait() не указано ни одного.

wait(timeout=None)

Пройти барьер. Когда все потоки, участвующие в барьере, вызовут эту функцию, все они будут освобождены одновременно. Если указан таймаут, он используется предпочтительнее, чем тот, который был задан в конструкторе класса.

Возвращаемое значение - целое число в диапазоне от 0 до parties – 1, разное для каждого потока. Это может быть использовано для выбора потока для выполнения некоторых специальных операций, например:

i = barrier.wait()
if i == 0:
    # Only one thread needs to print this
    print("passed the barrier")

Если конструктору было предоставлено действие, то один из потоков вызовет его до освобождения. Если этот вызов приведет к ошибке, барьер перейдет в состояние разрушенного.

Если вызов завершается, барьер переходит в состояние «сломан».

Этот метод может вызвать исключение BrokenBarrierError, если барьер нарушен или сброшен во время ожидания потока.

reset()

Возвращает барьер в стандартное, пустое состояние. Все ожидающие на нем потоки получат исключение BrokenBarrierError.

Обратите внимание, что использование этой функции может потребовать внешней синхронизации, если есть другие потоки, состояние которых неизвестно. Если барьер нарушен, возможно, лучше просто оставить его и создать новый.

abort()

Переведите барьер в состояние разрушения. Это приводит к тому, что все активные или будущие вызовы wait() завершаются неудачей BrokenBarrierError. Используйте это, например, если один из потоков должен прерваться, чтобы избежать тупика приложения.

Может быть предпочтительнее просто создать барьер с разумным значением timeout для автоматической защиты от сбоя одного из потоков.

parties

Количество нитей, необходимое для прохождения барьера.

n_waiting

Количество потоков, ожидающих в настоящее время в барьере.

broken

Булево значение True, если барьер находится в разрушенном состоянии.

exception threading.BrokenBarrierError

Это исключение, подкласс RuntimeError, возникает, когда объект Barrier сбрасывается или разрушается.

Использование блокировок, условий и семафоров в операторе with

Все объекты, предоставляемые этим модулем, которые имеют методы acquire() и release(), могут быть использованы в качестве менеджеров контекста для оператора with. Метод acquire() будет вызван при входе в блок, а release() будет вызван при выходе из блока. Следовательно, следующий фрагмент:

with some_lock:
    # do something...

эквивалентно:

some_lock.acquire()
try:
    # do something...
finally:
    some_lock.release()

В настоящее время в качестве менеджеров контекста оператора Lock, RLock, Condition, Semaphore и BoundedSemaphore могут использоваться объекты with.

Вернуться на верх