Примитивы синхронизации

Исходный код: Lib/asyncio/locks.py


примитивы синхронизации asyncio разработаны так, чтобы быть похожими на примитивы модуля threading с двумя важными оговорками:

  • примитивы asyncio не являются потокобезопасными, поэтому их не следует использовать для синхронизации потоков операционной системы (для этого используйте threading).;

  • методы этих примитивов синхронизации не принимают аргумент timeout; используйте функцию asyncio.wait_for() для выполнения операций с тайм-аутами.

asyncio имеет следующие основные примитивы синхронизации:


Замок

class asyncio.Lock

Реализует блокировку мьютекса для задач asyncio. Не является потокобезопасным.

Асинхронная блокировка может использоваться для обеспечения эксклюзивного доступа к общему ресурсу.

Предпочтительным способом использования блокировки является оператор async with:

lock = asyncio.Lock()

# ... later
async with lock:
    # access shared state

что эквивалентно:

lock = asyncio.Lock()

# ... later
await lock.acquire()
try:
    # access shared state
finally:
    lock.release()

Изменено в версии 3.10: Удален параметр loop.

coroutine acquire()

Достаньте замок.

Этот метод ожидает, пока блокировка не будет разблокирована, устанавливает ее в значение заблокирована и возвращает True.

Если в режиме acquire() заблокировано более одной сопрограммы в ожидании разблокировки блокировки, в конечном итоге запускается только одна сопрограмма.

Получение блокировки является «справедливым»: запущенная сопрограмма будет первой сопрограммой, которая начала ожидать получения блокировки.

release()

Отпустите замок.

Когда замок будет заблокирован, установите его в положение разблокирован и вернитесь в исходное положение.

Если блокировка разблокирована, то выводится значение RuntimeError.

locked()

Верните True, если блокировка заблокирована.

Событие

class asyncio.Event

Объект события. Не является потокобезопасным.

Событие asyncio можно использовать для уведомления нескольких задач asyncio о том, что произошло какое-то событие.

Объект Event управляет внутренним флагом, который может быть установлен в значение true с помощью метода set() и сброшен в значение false с помощью метода clear(). Метод wait() блокируется до тех пор, пока для флага не будет установлено значение true. Изначально для флага установлено значение false.

Изменено в версии 3.10: Удален параметр loop.

Пример:

async def waiter(event):
    print('waiting for it ...')
    await event.wait()
    print('... got it!')

async def main():
    # Create an Event object.
    event = asyncio.Event()

    # Spawn a Task to wait until 'event' is set.
    waiter_task = asyncio.create_task(waiter(event))

    # Sleep for 1 second and set the event.
    await asyncio.sleep(1)
    event.set()

    # Wait until the waiter task is finished.
    await waiter_task

asyncio.run(main())
coroutine wait()

Подождите, пока событие не будет установлено.

Если событие установлено, немедленно верните True. В противном случае заблокируйте до тех пор, пока другая задача не вызовет set().

set()

Установите событие.

Все задачи, ожидающие установки события, будут немедленно запущены.

clear()

Очистите (отмените настройку) событие.

Задачи, ожидающие выполнения в режиме wait(), теперь будут блокироваться до тех пор, пока метод set() не будет вызван снова.

is_set()

Верните True, если событие задано.

Состояние

class asyncio.Condition(lock=None)

Объект Condition. Не является потокобезопасным.

Примитив asyncio condition может использоваться задачей для ожидания наступления какого-либо события и последующего получения эксклюзивного доступа к общему ресурсу.

По сути, объект Condition сочетает в себе функциональность Event и Lock. Возможно, что несколько объектов Condition совместно используют одну блокировку, что позволяет координировать монопольный доступ к общему ресурсу между различными задачами, заинтересованными в определенных состояниях этого общего ресурса.

Необязательным аргументом lock должен быть объект Lock или None. В последнем случае новый объект блокировки создается автоматически.

Изменено в версии 3.10: Удален параметр loop.

Предпочтительным способом использования условия является оператор async with:

cond = asyncio.Condition()

# ... later
async with cond:
    await cond.wait()

что эквивалентно:

cond = asyncio.Condition()

# ... later
await cond.acquire()
try:
    await cond.wait()
finally:
    cond.release()
coroutine acquire()

Получите базовую блокировку.

Этот метод ожидает, пока базовая блокировка не будет разблокирована, устанавливает для нее значение заблокирована и возвращает True.

notify(n=1)

Запускается не более чем при n задачах (по умолчанию 1), ожидающих выполнения этого условия. Метод не работает, если ни одна из задач не находится в ожидании.

Блокировка должна быть получена до вызова этого метода и снята вскоре после этого. При вызове с разблокированной блокировкой возникает ошибка RuntimeError.

locked()

Возвращает True, если получена базовая блокировка.

notify_all()

Активируйте все задачи, ожидающие выполнения в соответствии с этим условием.

Этот метод действует как notify(), но запускает все ожидающие задачи.

Блокировка должна быть получена до вызова этого метода и снята вскоре после этого. При вызове с разблокированной блокировкой возникает ошибка RuntimeError.

release()

Снимите основную блокировку.

При вызове разблокированной блокировки генерируется RuntimeError.

coroutine wait()

Дождитесь уведомления.

Если вызывающая задача не получила блокировку при вызове этого метода, генерируется RuntimeError.

Этот метод снимает базовую блокировку, а затем блокирует до тех пор, пока не будет активирован вызовом notify() или notify_all(). После активизации условие повторно блокируется, и этот метод возвращает True.

coroutine wait_for(predicate)

Подождите, пока предикат не станет истинным.

Предикат должен быть вызываемым, результат которого будет интерпретироваться как логическое значение. Конечным значением является возвращаемое значение.

Семафор

class asyncio.Semaphore(value=1)

Объект-семафор. Не является потокобезопасным.

Семафор управляет внутренним счетчиком, который уменьшается при каждом вызове acquire() и увеличивается при каждом вызове release(). Счетчик никогда не может опуститься ниже нуля; когда acquire() обнаруживает, что он равен нулю, он блокируется, ожидая, пока какая-нибудь задача не вызовет release().

Необязательный аргумент value задает начальное значение для внутреннего счетчика (по умолчанию 1). Если заданное значение меньше 0, то задается значение ValueError.

Изменено в версии 3.10: Удален параметр loop.

Предпочтительным способом использования семафора является оператор async with:

sem = asyncio.Semaphore(10)

# ... later
async with sem:
    # work with shared resource

что эквивалентно:

sem = asyncio.Semaphore(10)

# ... later
await sem.acquire()
try:
    # work with shared resource
finally:
    sem.release()
coroutine acquire()

Приобретите семафор.

Если внутренний счетчик больше нуля, уменьшите его на единицу и немедленно верните True. Если значение равно нулю, дождитесь вызова release() и верните True.

locked()

Возвращает True, если семафор не может быть получен немедленно.

release()

Разблокируйте семафор, увеличив внутренний счетчик на единицу. Может активировать задачу, ожидающую получения семафора.

В отличие от BoundedSemaphore, Semaphore позволяет совершать больше release() вызовов, чем acquire() вызовов.

Ограниченный семафор

class asyncio.BoundedSemaphore(value=1)

Объект с ограниченным семафором. Не является потокобезопасным.

Ограниченный семафор - это версия Semaphore, которая выдает значение ValueError в release(), если он увеличивает внутренний счетчик выше начального значения *.

Изменено в версии 3.10: Удален параметр loop.

Барьер

class asyncio.Barrier(parties)

Объект-барьер. Не является потокобезопасным.

Барьер - это простой примитив синхронизации, который позволяет блокировать до тех пор, пока на нем не будет ожидаться определенное количество задач. Задачи могут ожидать в методе wait() и будут заблокированы до тех пор, пока указанное количество задач не завершит ожидание в методе wait(). В этот момент все ожидающие задачи будут разблокированы одновременно.

async with может использоваться в качестве альтернативы ожиданию wait().

Барьер можно использовать повторно любое количество раз.

Пример:

async def example_barrier():
   # barrier with 3 parties
   b = asyncio.Barrier(3)

   # create 2 new waiting tasks
   asyncio.create_task(b.wait())
   asyncio.create_task(b.wait())

   await asyncio.sleep(0)
   print(b)

   # The third .wait() call passes the barrier
   await b.wait()
   print(b)
   print("barrier passed")

   await asyncio.sleep(0)
   print(b)

asyncio.run(example_barrier())

Результатом этого примера является:

<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]>
<asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]>
barrier passed
<asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>

Добавлено в версии 3.11.

coroutine wait()

Преодолейте барьер. Когда все задачи, связанные с барьером, вызовут эту функцию, все они будут разблокированы одновременно.

Когда ожидающая или заблокированная задача в барьере отменяется, эта задача выходит из барьера, который остается в том же состоянии. Если состояние барьера «заполнено», количество ожидающих задач уменьшается на 1.

Возвращаемое значение представляет собой целое число в диапазоне от 0 до parties-1, различающееся для каждой задачи. Это может использоваться для выбора задачи для выполнения некоторых специальных операций, например:

...
async with barrier as position:
   if position == 0:
      # Only one task prints this
      print('End of *draining phase*')

Этот метод может вызвать исключение BrokenBarrierError, если барьер нарушен или сброшен во время ожидания задачи. Он может вызвать исключение CancelledError, если задача отменена.

coroutine reset()

Верните барьер в пустое состояние по умолчанию. Все ожидающие его задачи получат исключение BrokenBarrierError.

Если барьер сломан, возможно, будет лучше просто оставить его и создать новый.

coroutine abort()

Переведите барьер в нерабочее состояние. Это приведет к сбою всех активных или будущих вызовов wait() с помощью BrokenBarrierError. Используйте это, например, если необходимо прервать выполнение одной из задач, чтобы избежать бесконечного ожидания задач.

parties

Количество заданий, необходимых для прохождения барьера.

n_waiting

Количество задач, ожидающих в данный момент заполнения барьера.

broken

Логическое значение, равное True, если барьер находится в разрушенном состоянии.

exception asyncio.BrokenBarrierError

Это исключение, подкласс RuntimeError, возникает, когда объект Barrier сбрасывается или повреждается.


Изменено в версии 3.9: Получение блокировки с помощью инструкции await lock или yield from lock и/или with (with await lock, with (yield from lock)) было удалено. Вместо этого используйте async with lock.

Вернуться на верх