concurrent.futures
— Запуск параллельных задач¶
Добавлено в версии 3.2.
Исходный код: Lib/concurrent/futures/thread.py и Lib/concurrent/futures/process.py
Модуль concurrent.futures
предоставляет высокоуровневый интерфейс для асинхронного выполнения вызываемых объектов.
Асинхронное выполнение может выполняться потоками, используя ThreadPoolExecutor
, или отдельными процессами, используя ProcessPoolExecutor
. Оба они реализуют один и тот же интерфейс, который определяется абстрактным классом Executor
.
Availability: это не Emscripten, это был не я.
Этот модуль не работает или недоступен на платформах WebAssembly wasm32-emscripten
и wasm32-wasi
. Дополнительную информацию смотрите в разделе Платформы веб-сборки.
Объекты исполнителя¶
- class concurrent.futures.Executor¶
Абстрактный класс, предоставляющий методы для асинхронного выполнения вызовов. Его следует использовать не напрямую, а через его конкретные подклассы.
- submit(fn, /, *args, **kwargs)¶
Планирует выполнение вызываемого объекта fn как
fn(*args, **kwargs)
и возвращает объектFuture
, представляющий выполнение вызываемого объекта.with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
- map(fn, *iterables, timeout=None, chunksize=1)¶
Аналогично
map(fn, *iterables)
, за исключением:повторяющиеся объекты собираются немедленно, а не лениво;
fn выполняется асинхронно, и несколько вызовов fn могут выполняться одновременно.
Возвращаемый итератор генерирует
TimeoutError
, если вызывается__next__()
и результат недоступен по истечении времени ожидания (timeout) секунд с момента первоначального вызова доExecutor.map()
. Время ожидания (timeout) может быть как int, так и float. Если значение timeout не указано илиNone
, время ожидания не ограничено.Если вызов fn вызывает исключение, то это исключение будет вызвано при извлечении его значения из итератора.
При использовании
ProcessPoolExecutor
этот метод разбивает iterables на несколько блоков, которые он отправляет в пул в качестве отдельных задач. (Приблизительный) размер этих блоков можно задать, установив значение chunksize равным целому положительному числу. Для очень длинных итераций использование большого значения для chunksize может значительно повысить производительность по сравнению с размером по умолчанию, равным 1. При использованииThreadPoolExecutor
значение chunksize не имеет эффекта.Изменено в версии 3.5: Добавлен аргумент chunksize.
- shutdown(wait=True, *, cancel_futures=False)¶
Подайте сигнал исполнителю, что он должен освободить все ресурсы, которые он использует, когда завершится выполнение ожидающих выполнения фьючерсов. Вызовы
Executor.submit()
иExecutor.map()
, выполненные после завершения работы, вызовутRuntimeError
.Если значение wait равно
True
, то этот метод не будет возвращен до тех пор, пока не завершатся все ожидающие выполнения фьючерсы и не будут освобождены ресурсы, связанные с исполнителем. Если значение wait равноFalse
, то этот метод вернется немедленно, и ресурсы, связанные с исполнителем, будут освобождены, когда все ожидающие выполнения фьючерсы будут выполнены. Независимо от значения wait, вся программа на Python не завершит работу, пока не будут выполнены все ожидающие выполнения фьючерсы.Если значение cancel_futures равно
True
, этот метод отменит все ожидающие исполнения фьючерсы, которые еще не запущены исполнителем. Любые завершенные или запущенные функции не будут отменены, независимо от значения cancel_futures.Если значения cancel_futures и wait равны
True
, все функции, запущенные исполнителем, будут завершены до возврата этого метода. Остальные варианты выполнения отменяются.Вы можете избежать необходимости вызывать этот метод явно, если используете инструкцию
with
, которая завершит работуExecutor
(ожидание, как если быExecutor.shutdown()
вызывался с wait, установленным вTrue
):import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
Изменено в версии 3.9: Добавлено cancel_futures.
ThreadPoolExecutor - исполнитель потоков¶
ThreadPoolExecutor
- это подкласс Executor
, который использует пул потоков для асинхронного выполнения вызовов.
Взаимоблокировки могут возникать, когда вызываемый объект, связанный с Future
, ожидает результатов другого Future
. Например:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
И:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
- class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())¶
Подкласс
Executor
, который использует пул не более чем из потоков max_workers для асинхронного выполнения вызовов.Все потоки, поставленные в очередь на
ThreadPoolExecutor
, будут объединены до того, как интерпретатор сможет завершить работу. Обратите внимание, что обработчик завершения, который выполняет это, выполняется до любых обработчиков завершения, добавленных с помощьюatexit
. Это означает, что исключения в главном потоке должны быть перехвачены и обработаны, чтобы потоки могли корректно завершить работу. По этой причине рекомендуется, чтобыThreadPoolExecutor
не использовался для длительно выполняющихся задач.initializer - это необязательный вызываемый объект, который вызывается в начале каждого рабочего потока; initargs - это набор аргументов, передаваемых инициализатору. Если инициализатор вызовет исключение, все ожидающие в данный момент задания вызовут
BrokenThreadPool
, а также любая попытка отправить больше заданий в пул.Изменено в версии 3.5: Если значение max_workers равно
None
или не задано, то по умолчанию будет указано количество процессоров на компьютере, умноженное на5
, при условии, чтоThreadPoolExecutor
часто используется для перекрытия операций ввода-вывода вместо работы процессора и количество работников должно быть больше, чем количество работников дляProcessPoolExecutor
.Изменено в версии 3.6: Добавлен параметр thread_name_prefix, позволяющий пользователям управлять именами
threading.Thread
для рабочих потоков, созданных пулом, для упрощения отладки.Изменено в версии 3.7: Добавлены аргументы initializer и initargs.
Изменено в версии 3.8: Значение по умолчанию max_workers изменено на
min(32, os.cpu_count() + 4)
. Это значение по умолчанию сохраняет не менее 5 рабочих элементов для задач, связанных с вводом-выводом. Для задач, связанных с процессором, которые освобождают GIL, используется не более 32 процессорных ядер. И это позволяет избежать неявного использования очень больших ресурсов на многоядерных машинах.ThreadPoolExecutor теперь повторно использует незанятые рабочие потоки и перед запуском рабочих потоков max_workers.
Пример ThreadPoolExecutor¶
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://nonexistant-subdomain.python.org/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor - исполнитель¶
Класс ProcessPoolExecutor
является подклассом Executor
, который использует пул процессов для асинхронного выполнения вызовов. ProcessPoolExecutor
использует модуль multiprocessing
, который позволяет ему обойти Global Interpreter Lock но также означает, что могут быть выполнены и возвращены только объекты с возможностью выбора.
Модуль __main__
должен быть доступен для импорта рабочими подпроцессами. Это означает, что ProcessPoolExecutor
не будет работать в интерактивном интерпретаторе.
Вызов методов Executor
или Future
из вызываемого объекта, переданного в ProcessPoolExecutor
, приведет к взаимоблокировке.
- class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)¶
Подкласс
Executor
, который выполняет вызовы асинхронно, используя пул не более max_workers процессов. Если значение max_workers равноNone
или не задано, то по умолчанию будет указано количество процессоров на компьютере. Если значение max_workers меньше или равно0
, то будет поднято значениеValueError
. В Windows значение max_workers должно быть меньше или равно61
. Если это не так, то значениеValueError
будет увеличено. Если значение max_workers равноNone
, то по умолчанию будет выбрано не более61
, даже если доступно больше процессоров. mp_context может быть многопроцессорным контекстом или не иметь его. Он будет использоваться для запуска рабочих процессов. Если значение mp_context равноNone
или не задано, используется многопроцессорный контекст по умолчанию.initializer - это необязательный вызываемый объект, который вызывается в начале каждого рабочего процесса; initargs - это набор аргументов, передаваемых инициализатору. Если инициализатор вызовет исключение, все ожидающие в данный момент задания вызовут
BrokenProcessPool
, а также любая попытка отправить больше заданий в пул.max_tasks_per_child - необязательный аргумент, который определяет максимальное количество задач, которые может выполнить один процесс, прежде чем он завершит работу и будет заменен новым рабочим процессом. По умолчанию значение max_tasks_per_child равно
None
, что означает, что рабочие процессы будут жить столько, сколько существует пул. Если указано значение max, по умолчанию будет использоваться метод запуска многопроцессорной обработки «spawn» в отсутствие параметра mp_context. Эта функция несовместима с методом запуска «fork».Изменено в версии 3.3: Когда один из рабочих процессов внезапно завершается, теперь возникает ошибка
BrokenProcessPool
. Ранее поведение было неопределенным, но операции с исполнителем или его фьючерсами часто зависали или заходили в тупик.Изменено в версии 3.7: Аргумент mp_context был добавлен, чтобы позволить пользователям управлять start_method для рабочих процессов, созданных пулом.
Добавлены аргументы initializer и initargs.
Изменено в версии 3.11: Был добавлен аргумент max_tasks_per_child, позволяющий пользователям контролировать время жизни работников в пуле.
Пример ProcessPoolExecutor¶
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
Будущие объекты¶
Класс Future
инкапсулирует асинхронное выполнение вызываемого объекта. Future
Экземпляры создаются с помощью Executor.submit()
.
- class concurrent.futures.Future¶
Инкапсулирует асинхронное выполнение вызываемого объекта.
Future
экземпляры создаются с помощьюExecutor.submit()
и не должны создаваться напрямую, за исключением тестирования.- cancel()¶
Попытайтесь отменить вызов. Если вызов выполняется в данный момент или завершен и не может быть отменен, метод вернет значение
False
, в противном случае вызов будет отменен и метод вернет значениеTrue
.
- cancelled()¶
Верните
True
, если вызов был успешно отменен.
- running()¶
Возвращает
True
, если вызов выполняется в данный момент и не может быть отменен.
- done()¶
Верните
True
, если вызов был успешно отменен или завершен.
- result(timeout=None)¶
Возвращает значение, возвращенное вызовом. Если вызов еще не завершен, то этот метод будет ждать до истечения времени ожидания в секундах. Если вызов не завершился в течение времени ожидания в секундах, то будет поднят
TimeoutError
. timeout может быть значением int или float. Если значение timeout не указано илиNone
, время ожидания не ограничено.Если будущее отменено до завершения, то значение
CancelledError
будет увеличено.Если вызов вызвал исключение, этот метод вызовет такое же исключение.
- exception(timeout=None)¶
Возвращает исключение, вызванное вызовом. Если вызов еще не завершен, то этот метод будет ждать до истечения времени ожидания в секундах. Если вызов не завершился в течение времени ожидания в секундах, то будет выдано значение
TimeoutError
. timeout может быть значением int или float. Если значение timeout не указано илиNone
, время ожидания не ограничено.Если будущее отменено до завершения, то значение
CancelledError
будет увеличено.Если вызов завершен без повышения, возвращается
None
.
- add_done_callback(fn)¶
Присоединяет вызываемый fn к future. fn будет вызван с использованием future в качестве единственного аргумента, когда future будет отменен или завершит выполнение.
Добавленные вызываемые объекты вызываются в том порядке, в котором они были добавлены, и всегда вызываются в потоке, принадлежащем процессу, который их добавил. Если вызываемый объект создает подкласс
Exception
, он регистрируется и игнорируется. Если вызываемый объект создает подклассBaseException
, поведение не определено.Если будущее уже завершено или отменено, fn будет вызван немедленно.
Следующие методы
Future
предназначены для использования в модульных тестах и реализацияхExecutor
.- set_running_or_notify_cancel()¶
Этот метод должен вызываться только реализациями
Executor
перед выполнением работы, связанной сFuture
, и модульными тестами.Если метод возвращает
False
, тоFuture
был отменен, т.е. был вызванFuture.cancel()
и возвращенTrue
. Все потоки, ожидающие завершенияFuture
(т.е. черезas_completed()
илиwait()
), будут активированы.Если метод возвращает
True
, тоFuture
не был отменен и был переведен в состояние выполнения, т.е. вызовыFuture.running()
будут возвращатьTrue
.Этот метод может быть вызван только один раз и не может быть вызван после вызова
Future.set_result()
илиFuture.set_exception()
.
- set_result(result)¶
Присваивает результату работы, связанной с параметром
Future
, значение результат.Этот метод следует использовать только в
Executor
реализациях и модульных тестах.Изменено в версии 3.8: Этот метод вызывает
concurrent.futures.InvalidStateError
, еслиFuture
уже выполнено.
Функции модуля¶
- concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)¶
Дождитесь завершения создания экземпляров
Future
(возможно, созданных разными экземплярамиExecutor
), заданных fs. Повторяющиеся варианты будущего, заданные fs, удаляются и будут возвращены только один раз. Возвращает именованный набор из 2 наборов. Первый набор с именемdone
содержит объекты, которые завершили работу (завершили или отменили фьючерсы) до завершения ожидания. Второй набор, названныйnot_done
, содержит фьючерсы, которые не завершились (ожидающие или запущенные фьючерсы).timeout может использоваться для управления максимальным количеством секунд ожидания перед возвратом. timeout может быть значением int или float. Если значение timeout не указано или
None
, время ожидания не ограничено.return_when указывает, когда эта функция должна вернуться. Это должна быть одна из следующих констант:
Постоянный
Описание
- concurrent.futures.FIRST_COMPLETED¶
Функция будет возвращена, когда все последующие действия завершатся или будут отменены.
- concurrent.futures.FIRST_EXCEPTION¶
Функция вернет значение, когда любое будущее завершится созданием исключения. Если ни одно будущее не вызовет исключения, то это эквивалентно
ALL_COMPLETED
.- concurrent.futures.ALL_COMPLETED¶
Функция вернется, когда все фьючерсы завершатся или будут отменены.
- concurrent.futures.as_completed(fs, timeout=None)¶
Возвращает итератор по экземплярам
Future
(возможно, созданным разными экземплярамиExecutor
), заданный fs, который выдает фьючерсы по мере их завершения (завершенные или отмененные фьючерсы). Все фьючерсы, заданные с помощью fs, которые дублируются, будут возвращены один раз. Все фьючерсы, которые завершились до вызоваas_completed()
, будут возвращены первыми. Возвращаемый итератор генерируетTimeoutError
, если вызывается__next__()
и результат недоступен по истечении времени ожидания (timeout) секунд с момента первоначального вызова доas_completed()
. Время ожидания (timeout) может быть int или float. Если значение timeout не указано илиNone
, то время ожидания не ограничено.
См.также
- PEP 3148 – фьючерсы - выполняют вычисления асинхронно.
Предложение, в котором описывается эта функция для включения в стандартную библиотеку Python.
Классы исключений¶
- exception concurrent.futures.CancelledError¶
Возникает при отмене будущего.
- exception concurrent.futures.TimeoutError¶
Устаревший псевдоним
TimeoutError
, вызываемый, когда будущая операция превышает заданный тайм-аут.Изменено в версии 3.11: Этому классу был присвоен псевдоним
TimeoutError
.
- exception concurrent.futures.BrokenExecutor¶
Производный от
RuntimeError
, этот класс исключений генерируется, когда по какой-либо причине нарушается работа executor, и его нельзя использовать для отправки или выполнения новых задач.Добавлено в версии 3.7.
- exception concurrent.futures.InvalidStateError¶
Возникает, когда выполняется операция с будущим, которая запрещена в текущем состоянии.
Добавлено в версии 3.8.
- exception concurrent.futures.thread.BrokenThreadPool¶
Производный от
BrokenExecutor
, этот класс исключений генерируется, когда один из рабочих элементовThreadPoolExecutor
не прошел инициализацию.Добавлено в версии 3.7.
- exception concurrent.futures.process.BrokenProcessPool¶
Производный от
BrokenExecutor
(ранееRuntimeError
), этот класс исключений генерируется, когда один из рабочих элементовProcessPoolExecutor
завершается некорректным образом (например, если он был удален извне).Добавлено в версии 3.3.