concurrent.futures
— Запуск параллельных задач¶
Добавлено в версии 3.2.
Исходный код: Lib/concurrent/futures/thread.py и Lib/concurrent/futures/process.py
Модуль concurrent.futures
предоставляет высокоуровневый интерфейс для асинхронного выполнения вызываемых таблиц.
Асинхронное выполнение может осуществляться с помощью потоков, используя ThreadPoolExecutor
, или отдельных процессов, используя ProcessPoolExecutor
. Оба реализуют один и тот же интерфейс, который определяется абстрактным классом Executor
.
Объекты-исполнители¶
-
class
concurrent.futures.
Executor
¶ Абстрактный класс, предоставляющий методы для асинхронного выполнения вызовов. Его следует использовать не напрямую, а через его конкретные подклассы.
-
submit
(fn, /, *args, **kwargs)¶ Планирует выполнение вызываемой переменной, fn, как
fn(*args, **kwargs)
и возвращает объектFuture
, представляющий выполнение вызываемой переменной.with ThreadPoolExecutor(max_workers=1) as executor: future = executor.submit(pow, 323, 1235) print(future.result())
-
map
(func, *iterables, timeout=None, chunksize=1)¶ Аналогично
map(func, *iterables)
, за исключением:iterables собираются немедленно, а не лениво;
func выполняется асинхронно, и несколько вызовов func могут выполняться одновременно.
Возвращаемый итератор вызывает ошибку
concurrent.futures.TimeoutError
, если вызывается__next__()
и результат не доступен через timeout секунд после первоначального вызоваExecutor.map()
. timeout может быть значением int или float. Если timeout не указан илиNone
, время ожидания не ограничено.Если вызов func вызывает исключение, то это исключение будет вызвано, когда его значение будет получено из итератора.
При использовании
ProcessPoolExecutor
этот метод разбивает iterables на несколько фрагментов, которые он отправляет в пул как отдельные задачи. (Приблизительный) размер этих фрагментов можно задать, установив chunksize в целое положительное число. Для очень длинных итераций использование большого значения chunksize может значительно повысить производительность по сравнению с размером по умолчанию 1. ПриThreadPoolExecutor
chunksize не влияет.Изменено в версии 3.5: Добавлен аргумент chunksize.
-
shutdown
(wait=True, *, cancel_futures=False)¶ Сигнализирует исполнителю, что он должен освободить все ресурсы, которые он использует, когда текущие ожидающие фьючерсы закончат выполняться. Вызовы
Executor.submit()
иExecutor.map()
, сделанные после выключения, вызовут ошибкуRuntimeError
.Если wait имеет значение
True
, то этот метод не вернется, пока все ожидающие фьючерсы не закончат выполнение и ресурсы, связанные с исполнителем, не будут освобождены. Если wait имеет значениеFalse
, то этот метод вернется немедленно, а ресурсы, связанные с исполнителем, будут освобождены после завершения выполнения всех ожидающих фьючерсов. Независимо от значения wait, вся программа Python не завершится, пока не будут выполнены все ожидающие фьючерсы.Если cancel_futures имеет значение
True
, этот метод отменит все ожидающие фьючерсы, которые исполнитель не начал выполнять. Любые фьючерсы, которые завершены или запущены, не будут отменены, независимо от значения cancel_futures.Если и cancel_futures, и wait равны
True
, все фьючерсы, которые исполнитель начал выполнять, будут завершены до возвращения этого метода. Оставшиеся фьючерсы отменяются.Вы можете избежать необходимости явного вызова этого метода, если используете оператор
with
, который выключитExecutor
(ожидание, как если быExecutor.shutdown()
было вызвано с wait, установленным вTrue
):import shutil with ThreadPoolExecutor(max_workers=4) as e: e.submit(shutil.copy, 'src1.txt', 'dest1.txt') e.submit(shutil.copy, 'src2.txt', 'dest2.txt') e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt')
Изменено в версии 3.9: Добавлено cancel_futures.
-
ThreadPoolExecutor¶
ThreadPoolExecutor
- это подкласс Executor
, который использует пул потоков для асинхронного выполнения вызовов.
Тупики могут возникать, когда вызываемый объект, связанный с Future
, ожидает результатов другого Future
. Например:
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
И:
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
-
class
concurrent.futures.
ThreadPoolExecutor
(max_workers=None, thread_name_prefix='', initializer=None, initargs=())¶ Подкласс
Executor
, который использует пул из максимум max_workers потоков для асинхронного выполнения вызовов.Все потоки, выстроенные в очередь на
ThreadPoolExecutor
, будут объединены до того, как интерпретатор сможет выйти. Обратите внимание, что обработчик выхода, который это делает, выполняется перед любыми обработчиками выхода, добавленными с помощью atexit. Это означает, что исключения в главном потоке должны быть пойманы и обработаны, чтобы дать сигнал потокам на изящный выход. По этой причине рекомендуется не использоватьThreadPoolExecutor
для долго выполняющихся задач.initializer - это необязательный вызываемый модуль, который вызывается в начале каждого рабочего потока; initargs - это кортеж аргументов, передаваемых инициализатору. Если initializer вызовет исключение, все текущие задания, ожидающие выполнения, вызовут сообщение
BrokenThreadPool
, а также все попытки отправить в пул дополнительные задания.Изменено в версии 3.5: Если max_workers равно
None
или не задано, то по умолчанию оно будет равно количеству процессоров на машине, умноженному на5
, предполагая, чтоThreadPoolExecutor
часто используется для перекрытия ввода/вывода вместо работы процессора, и количество рабочих должно быть больше, чем количество рабочих дляProcessPoolExecutor
.Добавлено в версии 3.6: Аргумент thread_name_prefix был добавлен, чтобы позволить пользователям контролировать :class:`threading.Thread`имена рабочих потоков, создаваемых пулом, для облегчения отладки.
Изменено в версии 3.7: Добавлены аргументы initializer и initargs.
Изменено в версии 3.8: Значение по умолчанию max_workers изменено на
min(32, os.cpu_count() + 4)
. Это значение по умолчанию сохраняет не менее 5 рабочих для задач, связанных с вводом/выводом. Оно использует не более 32 ядер процессора для задач, связанных с процессором, которые освобождают GIL. И оно позволяет избежать неявного использования очень больших ресурсов на многоядерных машинах.ThreadPoolExecutor теперь повторно использует простаивающие рабочие потоки перед запуском рабочих потоков max_workers.
Пример ThreadPoolExecutor¶
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor¶
Класс ProcessPoolExecutor
является подклассом Executor
, который использует пул процессов для асинхронного выполнения вызовов. ProcessPoolExecutor
использует модуль multiprocessing
, что позволяет ему обойти Global Interpreter Lock, но также означает, что только picklable объекты могут быть выполнены и возвращены.
Модуль __main__
должен быть импортируемым рабочими подпроцессами. Это означает, что ProcessPoolExecutor
не будет работать в интерактивном интерпретаторе.
Вызов методов Executor
или Future
из вызываемого объекта, переданного в ProcessPoolExecutor
, приведет к тупику.
-
class
concurrent.futures.
ProcessPoolExecutor
(max_workers=None, mp_context=None, initializer=None, initargs=())¶ Подкласс
Executor
, который выполняет вызовы асинхронно, используя пул из не более чем max_workers процессов. Если max_workers равноNone
или не задано, то по умолчанию будет задано количество процессоров на машине. Если max_workers меньше или равно0
, то будет выдано предупреждениеValueError
. В Windows, max_workers должно быть меньше или равно61
. Если это не так, то будет выдано сообщениеValueError
. Если max_workers равноNone
, то по умолчанию будет выбрано не более61
, даже если доступно больше процессоров. mp_context может быть многопроцессорным контекстом или None. Он будет использоваться для запуска рабочих. Если mp_context равенNone
или не указан, то используется многопроцессорный контекст по умолчанию.initializer - это необязательный вызываемый модуль, который вызывается в начале каждого рабочего процесса; initargs - это кортеж аргументов, передаваемых инициализатору. Если initializer вызовет исключение, то все текущие задания, ожидающие выполнения, вызовут сообщение
BrokenProcessPool
, а также все попытки отправить дополнительные задания в пул.Изменено в версии 3.3: При внезапном завершении одного из рабочих процессов теперь выдается ошибка
BrokenProcessPool
. Ранее поведение было неопределенным, но операции над исполнителем или его фьючерсами часто зависали или заходили в тупик.Изменено в версии 3.7: Аргумент mp_context был добавлен, чтобы позволить пользователям контролировать метод start_method для рабочих процессов, созданных пулом.
Добавлены аргументы initializer и initargs.
Пример ProcessPoolExecutor¶
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n < 2:
return False
if n == 2:
return True
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
Объекты будущего¶
Класс Future
инкапсулирует асинхронное выполнение вызываемого объекта. Экземпляры Future
создаются с помощью Executor.submit()
.
-
class
concurrent.futures.
Future
¶ Инкапсулирует асинхронное выполнение вызываемого объекта. Экземпляры
Future
создаютсяExecutor.submit()
и не должны создаваться напрямую, кроме как для тестирования.-
cancel
()¶ Попытка отменить вызов. Если вызов в данный момент выполняется или завершен и не может быть отменен, то метод вернет
False
, в противном случае вызов будет отменен и метод вернетTrue
.
-
cancelled
()¶ Верните
True
, если вызов был успешно отменен.
-
running
()¶ Верните
True
, если вызов в данный момент выполняется и не может быть отменен.
-
done
()¶ Верните
True
, если вызов был успешно отменен или завершен.
-
result
(timeout=None)¶ Возвращает значение, возвращенное вызовом. Если вызов еще не завершен, то этот метод будет ждать до timeout секунд. Если вызов не завершился за timeout секунд, то будет выдано сообщение
concurrent.futures.TimeoutError
. timeout может быть значением int или float. Если timeout не указан илиNone
, то время ожидания не ограничено.Если будущее отменяется до завершения, то будет выдано сообщение
CancelledError
.Если вызов вызвал исключение, этот метод вызовет такое же исключение.
-
exception
(timeout=None)¶ Возвращает исключение, вызванное вызовом. Если вызов еще не завершен, то этот метод будет ждать до timeout секунд. Если вызов не завершился за timeout секунд, то будет выдано
concurrent.futures.TimeoutError
. timeout может быть значением int или float. Если timeout не указан илиNone
, то время ожидания не ограничено.Если будущее отменяется до завершения, то будет выдано сообщение
CancelledError
.Если вызов завершился без повышения, возвращается
None
.
-
add_done_callback
(fn)¶ Присоединяет вызываемую функцию fn к будущему. fn будет вызвана, с будущим в качестве единственного аргумента, когда будущее будет отменено или завершит выполнение.
Добавленные вызываемые файлы вызываются в том порядке, в котором они были добавлены, и всегда вызываются в потоке, принадлежащем процессу, который их добавил. Если вызываемая переменная вызывает подкласс
Exception
, она будет зарегистрирована и проигнорирована. Если вызываемая переменная вызывает подклассBaseException
, поведение не определено.Если будущее уже завершилось или было отменено, немедленно вызывается fn.
Следующие
Future
методы предназначены для использования в модульных тестах иExecutor
реализациях.-
set_running_or_notify_cancel
()¶ Этот метод должен вызываться только реализациями
Executor
перед выполнением работы, связанной сFuture
, и модульными тестами.Если метод возвращает
False
, тоFuture
был отменен, т.е. был вызванFuture.cancel()
, который вернул True. Все потоки, ожидающие завершенияFuture
(т.е. черезas_completed()
илиwait()
), будут разбужены.Если метод возвращает
True
, тоFuture
не был отменен и был переведен в состояние выполнения, т.е. вызовыFuture.running()
будут возвращать True.Этот метод может быть вызван только один раз и не может быть вызван после вызова
Future.set_result()
илиFuture.set_exception()
.
-
set_result
(result)¶ Устанавливает результат работы, связанной с
Future
, в результат.Этот метод должен использоваться только в реализациях
Executor
и модульных тестах.Изменено в версии 3.8: Этот метод поднимает
concurrent.futures.InvalidStateError
, еслиFuture
уже выполнен.
-
Функции модуля¶
-
concurrent.futures.
wait
(fs, timeout=None, return_when=ALL_COMPLETED)¶ Дождаться завершения работы экземпляров
Future
(возможно, созданных разными экземплярамиExecutor
), переданных fs. Дубликаты фьючерсов, переданных fs, удаляются и будут возвращены только один раз. Возвращает именованный 2-кортеж множеств. Первый набор, с именемdone
, содержит фьючерсы, которые завершились (завершенные или отмененные фьючерсы) до завершения ожидания. Второй набор, с именемnot_done
, содержит фьючерсы, которые не завершились (ожидающие или выполняющиеся фьючерсы).timeout можно использовать для управления максимальным количеством секунд ожидания перед возвратом. timeout может быть значением int или float. Если timeout не указан или
None
, время ожидания не ограничено.return_when указывает, когда эта функция должна вернуться. Это должна быть одна из следующих констант:
Постоянная
Описание
FIRST_COMPLETED
Функция возвращается, когда любое будущее завершается или отменяется.
FIRST_EXCEPTION
Функция вернется, когда любое будущее завершится, вызвав исключение. Если ни одно будущее не вызывает исключения, то это эквивалентно
ALL_COMPLETED
.ALL_COMPLETED
Функция вернется, когда все фьючерсы завершатся или будут отменены.
-
concurrent.futures.
as_completed
(fs, timeout=None)¶ Возвращает итератор по экземплярам
Future
(возможно, созданным разными экземплярамиExecutor
), заданным fs, который возвращает фьючерсы по мере их завершения (завершенные или отмененные фьючерсы). Любые фьючерсы, переданные fs, которые дублируются, будут возвращены один раз. Фьючерсы, которые завершились до вызоваas_completed()
, будут возвращены первыми. Возвращенный итератор вызывает ошибкуconcurrent.futures.TimeoutError
, если вызван__next__()
и результат не доступен через timeout секунд после первоначального вызоваas_completed()
. timeout может быть значением int или float. Если timeout не указан илиNone
, время ожидания не ограничено.
См.также
- PEP 3148 – фьючерсы - выполняют вычисления асинхронно
Предложение, описывающее эту возможность для включения в стандартную библиотеку Python.
Классы исключений¶
-
exception
concurrent.futures.
CancelledError
¶ Возникает, когда будущее отменяется.
-
exception
concurrent.futures.
TimeoutError
¶ Возникает, когда будущая операция превышает заданный тайм-аут.
-
exception
concurrent.futures.
BrokenExecutor
¶ Производный от
RuntimeError
, этот класс исключений возникает, когда исполнитель по какой-то причине сломан и не может быть использован для отправки или выполнения новых задач.Добавлено в версии 3.7.
-
exception
concurrent.futures.
InvalidStateError
¶ Возникает, когда над будущим выполняется операция, которая недопустима в текущем состоянии.
Добавлено в версии 3.8.
-
exception
concurrent.futures.thread.
BrokenThreadPool
¶ Производный от
BrokenExecutor
, этот класс исключений возникает, когда один из рабочихThreadPoolExecutor
не смог инициализироваться.Добавлено в версии 3.7.
-
exception
concurrent.futures.process.
BrokenProcessPool
¶ Производный от
BrokenExecutor
(ранееRuntimeError
), этот класс исключений возникает, когда один из рабочихProcessPoolExecutor
завершается нечистым образом (например, если он был убит извне).Добавлено в версии 3.3.