concurrent.futures — Запуск параллельных задач

Добавлено в версии 3.2.

Исходный код: Lib/concurrent/futures/thread.py и Lib/concurrent/futures/process.py


Модуль concurrent.futures предоставляет высокоуровневый интерфейс для асинхронного выполнения вызываемых таблиц.

Асинхронное выполнение может осуществляться с помощью потоков, используя ThreadPoolExecutor, или отдельных процессов, используя ProcessPoolExecutor. Оба реализуют один и тот же интерфейс, который определяется абстрактным классом Executor.

Объекты-исполнители

class concurrent.futures.Executor

Абстрактный класс, предоставляющий методы для асинхронного выполнения вызовов. Его следует использовать не напрямую, а через его конкретные подклассы.

submit(fn, /, *args, **kwargs)

Планирует выполнение вызываемой переменной, fn, как fn(*args, **kwargs) и возвращает объект Future, представляющий выполнение вызываемой переменной.

with ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(pow, 323, 1235)
    print(future.result())
map(func, *iterables, timeout=None, chunksize=1)

Аналогично map(func, *iterables), за исключением:

  • iterables собираются немедленно, а не лениво;

  • func выполняется асинхронно, и несколько вызовов func могут выполняться одновременно.

Возвращаемый итератор вызывает ошибку concurrent.futures.TimeoutError, если вызывается __next__() и результат не доступен через timeout секунд после первоначального вызова Executor.map(). timeout может быть значением int или float. Если timeout не указан или None, время ожидания не ограничено.

Если вызов func вызывает исключение, то это исключение будет вызвано, когда его значение будет получено из итератора.

При использовании ProcessPoolExecutor этот метод разбивает iterables на несколько фрагментов, которые он отправляет в пул как отдельные задачи. (Приблизительный) размер этих фрагментов можно задать, установив chunksize в целое положительное число. Для очень длинных итераций использование большого значения chunksize может значительно повысить производительность по сравнению с размером по умолчанию 1. При ThreadPoolExecutor chunksize не влияет.

Изменено в версии 3.5: Добавлен аргумент chunksize.

shutdown(wait=True, *, cancel_futures=False)

Сигнализирует исполнителю, что он должен освободить все ресурсы, которые он использует, когда текущие ожидающие фьючерсы закончат выполняться. Вызовы Executor.submit() и Executor.map(), сделанные после выключения, вызовут ошибку RuntimeError.

Если wait имеет значение True, то этот метод не вернется, пока все ожидающие фьючерсы не закончат выполнение и ресурсы, связанные с исполнителем, не будут освобождены. Если wait имеет значение False, то этот метод вернется немедленно, а ресурсы, связанные с исполнителем, будут освобождены после завершения выполнения всех ожидающих фьючерсов. Независимо от значения wait, вся программа Python не завершится, пока не будут выполнены все ожидающие фьючерсы.

Если cancel_futures имеет значение True, этот метод отменит все ожидающие фьючерсы, которые исполнитель не начал выполнять. Любые фьючерсы, которые завершены или запущены, не будут отменены, независимо от значения cancel_futures.

Если и cancel_futures, и wait равны True, все фьючерсы, которые исполнитель начал выполнять, будут завершены до возвращения этого метода. Оставшиеся фьючерсы отменяются.

Вы можете избежать необходимости явного вызова этого метода, если используете оператор with, который выключит Executor (ожидание, как если бы Executor.shutdown() было вызвано с wait, установленным в True):

import shutil
with ThreadPoolExecutor(max_workers=4) as e:
    e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
    e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
    e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
    e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

Изменено в версии 3.9: Добавлено cancel_futures.

ThreadPoolExecutor

ThreadPoolExecutor - это подкласс Executor, который использует пул потоков для асинхронного выполнения вызовов.

Тупики могут возникать, когда вызываемый объект, связанный с Future, ожидает результатов другого Future. Например:

import time
def wait_on_b():
    time.sleep(5)
    print(b.result())  # b will never complete because it is waiting on a.
    return 5

def wait_on_a():
    time.sleep(5)
    print(a.result())  # a will never complete because it is waiting on b.
    return 6


executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)

И:

def wait_on_future():
    f = executor.submit(pow, 5, 2)
    # This will never complete because there is only one worker thread and
    # it is executing this function.
    print(f.result())

executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
class concurrent.futures.ThreadPoolExecutor(max_workers=None, thread_name_prefix='', initializer=None, initargs=())

Подкласс Executor, который использует пул из максимум max_workers потоков для асинхронного выполнения вызовов.

Все потоки, выстроенные в очередь на ThreadPoolExecutor, будут объединены до того, как интерпретатор сможет выйти. Обратите внимание, что обработчик выхода, который это делает, выполняется перед любыми обработчиками выхода, добавленными с помощью atexit. Это означает, что исключения в главном потоке должны быть пойманы и обработаны, чтобы дать сигнал потокам на изящный выход. По этой причине рекомендуется не использовать ThreadPoolExecutor для долго выполняющихся задач.

initializer - это необязательный вызываемый модуль, который вызывается в начале каждого рабочего потока; initargs - это кортеж аргументов, передаваемых инициализатору. Если initializer вызовет исключение, все текущие задания, ожидающие выполнения, вызовут сообщение BrokenThreadPool, а также все попытки отправить в пул дополнительные задания.

Изменено в версии 3.5: Если max_workers равно None или не задано, то по умолчанию оно будет равно количеству процессоров на машине, умноженному на 5, предполагая, что ThreadPoolExecutor часто используется для перекрытия ввода/вывода вместо работы процессора, и количество рабочих должно быть больше, чем количество рабочих для ProcessPoolExecutor.

Добавлено в версии 3.6: Аргумент thread_name_prefix был добавлен, чтобы позволить пользователям контролировать :class:`threading.Thread`имена рабочих потоков, создаваемых пулом, для облегчения отладки.

Изменено в версии 3.7: Добавлены аргументы initializer и initargs.

Изменено в версии 3.8: Значение по умолчанию max_workers изменено на min(32, os.cpu_count() + 4). Это значение по умолчанию сохраняет не менее 5 рабочих для задач, связанных с вводом/выводом. Оно использует не более 32 ядер процессора для задач, связанных с процессором, которые освобождают GIL. И оно позволяет избежать неявного использования очень больших ресурсов на многоядерных машинах.

ThreadPoolExecutor теперь повторно использует простаивающие рабочие потоки перед запуском рабочих потоков max_workers.

Пример ThreadPoolExecutor

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

Класс ProcessPoolExecutor является подклассом Executor, который использует пул процессов для асинхронного выполнения вызовов. ProcessPoolExecutor использует модуль multiprocessing, что позволяет ему обойти Global Interpreter Lock, но также означает, что только picklable объекты могут быть выполнены и возвращены.

Модуль __main__ должен быть импортируемым рабочими подпроцессами. Это означает, что ProcessPoolExecutor не будет работать в интерактивном интерпретаторе.

Вызов методов Executor или Future из вызываемого объекта, переданного в ProcessPoolExecutor, приведет к тупику.

class concurrent.futures.ProcessPoolExecutor(max_workers=None, mp_context=None, initializer=None, initargs=())

Подкласс Executor, который выполняет вызовы асинхронно, используя пул из не более чем max_workers процессов. Если max_workers равно None или не задано, то по умолчанию будет задано количество процессоров на машине. Если max_workers меньше или равно 0, то будет выдано предупреждение ValueError. В Windows, max_workers должно быть меньше или равно 61. Если это не так, то будет выдано сообщение ValueError. Если max_workers равно None, то по умолчанию будет выбрано не более 61, даже если доступно больше процессоров. mp_context может быть многопроцессорным контекстом или None. Он будет использоваться для запуска рабочих. Если mp_context равен None или не указан, то используется многопроцессорный контекст по умолчанию.

initializer - это необязательный вызываемый модуль, который вызывается в начале каждого рабочего процесса; initargs - это кортеж аргументов, передаваемых инициализатору. Если initializer вызовет исключение, то все текущие задания, ожидающие выполнения, вызовут сообщение BrokenProcessPool, а также все попытки отправить дополнительные задания в пул.

Изменено в версии 3.3: При внезапном завершении одного из рабочих процессов теперь выдается ошибка BrokenProcessPool. Ранее поведение было неопределенным, но операции над исполнителем или его фьючерсами часто зависали или заходили в тупик.

Изменено в версии 3.7: Аргумент mp_context был добавлен, чтобы позволить пользователям контролировать метод start_method для рабочих процессов, созданных пулом.

Добавлены аргументы initializer и initargs.

Пример ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

Объекты будущего

Класс Future инкапсулирует асинхронное выполнение вызываемого объекта. Экземпляры Future создаются с помощью Executor.submit().

class concurrent.futures.Future

Инкапсулирует асинхронное выполнение вызываемого объекта. Экземпляры Future создаются Executor.submit() и не должны создаваться напрямую, кроме как для тестирования.

cancel()

Попытка отменить вызов. Если вызов в данный момент выполняется или завершен и не может быть отменен, то метод вернет False, в противном случае вызов будет отменен и метод вернет True.

cancelled()

Верните True, если вызов был успешно отменен.

running()

Верните True, если вызов в данный момент выполняется и не может быть отменен.

done()

Верните True, если вызов был успешно отменен или завершен.

result(timeout=None)

Возвращает значение, возвращенное вызовом. Если вызов еще не завершен, то этот метод будет ждать до timeout секунд. Если вызов не завершился за timeout секунд, то будет выдано сообщение concurrent.futures.TimeoutError. timeout может быть значением int или float. Если timeout не указан или None, то время ожидания не ограничено.

Если будущее отменяется до завершения, то будет выдано сообщение CancelledError.

Если вызов вызвал исключение, этот метод вызовет такое же исключение.

exception(timeout=None)

Возвращает исключение, вызванное вызовом. Если вызов еще не завершен, то этот метод будет ждать до timeout секунд. Если вызов не завершился за timeout секунд, то будет выдано concurrent.futures.TimeoutError. timeout может быть значением int или float. Если timeout не указан или None, то время ожидания не ограничено.

Если будущее отменяется до завершения, то будет выдано сообщение CancelledError.

Если вызов завершился без повышения, возвращается None.

add_done_callback(fn)

Присоединяет вызываемую функцию fn к будущему. fn будет вызвана, с будущим в качестве единственного аргумента, когда будущее будет отменено или завершит выполнение.

Добавленные вызываемые файлы вызываются в том порядке, в котором они были добавлены, и всегда вызываются в потоке, принадлежащем процессу, который их добавил. Если вызываемая переменная вызывает подкласс Exception, она будет зарегистрирована и проигнорирована. Если вызываемая переменная вызывает подкласс BaseException, поведение не определено.

Если будущее уже завершилось или было отменено, немедленно вызывается fn.

Следующие Future методы предназначены для использования в модульных тестах и Executor реализациях.

set_running_or_notify_cancel()

Этот метод должен вызываться только реализациями Executor перед выполнением работы, связанной с Future, и модульными тестами.

Если метод возвращает False, то Future был отменен, т.е. был вызван Future.cancel(), который вернул True. Все потоки, ожидающие завершения Future (т.е. через as_completed() или wait()), будут разбужены.

Если метод возвращает True, то Future не был отменен и был переведен в состояние выполнения, т.е. вызовы Future.running() будут возвращать True.

Этот метод может быть вызван только один раз и не может быть вызван после вызова Future.set_result() или Future.set_exception().

set_result(result)

Устанавливает результат работы, связанной с Future, в результат.

Этот метод должен использоваться только в реализациях Executor и модульных тестах.

Изменено в версии 3.8: Этот метод поднимает concurrent.futures.InvalidStateError, если Future уже выполнен.

set_exception(exception)

Устанавливает результат работы, связанный с Future, на Exception исключение.

Этот метод должен использоваться только в реализациях Executor и модульных тестах.

Изменено в версии 3.8: Этот метод поднимает concurrent.futures.InvalidStateError, если Future уже выполнен.

Функции модуля

concurrent.futures.wait(fs, timeout=None, return_when=ALL_COMPLETED)

Дождаться завершения работы экземпляров Future (возможно, созданных разными экземплярами Executor), переданных fs. Дубликаты фьючерсов, переданных fs, удаляются и будут возвращены только один раз. Возвращает именованный 2-кортеж множеств. Первый набор, с именем done, содержит фьючерсы, которые завершились (завершенные или отмененные фьючерсы) до завершения ожидания. Второй набор, с именем not_done, содержит фьючерсы, которые не завершились (ожидающие или выполняющиеся фьючерсы).

timeout можно использовать для управления максимальным количеством секунд ожидания перед возвратом. timeout может быть значением int или float. Если timeout не указан или None, время ожидания не ограничено.

return_when указывает, когда эта функция должна вернуться. Это должна быть одна из следующих констант:

Постоянная

Описание

FIRST_COMPLETED

Функция возвращается, когда любое будущее завершается или отменяется.

FIRST_EXCEPTION

Функция вернется, когда любое будущее завершится, вызвав исключение. Если ни одно будущее не вызывает исключения, то это эквивалентно ALL_COMPLETED.

ALL_COMPLETED

Функция вернется, когда все фьючерсы завершатся или будут отменены.

concurrent.futures.as_completed(fs, timeout=None)

Возвращает итератор по экземплярам Future (возможно, созданным разными экземплярами Executor), заданным fs, который возвращает фьючерсы по мере их завершения (завершенные или отмененные фьючерсы). Любые фьючерсы, переданные fs, которые дублируются, будут возвращены один раз. Фьючерсы, которые завершились до вызова as_completed(), будут возвращены первыми. Возвращенный итератор вызывает ошибку concurrent.futures.TimeoutError, если вызван __next__() и результат не доступен через timeout секунд после первоначального вызова as_completed(). timeout может быть значением int или float. Если timeout не указан или None, время ожидания не ограничено.

См.также

PEP 3148 – фьючерсы - выполняют вычисления асинхронно

Предложение, описывающее эту возможность для включения в стандартную библиотеку Python.

Классы исключений

exception concurrent.futures.CancelledError

Возникает, когда будущее отменяется.

exception concurrent.futures.TimeoutError

Возникает, когда будущая операция превышает заданный тайм-аут.

exception concurrent.futures.BrokenExecutor

Производный от RuntimeError, этот класс исключений возникает, когда исполнитель по какой-то причине сломан и не может быть использован для отправки или выполнения новых задач.

Добавлено в версии 3.7.

exception concurrent.futures.InvalidStateError

Возникает, когда над будущим выполняется операция, которая недопустима в текущем состоянии.

Добавлено в версии 3.8.

exception concurrent.futures.thread.BrokenThreadPool

Производный от BrokenExecutor, этот класс исключений возникает, когда один из рабочих ThreadPoolExecutor не смог инициализироваться.

Добавлено в версии 3.7.

exception concurrent.futures.process.BrokenProcessPool

Производный от BrokenExecutor (ранее RuntimeError), этот класс исключений возникает, когда один из рабочих ProcessPoolExecutor завершается нечистым образом (например, если он был убит извне).

Добавлено в версии 3.3.

Вернуться на верх