concurrent.futures — Запуск параллельных задач

Добавлено в версии 3.2.

Исходный код: Lib/concurrent/futures/thread.py и Lib/concurrent/futures/process.py


Модуль concurrent.futures предоставляет высокоуровневый интерфейс для асинхронного выполнения вызываемых объектов.

Асинхронное выполнение может выполняться потоками, используя ThreadPoolExecutor, или отдельными процессами, используя ProcessPoolExecutor. Оба они реализуют один и тот же интерфейс, который определяется абстрактным классом Executor.

Availability: это не Emscripten, это был не я.

Этот модуль не работает или недоступен на платформах WebAssembly wasm32-emscripten и wasm32-wasi. Дополнительную информацию смотрите в разделе Платформы веб-сборки.

Объекты исполнителя

class concurrent.futures.Executor

Абстрактный класс, предоставляющий методы для асинхронного выполнения вызовов. Его следует использовать не напрямую, а через его конкретные подклассы.

submit(fn, /, *args, **kwargs)

Планирует выполнение вызываемого объекта fn как fn(*args, **kwargs) и возвращает объект Future, представляющий выполнение вызываемого объекта.

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(fn, *iterables, timeout=None, chunksize=1)

Аналогично map(fn, *iterables), за исключением:

  • повторяющиеся объекты собираются немедленно, а не лениво;

  • fn выполняется асинхронно, и несколько вызовов fn могут выполняться одновременно.

Возвращаемый итератор генерирует TimeoutError, если вызывается __next__() и результат недоступен по истечении времени ожидания (timeout) секунд с момента первоначального вызова до Executor.map(). Время ожидания (timeout) может быть как int, так и float. Если значение timeout не указано или None, время ожидания не ограничено.

Если вызов fn вызывает исключение, то это исключение будет вызвано при извлечении его значения из итератора.

При использовании ProcessPoolExecutor этот метод разбивает iterables на несколько блоков, которые он отправляет в пул в качестве отдельных задач. (Приблизительный) размер этих блоков можно задать, установив значение chunksize равным целому положительному числу. Для очень длинных итераций использование большого значения для chunksize может значительно повысить производительность по сравнению с размером по умолчанию, равным 1. При использовании ThreadPoolExecutor значение chunksize не имеет эффекта.

Изменено в версии 3.5: Добавлен аргумент chunksize.

shutdown(wait=True, *, cancel_futures=False)

Подайте сигнал исполнителю, что он должен освободить все ресурсы, которые он использует, когда завершится выполнение ожидающих выполнения фьючерсов. Вызовы Executor.submit() и Executor.map(), выполненные после завершения работы, вызовут RuntimeError.

Если значение wait равно True, то этот метод не будет возвращен до тех пор, пока не завершатся все ожидающие выполнения фьючерсы и не будут освобождены ресурсы, связанные с исполнителем. Если значение wait равно False, то этот метод вернется немедленно, и ресурсы, связанные с исполнителем, будут освобождены, когда все ожидающие выполнения фьючерсы будут выполнены. Независимо от значения wait, вся программа на Python не завершит работу, пока не будут выполнены все ожидающие выполнения фьючерсы.

Если значение cancel_futures равно True, этот метод отменит все ожидающие исполнения фьючерсы, которые еще не запущены исполнителем. Любые завершенные или запущенные функции не будут отменены, независимо от значения cancel_futures.

Если значения cancel_futures и wait равны True, все функции, запущенные исполнителем, будут завершены до возврата этого метода. Остальные варианты выполнения отменяются.

Вы можете избежать необходимости вызывать этот метод явно, если используете инструкцию with, которая завершит работу Executor (ожидание, как если бы Executor.shutdown() вызывался с wait, установленным в True):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

Изменено в версии 3.9: Добавлено cancel_futures.

ThreadPoolExecutor - исполнитель потоков

ThreadPoolExecutor - это подкласс Executor, который использует пул потоков для асинхронного выполнения вызовов.

Взаимоблокировки могут возникать, когда вызываемый объект, связанный с Future, ожидает результатов другого Future. Например:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

И:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

Подкласс Executor, который использует пул не более чем из потоков max_workers для асинхронного выполнения вызовов.

Все потоки, поставленные в очередь на ThreadPoolExecutor, будут объединены до того, как интерпретатор сможет завершить работу. Обратите внимание, что обработчик завершения, который выполняет это, выполняется до любых обработчиков завершения, добавленных с помощью atexit. Это означает, что исключения в главном потоке должны быть перехвачены и обработаны, чтобы потоки могли корректно завершить работу. По этой причине рекомендуется, чтобы ThreadPoolExecutor не использовался для длительно выполняющихся задач.

initializer - это необязательный вызываемый объект, который вызывается в начале каждого рабочего потока; initargs - это набор аргументов, передаваемых инициализатору. Если инициализатор вызовет исключение, все ожидающие в данный момент задания вызовут BrokenThreadPool, а также любая попытка отправить больше заданий в пул.

Изменено в версии 3.5: Если значение max_workers равно None или не задано, то по умолчанию будет указано количество процессоров на компьютере, умноженное на 5, при условии, что ThreadPoolExecutor часто используется для перекрытия операций ввода-вывода вместо работы процессора и количество работников должно быть больше, чем количество работников для ProcessPoolExecutor.

Изменено в версии 3.6: Добавлен параметр thread_name_prefix, позволяющий пользователям управлять именами threading.Thread для рабочих потоков, созданных пулом, для упрощения отладки.

Изменено в версии 3.7: Добавлены аргументы initializer и initargs.

Изменено в версии 3.8: Значение по умолчанию max_workers изменено на min(32, os.cpu_count() + 4). Это значение по умолчанию сохраняет не менее 5 рабочих элементов для задач, связанных с вводом-выводом. Для задач, связанных с процессором, которые освобождают GIL, используется не более 32 процессорных ядер. И это позволяет избежать неявного использования очень больших ресурсов на многоядерных машинах.

ThreadPoolExecutor теперь повторно использует незанятые рабочие потоки и перед запуском рабочих потоков max_workers.

Пример ThreadPoolExecutor

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://nonexistant-subdomain.python.org/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor - исполнитель

Класс ProcessPoolExecutor является подклассом Executor, который использует пул процессов для асинхронного выполнения вызовов. ProcessPoolExecutor использует модуль multiprocessing, который позволяет ему обойти Global Interpreter Lock но также означает, что могут быть выполнены и возвращены только объекты с возможностью выбора.

Модуль __main__ должен быть доступен для импорта рабочими подпроцессами. Это означает, что ProcessPoolExecutor не будет работать в интерактивном интерпретаторе.

Вызов методов Executor или Future из вызываемого объекта, переданного в ProcessPoolExecutor, приведет к взаимоблокировке.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=(), max_tasks_per_child=None)

Подкласс Executor, который выполняет вызовы асинхронно, используя пул не более max_workers процессов. Если значение max_workers равно None или не задано, то по умолчанию будет указано количество процессоров на компьютере. Если значение max_workers меньше или равно 0, то будет поднято значение ValueError. В Windows значение max_workers должно быть меньше или равно 61. Если это не так, то значение ValueError будет увеличено. Если значение max_workers равно None, то по умолчанию будет выбрано не более 61, даже если доступно больше процессоров. mp_context может быть многопроцессорным контекстом или не иметь его. Он будет использоваться для запуска рабочих процессов. Если значение mp_context равно None или не задано, используется многопроцессорный контекст по умолчанию.

initializer - это необязательный вызываемый объект, который вызывается в начале каждого рабочего процесса; initargs - это набор аргументов, передаваемых инициализатору. Если инициализатор вызовет исключение, все ожидающие в данный момент задания вызовут BrokenProcessPool, а также любая попытка отправить больше заданий в пул.

max_tasks_per_child - необязательный аргумент, который определяет максимальное количество задач, которые может выполнить один процесс, прежде чем он завершит работу и будет заменен новым рабочим процессом. По умолчанию значение max_tasks_per_child равно None, что означает, что рабочие процессы будут жить столько, сколько существует пул. Если указано значение max, по умолчанию будет использоваться метод запуска многопроцессорной обработки «spawn» в отсутствие параметра mp_context. Эта функция несовместима с методом запуска «fork».

Изменено в версии 3.3: Когда один из рабочих процессов внезапно завершается, теперь возникает ошибка BrokenProcessPool. Ранее поведение было неопределенным, но операции с исполнителем или его фьючерсами часто зависали или заходили в тупик.

Изменено в версии 3.7: Аргумент mp_context был добавлен, чтобы позволить пользователям управлять start_method для рабочих процессов, созданных пулом.

Добавлены аргументы initializer и initargs.

Изменено в версии 3.11: Был добавлен аргумент max_tasks_per_child, позволяющий пользователям контролировать время жизни работников в пуле.

Пример ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Будущие объекты

Класс Future инкапсулирует асинхронное выполнение вызываемого объекта. Future Экземпляры создаются с помощью Executor.submit().

class concurrent.futures.Future

Инкапсулирует асинхронное выполнение вызываемого объекта. Future экземпляры создаются с помощью Executor.submit() и не должны создаваться напрямую, за исключением тестирования.

cancel()

Попытайтесь отменить вызов. Если вызов выполняется в данный момент или завершен и не может быть отменен, метод вернет значение False, в противном случае вызов будет отменен и метод вернет значение True.

cancelled()

Верните True, если вызов был успешно отменен.

running()

Возвращает True, если вызов выполняется в данный момент и не может быть отменен.

done()

Верните True, если вызов был успешно отменен или завершен.

result(timeout=None)

Возвращает значение, возвращенное вызовом. Если вызов еще не завершен, то этот метод будет ждать до истечения времени ожидания в секундах. Если вызов не завершился в течение времени ожидания в секундах, то будет поднят TimeoutError. timeout может быть значением int или float. Если значение timeout не указано или None, время ожидания не ограничено.

Если будущее отменено до завершения, то значение CancelledError будет увеличено.

Если вызов вызвал исключение, этот метод вызовет такое же исключение.

exception(timeout=None)

Возвращает исключение, вызванное вызовом. Если вызов еще не завершен, то этот метод будет ждать до истечения времени ожидания в секундах. Если вызов не завершился в течение времени ожидания в секундах, то будет выдано значение TimeoutError. timeout может быть значением int или float. Если значение timeout не указано или None, время ожидания не ограничено.

Если будущее отменено до завершения, то значение CancelledError будет увеличено.

Если вызов завершен без повышения, возвращается None.

add_done_callback(fn)

Присоединяет вызываемый fn к future. fn будет вызван с использованием future в качестве единственного аргумента, когда future будет отменен или завершит выполнение.

Добавленные вызываемые объекты вызываются в том порядке, в котором они были добавлены, и всегда вызываются в потоке, принадлежащем процессу, который их добавил. Если вызываемый объект создает подкласс Exception, он регистрируется и игнорируется. Если вызываемый объект создает подкласс BaseException, поведение не определено.

Если будущее уже завершено или отменено, fn будет вызван немедленно.

Следующие методы Future предназначены для использования в модульных тестах и реализациях Executor.

set_running_or_notify_cancel()

Этот метод должен вызываться только реализациями Executor перед выполнением работы, связанной с Future, и модульными тестами.

Если метод возвращает False, то Future был отменен, т.е. был вызван Future.cancel() и возвращен True. Все потоки, ожидающие завершения Future (т.е. через as_completed() или wait()), будут активированы.

Если метод возвращает True, то Future не был отменен и был переведен в состояние выполнения, т.е. вызовы Future.running() будут возвращать True.

Этот метод может быть вызван только один раз и не может быть вызван после вызова Future.set_result() или Future.set_exception().

set_result(result)

Присваивает результату работы, связанной с параметром Future, значение результат.

Этот метод следует использовать только в Executor реализациях и модульных тестах.

Изменено в версии 3.8: Этот метод вызывает concurrent.futures.InvalidStateError, если Future уже выполнено.

set_exception(exception)

Устанавливает результат работы, связанный с Future, в Exception исключение.

Этот метод следует использовать только в Executor реализациях и модульных тестах.

Изменено в версии 3.8: Этот метод вызывает concurrent.futures.InvalidStateError, если Future уже выполнено.

Функции модуля

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

Дождитесь завершения создания экземпляров Future (возможно, созданных разными экземплярами Executor), заданных fs. Повторяющиеся варианты будущего, заданные fs, удаляются и будут возвращены только один раз. Возвращает именованный набор из 2 наборов. Первый набор с именем done содержит объекты, которые завершили работу (завершили или отменили фьючерсы) до завершения ожидания. Второй набор, названный not_done, содержит фьючерсы, которые не завершились (ожидающие или запущенные фьючерсы).

timeout может использоваться для управления максимальным количеством секунд ожидания перед возвратом. timeout может быть значением int или float. Если значение timeout не указано или None, время ожидания не ограничено.

return_when указывает, когда эта функция должна вернуться. Это должна быть одна из следующих констант:

Постоянный

Описание

concurrent.futures.FIRST_COMPLETED

Функция будет возвращена, когда все последующие действия завершатся или будут отменены.

concurrent.futures.FIRST_EXCEPTION

Функция вернет значение, когда любое будущее завершится созданием исключения. Если ни одно будущее не вызовет исключения, то это эквивалентно ALL_COMPLETED.

concurrent.futures.ALL_COMPLETED

Функция вернется, когда все фьючерсы завершатся или будут отменены.

concurrent.futures.as_completed(fs, timeout=None)

Возвращает итератор по экземплярам Future (возможно, созданным разными экземплярами Executor), заданный fs, который выдает фьючерсы по мере их завершения (завершенные или отмененные фьючерсы). Все фьючерсы, заданные с помощью fs, которые дублируются, будут возвращены один раз. Все фьючерсы, которые завершились до вызова as_completed(), будут возвращены первыми. Возвращаемый итератор генерирует TimeoutError, если вызывается __next__() и результат недоступен по истечении времени ожидания (timeout) секунд с момента первоначального вызова до as_completed(). Время ожидания (timeout) может быть int или float. Если значение timeout не указано или None, то время ожидания не ограничено.

См.также

PEP 3148 – фьючерсы - выполняют вычисления асинхронно.

Предложение, в котором описывается эта функция для включения в стандартную библиотеку Python.

Классы исключений

exception concurrent.futures.CancelledError

Возникает при отмене будущего.

exception concurrent.futures.TimeoutError

Устаревший псевдоним TimeoutError, вызываемый, когда будущая операция превышает заданный тайм-аут.

Изменено в версии 3.11: Этому классу был присвоен псевдоним TimeoutError.

exception concurrent.futures.BrokenExecutor

Производный от RuntimeError, этот класс исключений генерируется, когда по какой-либо причине нарушается работа executor, и его нельзя использовать для отправки или выполнения новых задач.

Добавлено в версии 3.7.

exception concurrent.futures.InvalidStateError

Возникает, когда выполняется операция с будущим, которая запрещена в текущем состоянии.

Добавлено в версии 3.8.

exception concurrent.futures.thread.BrokenThreadPool

Производный от BrokenExecutor, этот класс исключений генерируется, когда один из рабочих элементов ThreadPoolExecutor не прошел инициализацию.

Добавлено в версии 3.7.

exception concurrent.futures.process.BrokenProcessPool

Производный от BrokenExecutor (ранее RuntimeError), этот класс исключений генерируется, когда один из рабочих элементов ProcessPoolExecutor завершается некорректным образом (например, если он был удален извне).

Добавлено в версии 3.3.

Вернуться на верх