ProcessPoolExecutor в Python: полное руководство

Исполнитель ProcessPoolExecutor позволяет создавать и управлять пулами процессов в Python.

Хотя ProcessPoolExecutor доступен с Python 3.2, он не получил широкого распространения, возможно, из-за непонимания возможностей и ограничений процессов и потоков в Python.

Это руководство содержит подробный и всесторонний обзор ProcessPoolExecutor в Python, включая то, как он работает, как его использовать, общие вопросы и лучшие практики.

Давайте погрузимся внутрь.

Оглавление

Процессы Python и необходимость в пулах процессов

Итак, что такое процессы и почему мы заботимся о пулах процессов?

Что такое процессы Python

Процесс относится к компьютерной программе.

Каждая программа Python представляет собой процесс и имеет один поток, называемый главный поток, используемый для выполнения инструкций вашей программы. Каждый процесс, по сути, является одним экземпляром интерпретатора Python, который выполняет инструкции Python (байт-код Python), что является немного более низким уровнем, чем код, который вы вводите в свою программу Python.

Иногда нам может понадобиться создать новые процессы для одновременного выполнения дополнительных задач.

Python предоставляет реальные процессы системного уровня через класс Process в модуле multiprocessing.

Базовая операционная система контролирует, как создаются новые процессы. В некоторых системах это может потребовать порождения нового процесса, а в других - создания развилки процесса. Метод создания новых процессов в Python, специфичный для операционной системы, - это не то, о чем нам нужно беспокоиться, поскольку им управляет установленный вами интерпретатор Python.

Задача может быть запущена в новом процессе путем создания экземпляра класса Process и указания функции для запуска в новом процессе через аргумент "target".


...
# define a task to run in a new process
p = Process(target=task)

После создания процесса его необходимо запустить, вызвав функцию start().


...
# start the task in a new process
p.start()

Затем мы можем подождать завершения задачи, присоединившись к процессу; например:


...
# wait for the task to complete
p.join()

Когда мы создаем новые процессы, мы должны защитить точку входа программы.


# entry point for the program
if __name__ == '__main__':
# do things...

Ниже приведен полный пример создания Process для запуска функции специальной задачи.


# SuperFastPython.com
# example of running a function in a new process
from multiprocessing import Process
 
# a task to execute in another process
def task():
    print('This is another process', flush=True)
 
# entry point for the program
if __name__ == '__main__':
    # define a task to run in a new process
    p = Process(target=task)
    # start the task in a new process
    p.start()
    # wait for the task to complete
    p.join()

Это полезно для выполнения разовых специальных задач в отдельном процессе, хотя это становится громоздким, когда у вас много задач для выполнения.

Каждый создаваемый процесс требует применения ресурсов (например, экземпляр интерпретатора Python и память для стекового пространства главного потока процесса). Вычислительные затраты на создание процессов могут стать дорогостоящими, если мы будем создавать и уничтожать множество процессов снова и снова для специальных задач.

Вместо этого, мы предпочли бы сохранить рабочие процессы для повторного использования, если мы ожидаем выполнения множества специальных задач в нашей программе.

Этого можно достичь, используя пул процессов.

Что такое пул процессов

Пул процессов - это шаблон программирования для автоматического управления пулом рабочих процессов.

Пул отвечает за фиксированное количество процессов.

Пул может предоставлять общий интерфейс для выполнения специальных задач с переменным количеством аргументов, подобно свойству target объекта Process, но не требует выбора процесса для выполнения задачи, запуска процесса или ожидания завершения задачи.

Python предоставляет пул процессов через класс ProcessPoolExecutor.

ProcessPoolExecutor для пулов процессов в Python

Python-класс ProcessPoolExecutor используется для создания и управления пулами процессов и предоставляется в модуле concurrent.futures.

Модуль concurrent.futures был представлен в Python 3.2, написанном Брайаном Куинланом, и предоставляет как пулы потоков, так и пулы процессов, хотя в этом руководстве мы сосредоточим свое внимание на пулах процессов.

Если вам интересно, вы можете получить доступ к исходному коду класса ProcessPoolExecutor непосредственно через process.py. Может быть интересно покопаться в том, как класс работает внутри, возможно, после того, как вы ознакомитесь с тем, как он работает снаружи.

Класс ProcessPoolExecutor расширяет класс Executor и при вызове будет возвращать объекты Future.

Давайте подробнее рассмотрим экзекуторы, фьючерсы и жизненный цикл использования класса ProcessPoolExecutor.

Что такое экзекуторы

Класс ProcessPoolExecutor расширяет абстрактный класс Executor.

Класс Executor определяет три метода, используемые для управления нашим пулом процессов; это: submit(), map() и shutdown().

Executor запускается при создании класса и должен быть выключен явным образом вызовом shutdown(), который освободит все ресурсы, удерживаемые Executor. Мы также можем завершить работу автоматически, но это мы рассмотрим чуть позже.

Функции submit() и map() используются для отправки заданий в Executor для асинхронного выполнения.

Функция map() работает так же, как и встроенная функция map(), и используется для применения функции к каждому элементу итерируемого объекта, например, списка. В отличие от встроенной функции map(), каждое применение функции к элементу будет происходить асинхронно, а не последовательно.

Функция submit() принимает функцию, а также любые аргументы и выполняет ее асинхронно, хотя вызов возвращается немедленно и предоставляет объект Future.

Сейчас мы подробнее рассмотрим каждую из этих трех функций. Во-первых, что такое фьючерсы?

Что такое фьючерсы

А future - это объект, представляющий отложенный результат для асинхронной задачи.

Иногда его также называют обещанием или задержкой. Он обеспечивает контекст для результата задачи, которая может выполняться или не выполняться, и способ получения результата, как только он станет доступен.

В Python объект Future возвращается из Executor, например ProcessPoolExecutor, при вызове функции submit() для отправки задачи на асинхронное выполнение.

В общем, мы не создаем объекты Future; мы только получаем их, и нам может понадобиться вызвать функции на них.

Для каждой задачи, отправленной в ProcessPoolExecutor через вызов submit().

всегда существует один объект Future.

Объект Future предоставляет ряд полезных функций для проверки состояния задачи, таких как: cancelled(), running() и done(), чтобы определить, была ли задача отменена, выполняется ли она в настоящее время или завершила выполнение.

Выполняемая задача не может быть отменена, а выполненная задача могла быть отменена.

Объект Future также предоставляет доступ к результату выполнения задачи через функцию result(). Если во время выполнения задачи было вызвано исключение, оно будет повторно вызвано при вызове функции result(), или доступ к нему можно получить через функцию exception().

В функциях result() и exception() в качестве аргумента может быть указан таймаут - количество секунд ожидания возвращаемого значения, если задача еще не выполнена. Если таймаут истечет, то будет вызвана ошибка TimeoutError.

Наконец, мы можем захотеть, чтобы пул процессов автоматически вызывал функцию после завершения задачи.

Этого можно достичь, присоединив обратный вызов к объекту Future для задачи с помощью функции add_done_callback().

Мы можем добавить более одного обратного вызова к каждой задаче, и они будут выполняться в том порядке, в котором были добавлены. Если задача уже завершилась до того, как мы добавили обратный вызов, то обратный вызов будет выполнен немедленно.

Любые исключения, возникающие в функции обратного вызова, не влияют на пул задач или процессов.

Мы подробнее рассмотрим объект Future в одном из последующих разделов.

Теперь, когда мы знакомы с функциональностью ProcessPoolExecutor, предоставляемой классом Executor и объектами Future, возвращаемыми вызовом submit(), давайте более подробно рассмотрим жизненный цикл класса ProcessPoolExecutor.

Жизненный цикл исполнителя ProcessPoolExecutor

Исполнители ProcessPoolExecutor предоставляют пул общих рабочих процессов.

Исполнитель ProcessPoolExecutor был разработан, чтобы быть простым и понятным в использовании.

Если бы мультипроцессинг был подобен коробке передач для переключения передач в автомобиле, то использование multiprocessing.Process является механической коробкой передач (например, трудно изучаемой. например, трудно изучить и использовать), тогда как concurrency.futures.ProcessPoolExecutor является автоматической коробкой передач (например, легко изучить и использовать).

В жизненном цикле использования класса ProcessPoolExecutor есть четыре основных этапа: создание, отправка, ожидание и завершение работы.

Следующий рисунок помогает представить жизненный цикл класса ProcessPoolExecutor.

ProcessPoolExecutor Life-Cycle

Давайте рассмотрим каждый этап жизненного цикла по очереди.

Шаг 1. Создайте пул процессов

Сначала необходимо создать экземпляр ProcessPoolExecutor.

Когда создается экземпляр ProcessPoolExecutor, он должен быть настроен на фиксированное количество процессов в пуле, метод, используемый для создания новых процессов (например, spawn или fork), и имя функции для вызова при инициализации каждого процесса вместе с любыми аргументами для этой функции.

Пул создается с одним процессом для каждого ЦП в вашей системе. Это подходит для большинства целей.

Например, если у вас 4 процессора, каждый из которых имеет гиперпоточность (большинство современных процессоров имеют такую возможность), то Python будет видеть 8 процессоров и по умолчанию выделит в пул 6 процессов.


...
# create a process pool with the default number of worker processes
executor = ProcessPoolExecutor()

Хорошо бы протестировать ваше приложение, чтобы определить количество процессов, которое приводит к наилучшей производительности.

Например, для некоторых задач с интенсивными вычислениями вы можете достичь наилучшей производительности, установив количество процессов равным количеству физических ядер процессора (до гиперпоточности), а не логическому количеству ядер процессора (после гиперпоточности).

Мы обсудим настройку количества процессов для вашего пула позже.

Вы можете указать количество процессов для создания пула через аргумент max_workers; например:


...
# create a process pool with 4 workers
executor = ProcessPoolExecutor(max_workers=4)

Вспомните, всякий раз, когда мы используем процессы, мы должны защитить точку входа программы.

Этого можно достичь с помощью if-выражения; например:


...
# entry point of the program
if __name__ == '__main__':
# create a process pool with 4 workers
executor = ProcessPoolExecutor(max_workers=4)

Шаг 2. Отправить задания в пул процессов

После создания пула процессов можно отправлять задания для асинхронного выполнения.

Как уже говорилось, существует два основных подхода к представлению задач, определенных на родительском классе Executor. Это: map() и submit().

Шаг 2a. Отправить задачи с помощью map()

Функция map() представляет собой асинхронную версию встроенной функции map() для применения функции к каждому элементу итерабельной системы, например, списка.

Вы можете вызвать функцию map() на пуле и передать ей имя вашей функции и итерабель.

Вы, скорее всего, используете map() при преобразовании цикла for для запуска с использованием одного процесса на итерацию цикла.


...
# perform all tasks in parallel
results = executor.map(my_task, my_items) # does not block

Где "my_task" - это ваша функция, которую вы хотите выполнить, а "my_items - это итерация объектов, каждый из которых будет выполнен вашей функцией "my_task".

Задания будут поставлены в очередь в пуле процессов и выполняться рабочими процессами в пуле по мере их поступления.

Функция map() сразу же вернет итерабельность. Эта итерабель может быть использована для доступа к результатам из функции целевой задачи, поскольку они доступны в том порядке, в котором задачи были представлены (например, в порядке предоставленной вами итерабельности).


...
# iterate over results as they become available
for result in executor.map(my_task, my_items):
print(result)

Вы также можете установить таймаут при вызове map() через аргумент "timeout" в секундах, если вы хотите установить ограничение на то, как долго вы готовы ждать завершения каждой задачи в процессе итерации, после чего будет выдана ошибка TimeOut.


...
# iterate over results as they become available
for result in executor.map(my_task, my_items, timeout=5):
# wait for task to complete or timeout expires
print(result)

Шаг 2b. Отправить задачи с помощью submit()

Функция submit() отправляет одну задачу в пул процессов для выполнения.

Функция принимает имя вызываемой функции и все аргументы функции, затем сразу же возвращает объект Future.

Объект Future представляет собой обещание вернуть результаты выполнения задачи (если таковые имеются) и предоставляет способ определить, была ли конкретная задача выполнена или нет.


...
# submit a task with arguments and get a future object
future = executor.submit(my_task, arg1, arg2) # does not block

Где "my_task" - функция, которую вы хотите выполнить, а "arg1" и "arg2" - первый и второй аргументы для передачи функции "my_task"

Вы можете использовать функцию submit() для отправки заданий, которые не принимают никаких аргументов; например:


...
# submit a task with no arguments and get a future object
future = executor.submit(my_task) # does not block

Вы можете получить доступ к результату задачи через функцию result() на возвращаемом объекте Future. Этот вызов будет блокироваться до завершения задачи.


...
# get the result from a future
result = future.result() # blocks

Вы также можете установить таймаут при вызове result() через аргумент "timeout" в секундах, если вы хотите установить ограничение на то, как долго вы готовы ждать завершения каждой задачи, после чего будет выдана ошибка TimeOut.


...
# wait for task to complete or timeout expires
result = future.result(timeout=5) # blocks

Шаг 3. Дождитесь завершения заданий (необязательно)

Модуль concurrent.futures предоставляет две служебные функции для ожидания задач через их объекты Future.

Напомните, что объекты Future создаются только тогда, когда мы вызываем submit(), чтобы поместить задачи в пул процессов.

Эти функции ожидания использовать необязательно, так как вы можете ждать результатов непосредственно после вызова map() или submit() или ждать завершения всех задач в пуле процессов.

Эти две функции модуля - wait() для ожидания завершения объектов Future и as_completed() для получения объектов Future по мере завершения их задач.

Вы можете использовать обе функции с объектами Future, созданными одним или несколькими пулами процессов; они не являются специфичными для какого-либо конкретного пула процессов в вашем приложении. Это полезно, если вы хотите выполнять операции ожидания в нескольких пулах процессов, которые выполняют различные типы задач.

Обе функции полезно использовать с идиомой отправки нескольких задач в пул процессов через submit в сжатии списка; например:


...
# dispatch tasks into the process pool and create a list of futures
futures = [executor.submit(my_task, my_data) for my_data in my_datalist]

Здесь my_task - это наша пользовательская функция целевой задачи, "my_data" - это один элемент данных, переданный в качестве аргумента "my_task", а "my_datalist" - это наш источник объектов my_data.

Затем мы можем передать "futures" в wait() или as_completed().

Создание списка фьючерсов таким образом не является обязательным, это обычная схема при преобразовании циклов for в задачи, передаваемые в пул процессов.

Шаг 3a. Дождаться завершения фьючерсов

Функция wait() может принимать один или несколько фьючерсов и вернется, когда произойдет заданное действие, например, завершение всех заданий, завершение одного задания или исключение одного задания.

Функция вернет один набор объектов будущего, которые соответствуют условию, заданному через "return_when". Второй набор будет содержать все фьючерсы для задач, которые не удовлетворяют условию. Они называются наборами фьючерсов "done" и "not_done".

Это полезно для ожидания большой партии работы и прекращения ожидания при получении первого результата.

Этого можно достичь с помощью константы FIRST_COMPLETED, передаваемой в аргументе "return_when".


...
# wait until we get the first result
done, not_done = wait(futures, return_when=FIRST_COMPLETED)

Альтернативно, мы можем ждать завершения всех задач через константу ALL_COMPLETED.

Это может быть полезно, если вы используете submit() для отправки задач и ищете простой способ дождаться завершения всей работы.


...
# wait for all tasks to complete
done, not_done = wait(futures, return_when=ALL_COMPLETED)

Также есть возможность дождаться первого исключения через константу FIRST_EXCEPTION.


...
# wait for the first exception
done, not_done = wait(futures, return_when=FIRST_EXCEPTION)

Шаг 3b. Дождаться фьючерсов как завершенного

Прелесть параллельного выполнения задач в том, что мы можем получать результаты по мере их поступления, а не ждать завершения всех задач.

Функция as_completed() будет возвращать Future объекты для задач по мере их завершения в пуле процессов.

Мы можем вызвать функцию и предоставить ей список объектов Future, созданных вызовом submit(), и она будет возвращать объекты Future по мере их завершения в любом порядке.

Обычно функция as_completed() используется в цикле над списком Futures, созданным при вызове submit(); например:


...
# iterate over all submitted tasks and get results as they are available
for future in as_completed(futures):
# get the result for the next completed task
result = future.result() # blocks

Примечание: это отличается от итерации по результатам вызова map() двумя способами.

Во-первых, map() возвращает итератор по объектам, а не по Futures.

Во-вторых, map() возвращает результаты в порядке подачи заданий, а не в порядке их выполнения.

Шаг 4. Выключите пул процессов

После завершения всех задач мы можем закрыть пул процессов, что освободит каждый процесс и все ресурсы, которые он может иметь.

Например, каждый процесс запускает экземпляр интерпретатора Python и по крайней мере один поток (главный поток), который имеет собственное стековое пространство.


...
# shutdown the process pool
executor.shutdown() # blocks

Функция shutdown() по умолчанию будет ждать завершения всех задач в пуле процессов перед возвратом.

Это поведение можно изменить, установив аргумент "wait" в False при вызове shutdown(), в этом случае функция вернется немедленно. Ресурсы, используемые пулом процессов, не будут освобождены, пока все текущие и поставленные в очередь задачи не будут завершены.


...
# shutdown the process pool
executor.shutdown(wait=False) # does not blocks

Мы также можем дать указание пулу отменить все поставленные в очередь задачи, чтобы предотвратить их выполнение. Этого можно добиться, установив аргумент "cancel_futures" в значение True. По умолчанию задачи, поставленные в очередь, не отменяются при вызове shutdown().


...
# cancel all queued tasks
executor.shutdown(cancel_futures=True) # blocks

Если мы забудем закрыть пул процессов, пул процессов будет закрыт автоматически при выходе из главного потока. Если мы забудем закрыть пул, а задачи все еще будут выполняться, главный процесс не завершится, пока не будут выполнены все задачи в пуле и все задачи в очереди.

ProcessPoolExecutor Context Manager

Предпочтительным способом работы с классом ProcessPoolExecutor является использование менеджера контекста.

Это соответствует предпочтительному способу работы с другими ресурсами, такими как файлы и сокеты.

Использование ProcessPoolExecutor с контекстным менеджером предполагает использование ключевого слова "with" для создания блока, в котором вы можете использовать пул процессов для выполнения задач и получения результатов.

После завершения блока пул процессов автоматически выключается. Внутренне менеджер контекста вызовет функцию shutdown() с аргументами по умолчанию, ожидая завершения всех поставленных в очередь и выполняющихся задач, прежде чем вернуться и продолжить работу.

Ниже приведен фрагмент кода, демонстрирующий создание пула процессов с помощью менеджера контекста.


...
# create a process pool
with ProcessPoolExecutor(max_workers=10) as pool:
# submit tasks and get results
# ...
# automatically shutdown the process pool...
# the pool is shutdown at this point

Это очень удобная идиома, если вы преобразуете цикл for для асинхронного выполнения.

Это менее полезно, если вы хотите, чтобы пул процессов работал в фоновом режиме, пока вы выполняете другую работу в основном потоке вашей программы, или если вы хотите повторно использовать пул процессов несколько раз в течение вашей программы.

Теперь, когда мы знакомы с тем, как использовать ProcessoolExecutor, давайте рассмотрим некоторые рабочие примеры.

Пример исполнителя ProcessPoolExecutor

В этом разделе мы рассмотрим более полный пример использования ProcessPoolExecutor.

Рассмотрим ситуацию, когда мы можем захотеть проверить, известно ли слово программе или нет, например, есть ли оно в словаре известных слов.

Если слово известно, это прекрасно, но если нет, мы, возможно, захотим принять меры для пользователя, возможно, подчеркнуть его при чтении, как автоматическая проверка орфографии.

Одним из подходов к реализации этой функции может быть загрузка словаря известных слов и создание хэша каждого слова. Затем мы можем хэшировать новые слова и проверять, существуют ли они в наборе известных хэшированных слов или нет.

Это хорошая проблема для изучения с помощью ProcessPoolExecutor, поскольку хэширование слов может быть относительно медленным, особенно для больших словарей с сотнями тысяч или миллионами известных слов.

Сначала разработаем серийную (неконкурентную) версию программы.

Хэш словарь слов серийно

Первым шагом является выбор словаря слов для использования.

В системах Unix, таких как MacOS и Linux, уже установлен словарь, который называется Unix Words.

Он расположен в одном из следующих мест:

В моей системе он находится в папке '/usr/share/dict/words' и содержит 235 886 слов, рассчитанных с помощью команды:


cat /usr/share/dict/words | wc -l

Мы можем использовать этот словарь слов.

В качестве альтернативы, если мы работаем под Windows или хотим иметь больший словарь, мы можем загрузить один из многих бесплатных списков слов в Интернете.

Например, вы можете скачать список одного миллиона английских слов отсюда:

Скачайте этот файл и сохраните его в текущем рабочем каталоге с именем "1m_words.txt".

Заглянув в файл, мы видим, что действительно имеем длинный список слов, по одному на строку.


aaccf
aalders
aaren
aarika
aaron
aartjan
aasen
ab
abacus
abadines
abagael
abagail
abahri
abasolo
abazari
...

Сначала нужно загрузить в память список слов.

Этого можно достичь, сначала открыв файл, затем вызвав функцию readlines(), которая автоматически прочитает ASCII строки текста в список.

Приведенная ниже функция load_words() принимает путь к текстовому файлу и возвращает список слов, загруженных из файла.


# load a file of words
def load_words(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data as lines
        return file.readlines()

Далее нам нужно хэшировать каждое слово.

В этом примере мы намеренно выберем медленную хэш-функцию, а именно алгоритм SHA512.

Это доступно в Python через hashlib.ha512() функцию.

Сначала мы можем создать экземпляр объекта хэширования, вызвав функцию sha512().


...
# create the hash object
hash_object = sha512()

Далее мы можем преобразовать данное слово в байты и затем хэшировать его с помощью хэш-функции.


...
# convert the string to bytes
byte_data = word.encode('utf-8')
# hash the word
hash_object.update(byte_data)

Наконец, мы можем получить строковое представление хэша для слова в формате HEX, вызвав функцию hexdigest()..


...
# get the hex hash of the word
h = hash_object.hexdigest()

Связывая это вместе, функция hash_word() ниже принимает слово и возвращает HEX хэш-код этого слова.


# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()

Вот и все.

Мы можем определить функцию, которая будет управлять программой, сначала загружая список слов вызовом нашей load_words(), затем создавая набор хэшей известных слов вызовом нашей hash_word() для каждого загруженного слова.

Приведенная ниже функция main() реализует this.


# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # hash all known words
    known_words = {hash_word(word) for word in words}
    print(f'Done, with {len(known_words)} hashes')

Ниже приведен полный пример загрузки словаря слов и создания набора известных хэшей слов.


# SuperFastPython.com
# example of hashing a word list serially
from hashlib import sha512
 
# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()
 
# load a file of words
def load_words(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data as lines
        return file.readlines()
 
# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # hash all known words
    known_words = {hash_word(word) for word in words}
    print(f'Done, with {len(known_words)} hashes')
 
if __name__ == '__main__':
    main()

Запуск примера сначала загружает файл и сообщает, что всего было загружено 1,049,938 слов.

Затем список слов хэшируется, и хэши сохраняются в наборе.

Программа сообщает, что всего было сохранено 979 250 хэшей, что предполагает тысячи дубликатов в словаре.

Для выполнения программы на современной системе требуется около 1,4 секунды.

Как долго пример выполняется на вашей системе?
Дайте мне знать в комментариях ниже.


Loaded 1049938 words from 1m_words.txt
Done, with 979250 hashes

Далее мы можем обновить программу, чтобы хэшировать слова одновременно.

Хэширование словаря слов одновременно с map()

Хеширование слов происходит относительно медленно, но даже в этом случае хеширование почти миллиона слов занимает менее двух секунд.

Тем не менее, мы можем ускорить процесс, используя все процессоры в системе и хэшируя слова одновременно.

Этого можно достичь, используя ProcessPoolExecutor.

Сначала мы можем создать пул процессов и указать количество одновременно выполняемых процессов. Я рекомендую настроить пул в соответствии с количеством физических ядер процессора в вашей системе.

У меня четыре ядра, поэтому пример будет использовать четыре ядра, но обновите его для того количества ядер, которое доступно вам.


...
# create the process pool
with ProcessPoolExecutor(4) as executor:
# ...

Далее нам нужно отправить задания в пул процессов, то есть хеширование каждого слова.

Поскольку задача заключается в простом применении функции для каждого элемента списка, мы можем использовать функцию map() напрямую.

Например:


...
# create a set of word hashes
known_words = set(executor.map(hash_word, words))

И это все.

Например, ниже приведена обновленная версия функции main() для одновременного хэширования слов.


# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # create the process pool
    with ProcessPoolExecutor(4) as executor:
        # create a set of word hashes
        known_words = set(executor.map(hash_word, words))
    print(f'Done, with {len(known_words)} hashes')

Ну, не так быстро.

Это будет выполнено, но на это уйдет очень много времени.

Причина в том, что мы добавляем почти миллион задач в пул для выполнения четырьмя процессами, и каждая задача должна быть пикирована и поставлена в очередь. Повторение этих операций столько раз приводит к накладным расходам, которые намного превышают время выполнения задачи.

Мы должны уменьшить накладные расходы, сократив количество внутренних задач в пуле процессов.

Этого можно достичь, задав параметр "chunksize" при вызове map().

Этот параметр определяет, сколько элементов в итерабельной таблице сопоставляется с одной задачей в пуле процессов. По умолчанию один элемент сопоставляется с одной задачей, что означает, что у нас почти миллион задач.

Возможно, хорошим первым подходом будет разделить количество элементов на количество доступных процессов, в данном случае на четыре. Это создаст четыре задачи, например, четыре больших фрагмента слов, каждый из которых будет обрабатываться одним процессом, вероятно, на одном ядре процессора.

Этого можно достичь, вычислив длину списка слов и разделив ее на количество рабочих процессов. Деление может быть не чистым, поэтому мы можем использовать функцию math.ceil() math для округления количества элементов на задачу до ближайшего целого числа.


...
# select a chunk size
chunksize = ceil(len(words) / 4)

Мы можем оценить, что это будет (1049938 / 4) или около 262484,5 слов на задачу, то есть чуть больше полумиллиона.

Затем мы можем использовать этот размер чанков при вызове функции map().


...
# create a set of word hashes
known_words = set(executor.map(hash_word, words, chunksize=chunksize))

Ниже приведен полный пример хэширования словаря слов одновременно с использованием ProcessPoolExecutor.


# SuperFastPython.com
# example of hashing a word list concurrently
from math import ceil
from hashlib import sha512
from concurrent.futures import ProcessPoolExecutor
 
# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()
 
# load a file of words
def load_words(path):
    # open the file
    with open(path) as file:
        # read all data as lines
        return file.readlines()
 
# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # create the process pool
    with ProcessPoolExecutor(4) as executor:
        # select a chunk size
        chunksize = ceil(len(words) / 4)
        # create a set of word hashes
        known_words = set(executor.map(hash_word, words, chunksize=chunksize))
    print(f'Done, with {len(known_words)} hashes')
 
if __name__ == '__main__':
    main()

Запуск примера загружает слова как раньше, затем создает набор хэшированных слов параллельно, разбивая его на четыре задачи, по одной для каждого процесса в пуле.

Эта параллельная версия действительно предлагает очень незначительное ускорение, занимая около 1,2 секунды на моей системе, предлагая небольшое ускорение.


Loaded 1049938 words from 1m_words.txt
Done, with 979250 hashes

Далее, давайте посмотрим, сможем ли мы получить дальнейшее улучшение, настроив аргумент chunksize.

Тестирование значений chunksize при хэшировании словаря слов с помощью map()

Разделение предметов на задачи для пула процессов - это больше искусство, чем наука.

Неправильная установка этого параметра, например, установка его на единицу при большом количестве задач, может привести к гораздо худшей производительности, чем в последовательном случае. Наивная установка этого значения может привести к эквивалентной или немного лучшей производительности, чем в последовательном случае.

Поэтому мы можем настроить производительность приложения, тестируя различные значения аргумента "chunksize".

В предыдущем разделе мы видели, что размер чанков 262485 привел к производительности, аналогичной серийному случаю.

Я рекомендую протестировать различные размеры чанков, чтобы определить, что хорошо работает на вашей конкретной системе; например, некоторые числа, которые вы можете попробовать, включают:

Обычно такой тип настройки выполняется при работе с распределенными системами и многопроцессными системами, поскольку удельная стоимость сериализации и передачи данных между рабочими зависит от аппаратного обеспечения и конкретных данных.

Если задействованные задачи долго выполняются или являются чувствительными в каком-то смысле, вы можете разработать тестовый жгут с макетами задач.

Мы можем определить функцию для проверки заданного аргумента chunksize, которая также вычисляет, сколько времени потребуется для выполнения задачи, включая фиксированную стоимость создания пула процессов.

Приведенная ниже функция test_chunksize() реализует это, принимая загруженный словарь слов и chunksize для тестирования, и сообщает, сколько времени заняло выполнение задания для заданного chunksize.


# test a chunksize
def test_chunksize(words, size):
    time1 = time()
    # create the process pool
    with ProcessPoolExecutor(4) as executor:
        # create a set of word hashes
        _ = set(executor.map(hash_word, words, chunksize=size))
    time2 = time()
    total = time2 - time1
    print(f'{size}: {total:.3f} seconds')

Мы можем вызвать эту функцию из нашей функции main() со списком различных значений размера чанка для тестирования; например:


# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # test chunk sizes
    base = ceil(len(words) / 4)
    sizes = [base, 100000, 50000, 10000, 5000, 1000, 500]
    for size in sizes:
        test_chunksize(words, size)

Ниже приведен полный пример тестирования различных значений chunksize.


# SuperFastPython.com
# example of testing chunksize when hashing a word list concurrently
from math import ceil
from time import time
from hashlib import sha512
from concurrent.futures import ProcessPoolExecutor
 
# hash one word using the SHA algorithm
def hash_word(word):
    # create the hash object
    hash_object = sha512()
    # convert the string to bytes
    byte_data = word.encode('utf-8')
    # hash the word
    hash_object.update(byte_data)
    # get the hex hash of the word
    return hash_object.hexdigest()
 
# load a file of words
def load_words(path):
    # open the file
    with open(path, encoding='utf-8') as file:
        # read all data as lines
        return file.readlines()
 
# test a chunksize
def test_chunksize(words, size):
    time1 = time()
    # create the process pool
    with ProcessPoolExecutor(4) as executor:
        # create a set of word hashes
        _ = set(executor.map(hash_word, words, chunksize=size))
    time2 = time()
    total = time2 - time1
    print(f'{size}: {total:.3f} seconds')
 
# entry point
def main():
    # load a file of words
    path = '1m_words.txt'
    words = load_words(path)
    print(f'Loaded {len(words)} words from {path}')
    # test chunk sizes
    base = ceil(len(words) / 4)
    sizes = [base, 100000, 50000, 10000, 5000, 1000, 500]
    for size in sizes:
        test_chunksize(words, size)
 
if __name__ == '__main__':
    main()

Запустив пример, мы видим, что размер чанков около 10 000 или 5 000 будет работать хорошо, выполняя задачу примерно за 0,8 секунды по сравнению с примерно 1,4 в последовательном случае и 1,2 для наивной конфигурации размера чанков, по крайней мере, на моей системе.

Это подчеркивает важность настройки размера чанков для конкретной задачи и аппаратного обеспечения компьютера.


Loaded 1049938 words from 1m_words.txt
262485: 1.242 seconds
100000: 1.122 seconds
50000: 1.157 seconds
10000: 0.871 seconds
5000: 0.842 seconds
1000: 1.036 seconds
500: 1.112 seconds

Что хорошо работало на вашей системе?
Дайте мне знать в комментариях ниже.

Паттерны использования исполнителя ProcessPoolExecutor

Исполнитель ProcessPoolExecutor обеспечивает большую гибкость для выполнения параллельных задач в Python.

Тем не менее, существует несколько обычных схем использования, которые подойдут для большинства программных сценариев.

В этом разделе перечислены общие шаблоны использования с примерами, которые вы можете скопировать и вставить в свой собственный проект и адаптировать по мере необходимости.

Шаблоны, которые мы рассмотрим, следующие:

В каждом примере мы будем использовать надуманную задачу, которая будет спать в течение случайного времени, меньшего одной секунды. Вы можете легко заменить эту задачу на свою собственную задачу в каждом примере.

Также напомним, что каждая программа Python является процессом и по умолчанию имеет один поток, называемый главным, в котором мы выполняем свою работу. Мы будем создавать пул процессов в главном потоке в каждом примере и можем ссылаться на действия в главном потоке в некоторых паттернах, в отличие от действий в процессах в пуле процессов.

Паттерн карты и ожидания

Пожалуй, наиболее распространенным шаблоном при использовании ProcessPoolExecutor является преобразование цикла for, выполняющего функцию для каждого элемента коллекции, для использования мультипроцессинга.

Предполагается, что функция не имеет побочных эффектов, то есть она не обращается к данным за пределами функции и не изменяет предоставленные ей данные. Она принимает данные и выдает результат.

Такие типы циклов for могут быть явно написаны в Python; например:


...
# apply a function to each element in a collection
for item in mylist:
result = task(item)

Лучше использовать встроенную map() функцию, которая применит функцию к каждому элементу итерабельной таблицы за вас.


...
# apply the function to each element in the collection
results = map(task, mylist)

Это не выполняет функцию task() для каждого элемента, пока мы не выполним итерацию результатов, так называемую ленивую оценку:


...
# iterate the results from map
for result in results:
print(result)

Поэтому обычно эта операция сводится к следующему:


...
# iterate the results from map
for result in map(task, mylist):
print(result)

Мы можем выполнить эту же операцию, используя пул процессов, за исключением того, что каждое применение функции к элементу списка является задачей, которая выполняется асинхронно. Например:


...
# iterate the results from map
for result in executor.map(task, mylist):
print(result)

Хотя задачи выполняются асинхронно, результаты итерируются в порядке итерации, предоставленной функции map(), аналогично встроенной функции map().

Таким образом, мы можем рассматривать версию функции map() для пула процессов как параллельную версию функции map() и идеально подходит, если вы хотите обновить цикл for для использования процессов.

Приведенный ниже пример демонстрирует использование шаблона map and wait с задачей, которая будет спать случайное количество времени менее одной секунды и вернет заданное значение.


# SuperFastPython.com
# example of the map and wait pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # execute tasks concurrently and process results in order
        for result in executor.map(task, range(10)):
            # retrieve the result
            print(result)
 
if __name__ == '__main__':
    main()

Выполняя пример, мы видим, что результаты сообщаются в том порядке, в котором задачи были созданы и отправлены в пул процессов.


0
1
2
3
4
5
6
7
8
9

Функция map() поддерживает целевые функции, принимающие более одного аргумента, предоставляя в качестве аргументов для вызова map() больше, чем iterable.

Например, мы можем определить целевую функцию для map, которая принимает два аргумента, а затем предоставить две итерационные таблицы одинаковой длины для вызова map.

Полный пример приведен ниже.


# SuperFastPython.com
# example of calling map on a process pool with two iterables
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# custom task that will sleep for a variable amount of time
def task(value1, value2):
    # sleep for less than a second
    sleep(random())
    return (value1, value2)
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit all tasks
        for result in executor.map(task, ['1', '2', '3'], ['a', 'b', 'c']):
            print(result)
 
if __name__ == '__main__':
    main()

Запуск примера выполняет задачи, как и ожидалось, предоставляя два аргумента для map и сообщая результат, который объединяет оба аргумента.


('1', 'a')
('2', 'b')
('3', 'c')

Вызов функции map немедленно выдаст все задания в пул процессов, даже если вы не будете итерировать итерабель результатов.

Это отличается от встроенной функции map(), которая является ленивой и не вычисляет каждый вызов, пока вы не запросите результат во время итерации.

Приведенный ниже пример подтверждает это, выдавая все задания с картой и не итерируя результаты.


# SuperFastPython.com
# example of calling map on the process pool and not iterating the results
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# custom task that will sleep for a variable amount of time
def task(value):
    # sleep for less than a second
    sleep(random())
    print(f'Done: {value}')
    return value
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit all tasks
        executor.map(task, range(5))
    print('All done!')
 
if __name__ == '__main__':
    main()

Выполняя пример, мы видим, что задания отправляются в пул процессов и выполняются без необходимости явной передачи итерабельной таблицы результатов, которая была возвращена.

Использование менеджера контекста обеспечило то, что пул процессов не закрывался до завершения всех задач.


Done: 0
Done: 2
Done: 1
Done: 3
Done: 4
All done!

Отправить и использовать как завершенную

Пожалуй, второй наиболее распространенной схемой использования ProcessPoolExecutor является отправка заданий и использование результатов по мере их поступления.

Этого можно достичь, используя функцию submit() для проталкивания задач в пул процессов, которая возвращает объекты Future, затем вызов метода модуля as_completed() на списке объектов Future, который будет возвращать каждый объект Future по мере завершения его задачи.

Приведенный ниже пример демонстрирует эту модель, представляя задания в порядке от 0 до 9 и показывая результаты в том порядке, в котором они были выполнены.


# SuperFastPython.com
# example of the submit and use as completed pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
 
# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # process task results as they are available
        for future in as_completed(futures):
            # retrieve the result
            print(future.result())
 
if __name__ == '__main__':
    main()

Выполняя пример, мы видим, что результаты извлекаются и печатаются в порядке выполнения заданий, а не в том порядке, в котором задания были переданы в пул процессов.


9
6
7
0
3
8
1
4
5
2

Представить и использовать последовательно

Мы можем потребовать результаты выполнения заданий в том порядке, в котором они были представлены.

Это может быть связано с тем, что задачи имеют естественный порядок.

Мы можем реализовать этот паттерн, вызывая submit() для каждой задачи, чтобы получить список объектов Future, затем итерируя объекты Future в том порядке, в котором задачи были представлены, и получая результаты.

 


...
# process task results in the order they were submitted
for future in futures:
# retrieve the result
print(future.result())

Приведенный ниже пример демонстрирует эту модель, отправляя задания в порядке от 0 до 9 и показывая результаты в том порядке, в котором они были отправлены.


# SuperFastPython.com
# example of the submit and use sequentially pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # process task results in the order they were submitted
        for future in futures:
            # retrieve the result
            print(future.result())
 
if __name__ == '__main__':
    main()

Выполняя пример, мы видим, что результаты извлекаются и печатаются в том порядке, в котором задания были представлены, а не в том порядке, в котором задания были выполнены.


0
1
2
3
4
5
6
7
8
9

Отправить и использовать обратный вызов

Мы можем не захотеть явно обрабатывать результаты, как только они станут доступны; вместо этого мы хотим вызвать функцию для результата.

Вместо того чтобы делать это вручную, как в приведенном выше шаблоне as completed, мы можем попросить пул процессов вызвать для нас функцию с результатом автоматически.

Этого можно достичь, установив обратный вызов на каждом будущем объекте, вызвав функцию add_done_callback() и передав имя функции.

Затем пул процессов будет вызывать функцию обратного вызова по мере завершения каждой задачи, передавая объекты Future для задачи.

Приведенный ниже пример демонстрирует этот шаблон, регистрируя пользовательскую функцию обратного вызова, которая будет применяться к каждой задаче по мере ее выполнения.


# SuperFastPython.com
# example of the submit and use a callback pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name
 
# custom callback function called on tasks when they complete
def custom_callback(future):
    # retrieve the result
    print(future.result())
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # register the callback on all tasks
        for future in futures:
            future.add_done_callback(custom_callback)
        # wait for tasks to complete...
 
if __name__ == '__main__':
    main()

Выполняя пример, мы видим, что результаты извлекаются и печатаются в порядке их выполнения, а не в порядке выполнения задач.


8
0
7
1
4
6
5
3
2
9

Мы можем зарегистрировать несколько обратных вызовов на каждом объекте Future; он не ограничен одним обратным вызовом.

Функции обратного вызова вызываются в том порядке, в котором они были зарегистрированы на каждом объекте Future.

Следующий пример демонстрирует наличие двух обратных вызовов на каждом Future.


# SuperFastPython.com
# example of the submit and use multiple callbacks for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name
 
# custom callback function called on tasks when they complete
def custom_callback1(future):
    # retrieve the result
    print(f'Callback 1: {future.result()}')
 
# custom callback function called on tasks when they complete
def custom_callback2(future):
    # retrieve the result
    print(f'Callback 2: {future.result()}')
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # register the callbacks on all tasks
        for future in futures:
            future.add_done_callback(custom_callback1)
            future.add_done_callback(custom_callback2)
        # wait for tasks to complete...
 
if __name__ == '__main__':
    main()

Выполняя пример, мы видим, что результаты сообщаются в порядке выполнения задач, а две функции обратного вызова вызываются для каждой задачи в том порядке, в котором мы зарегистрировали их в каждом объекте Future.


Callback 1: 2
Callback 2: 2
Callback 1: 3
Callback 2: 3
Callback 1: 6
Callback 2: 6
Callback 1: 8
Callback 2: 8
Callback 1: 7
Callback 2: 7
Callback 1: 5
Callback 2: 5
Callback 1: 0
Callback 2: 0
Callback 1: 1
Callback 2: 1
Callback 1: 4
Callback 2: 4
Callback 1: 9
Callback 2: 9

Отправить и дождаться всех

Обычно принято отправлять все задания, а затем ждать завершения всех заданий в пуле процессов.

Этот шаблон может быть полезен, когда задачи не возвращают результат напрямую, например, если каждая задача хранит результат в ресурсе, например, в файле.

Существует два способа дождаться завершения задач: вызов функции модуля wait() или вызов shutdown().

Наиболее вероятный случай - вы хотите явно дождаться завершения набора или подмножества задач в пуле процессов.

Вы можете достичь этого, передав список задач в функцию wait(), которая по умолчанию будет ждать завершения всех задач.


...
# wait for all tasks to complete
wait(futures)

Мы можем явно указать ожидание всех задач, установив аргумент "return_when" в константу ALL_COMPLETED; например:


...
# wait for all tasks to complete
wait(futures, return_when=ALL_COMPLETED)

Приведенный ниже пример демонстрирует этот паттерн. Обратите внимание, что мы намеренно игнорируем возврат от вызова wait(), поскольку в данном случае нам не нужно его проверять.


# SuperFastPython.com
# example of the submit and wait for all pattern for the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
 
# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    # display the result
    print(name)
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # wait for all tasks to complete
        wait(futures)
    print('All tasks are done!')
 
if __name__ == '__main__':
    main()

Запустив пример, мы видим, что результаты обрабатываются каждой задачей по мере ее завершения. Важно отметить, что главный процесс ожидает завершения всех задач, прежде чем продолжить работу и напечатать сообщение.


3
9
0
8
4
6
2
1
5
7
All tasks are done!

Альтернативным подходом может быть выключение пула процессов и ожидание завершения всех выполняющихся и поставленных в очередь задач, прежде чем двигаться дальше.

Это может быть предпочтительным, когда у нас нет списка объектов Future или когда мы собираемся использовать пул процессов только один раз для набора задач.

Мы можем реализовать этот паттерн, используя менеджер контекста; например:


# SuperFastPython.com
# example of the submit and wait for all with shutdown pattern for the process pool
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    # display the result
    print(name)
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit tasks and collect futures
        futures = [executor.submit(task, i) for i in range(10)]
        # wait for all tasks to complete
    print('All tasks are done!')
 
if __name__ == '__main__':
    main()

Выполняя пример, мы видим, что главный процесс не переходит к печати сообщения до тех пор, пока не будут выполнены все задачи, после того как пул процессов будет автоматически выключен менеджером контекста.


1
2
8
4
5
3
9
0
7
6
All tasks are done!

Шаблон автоматического выключения контекстного менеджера может быть непонятен разработчикам, не привыкшим к тому, как работают пулы процессов, поэтому в предыдущем примере комментарий в конце блока контекстного менеджера.

Мы можем добиться того же эффекта без менеджера контекста и явного вызова shutdown.


...
# wait for all tasks to complete and close the pool
executor.shutdown()

Напомните, что функция shutdown() по умолчанию будет ждать завершения всех задач и не отменит ни одной поставленной в очередь задачи, но мы можем сделать это явным, установив аргумент "wait" в True, а аргумент "cancel_futures" в False; например:


...
# wait for all tasks to complete and close the pool
executor.shutdown(wait=True, cancel_futures=False)

Приведенный ниже пример демонстрирует модель ожидания завершения всех задач в пуле процессов путем вызова shutdown() перед тем, как двигаться дальше.


# SuperFastPython.com
# example of the submit and wait for all with shutdown pattern for the process pool
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    # display the result
    print(name)
 
# entry point
def main():
    # start the process pool
    executor = ProcessPoolExecutor()
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # wait for all tasks to complete
    executor.shutdown()
    print('All tasks are done!')
 
if __name__ == '__main__':
    main()

Выполняя пример, мы видим, что все задачи сообщают о своих результатах по мере их завершения, и что главный поток не переходит к следующему, пока все задачи не завершатся и пул процессов не будет закрыт.


3
5
2
6
8
9
7
1
4
0
All tasks are done!

Отправить и ждать первого

Обычно можно выдать много заданий и быть заинтересованным только в первом полученном результате.

То есть, не результат первой задачи, а результат любой задачи, которая первой завершает свое выполнение.

Это может произойти, если вы пытаетесь получить доступ к одному и тому же ресурсу из нескольких мест, например, к файлу или каким-либо данным.

Этого можно достичь, используя функцию модуля wait() и установив аргумент "return_when" в константу FIRST_COMPLETED.


...
# wait until any task completes
done, not_done = wait(futures, return_when=FIRST_COMPLETED)

Мы также должны управлять пулом процессов вручную, создавая его и вызывая shutdown() вручную, чтобы мы могли продолжить выполнение главного процесса, не дожидаясь завершения всех остальных задач.

Приведенный ниже пример демонстрирует этот паттерн и прекратит ожидание, как только первая задача будет выполнена.


# SuperFastPython.com
# example of the submit and wait for first the ProcessPoolExecutor
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
from concurrent.futures import FIRST_COMPLETED
 
# custom task that will sleep for a variable amount of time
def task(name):
    # sleep for less than a second
    sleep(random())
    return name
 
# entry point
def main():
    # start the process pool
    executor = ProcessPoolExecutor()
    # submit tasks and collect futures
    futures = [executor.submit(task, i) for i in range(10)]
    # wait until any task completes
    done, not_done = wait(futures, return_when=FIRST_COMPLETED)
    # get the result from the first task to complete
    print(done.pop().result())
    # shutdown without waiting
    executor.shutdown(wait=False, cancel_futures=True)
 
if __name__ == '__main__':
    main()

Запуск примера будет ждать завершения любой из задач, затем получит результат первой выполненной задачи и выключит пул процессов.

Важно, что задачи будут продолжать выполняться в пуле процессов в фоновом режиме, и главный поток не закроется, пока все задачи не будут завершены.


2

Теперь, когда мы рассмотрели некоторые общие модели использования ProcessPoolExecutor, давайте посмотрим, как можно настроить конфигурацию пула процессов.

Как настроить ProcessPoolExecutor

Мы можем настроить конфигурацию пула процессов при создании экземпляра ProcessPoolExecutor.

Есть три аспекта пула процессов, которые мы можем захотеть настроить для нашего приложения; это количество рабочих, имена процессов в пуле и инициализация каждого процесса в пуле.

Давайте рассмотрим каждую по очереди.

Настройте количество процессов

Количество процессов в пуле процессов может быть настроено аргументом "max_workers".

Принимает целое положительное число и по умолчанию равно количеству процессоров в вашей системе.

Например, если у вас в системе 2 физических процессора, и каждый из них имеет гиперпоточность (распространенную в современных процессорах), то у вас будет 2 физических и 4 логических процессора. Python будет видеть 4 процессора. Тогда количество рабочих процессов по умолчанию в вашей системе будет равно 4.

Количество рабочих должно быть меньше или равно 61, если Windows является вашей операционной системой.

Обычно в системе больше процессов, чем CPU (физических или логических), если целевая функция задачи выполняет операции ввода-вывода.

Причина этого в том, что задачи, связанные с IO, проводят большую часть времени в ожидании, а не используют ЦП. Примерами являются чтение или запись с жестких дисков, DVD-приводов, принтеров, сетевых подключений и многое другое. Мы обсудим наилучшее применение процессов в одном из последующих разделов.

Если вам требуются сотни или несколько процессов для задач, связанных с вводом-выводом, возможно, вам стоит подумать об использовании потоков и модуля ThreadPoolExecutor. Если для задач, связанных с вводом-выводом, вам требуются тысячи процессов, возможно, вам стоит рассмотреть возможность использования модуля AsyncIO.

Сначала проверим, сколько процессов создано для пулов процессов в вашей системе.

Посмотрев на исходный код для ProcessPoolExecutor, мы можем увидеть, что количество рабочих процессов, выбранных по умолчанию, хранится в свойстве _max_workers, к которому мы можем получить доступ и сообщить о нем после создания пула процессов.

Примечание: "_max_workers" является защищенным членом и может измениться в будущем.

В примере ниже сообщается о количестве процессов по умолчанию в пуле процессов в вашей системе.


# SuperFastPython.com
# report the default number of worker processes on your system
from concurrent.futures import ProcessPoolExecutor
 
# entry point
def main():
    # create a process pool with the default number of worker processes
    pool = ProcessPoolExecutor()
    # report the number of worker processes chosen by default
    print(pool._max_workers)
 
if __name__ == '__main__':
    main()

Запуск примера сообщает о количестве рабочих процессов, используемых по умолчанию в вашей системе.

У меня четыре физических ядра процессора, восемь логических ядер; поэтому по умолчанию используется 8 процессов.


8

Сколько рабочих процессов выделено по умолчанию в вашей системе?
Дайте мне знать в комментариях ниже.

Мы можем указать количество рабочих процессов напрямую, и это хорошая идея в большинстве приложений.

В примере ниже показано, как настроить 60 рабочих процессов.


# SuperFastPython.com
# configure and report the default number of worker processes
from concurrent.futures import ProcessPoolExecutor
 
# entry point
def main():
    # create a process pool with a large number of worker processes
    pool = ProcessPoolExecutor(60)
    # report the number of worker processes
    print(pool._max_workers)
 
if __name__ == '__main__':
    main()

Запуск примера настраивает пул процессов на использование 60 процессов и подтверждает, что он создаст 60 процессов.


60

Сколько процессов следует использовать?

Это сложный вопрос и зависит от специфики вашей программы.

Возможно, если у вас меньше 100 IO-bound задач (или 60 в Windows), то вы захотите установить количество рабочих процессов равным количеству задач.

Если вы работаете с задачами, связанными с IO, то вы, возможно, захотите ограничить число работников количеством логических CPU в вашей системе, например, по умолчанию для ProcessPoolExecutor.

Если ваше приложение предполагается выполнять несколько раз в будущем, вы можете протестировать различное количество процессов и сравнить общее время выполнения, а затем выбрать количество процессов, которое дает приблизительно наилучшую производительность. Возможно, вы захотите сымитировать задачу в этих тестах с помощью случайной операции сна или вычисления.

Настроить многопроцессный контекст

Различные операционные системы предоставляют разные способы создания новых процессов.

Некоторые операционные системы поддерживают несколько способов создания процессов.

Пожалуй, два самых распространенных способа создания новых процессов - это spawn и fork.

Ваша установка Python выберет наиболее подходящий метод создания нового процесса для вашей операционной системы. Тем не менее, вы можете указать, как создаются новые процессы, и это называется "контекст процесса."

Вы можете получить новый контекст процесса для определенного метода создания новых процессов (например, fork или spawn) и передать этот контекст в ProcessPoolExecutor. Это позволит всем новым процессам, созданным пулом процессов, создаваться с использованием предоставленного контекста и использовать предпочитаемый вами метод для запуска процессов.

Для начала давайте посмотрим, какие методы запуска процессов поддерживаются вашей операционной системой, и выясним, какой метод используется по умолчанию.

Программа ниже сообщит о методах запуска процессов и методе запуска по умолчанию в вашей системе.


# SuperFastPython.com
from multiprocessing import get_all_start_methods
from multiprocessing import get_start_method
# list of all process start methods supported on the os
result = get_all_start_methods()
print(result)
# get the default process start method
result = get_start_method()
print(result)

Запуск примера сначала сообщает обо всех методах запуска процессов, поддерживаемых вашей системой.

Далее, поддерживается метод запуска процесса по умолчанию.

В данном случае, запуская программу на MacOS, мы видим, что операционная система поддерживает все три метода запуска процессов, а по умолчанию используется метод "spawn".


['spawn', 'fork', 'forkserver']
spawn

Далее мы можем проверить контекст по умолчанию, используемый для запуска процессов в пуле процессов.

Исполнитель ProcessPoolExecutor будет использовать контекст по умолчанию, если он не настроен на использование другого контекста.

Мы можем проверить контекст запуска процесса, используемый ProcessPoolExecutor через защищенное свойство "_mp_context".

В примере ниже создается пул процессов и сообщается контекст по умолчанию, используемый пулом процессов


# SuperFastPython.com
# example of checking the process start context
from concurrent.futures import ProcessPoolExecutor
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # report the context used
        print(executor._mp_context)
 
if __name__ == '__main__':
    main()

Запуск примера создает пул процессов и сообщает контекст запуска процесса по умолчанию, используемый пулом.

В данном случае мы видим, что это контекст 'spawn', обозначаемый объектом "SpawnContext".


<multiprocessing.context.SpawnContext object at 0x1034fd4c0>

Далее мы можем создать контекст и передать его в пул процессов.

Исполнитель ProcessPoolExecutor принимает аргумент с именем "mp_context", который определяет контекст, используемый для создания процессов в пуле.

По умолчанию он установлен в None, в этом случае используется контекст по умолчанию.

Мы можем установить контекст, сначала вызвав функцию get_context() и указав предпочтительный метод в виде строки, совпадающей со строкой, полученной при вызове функции get_all_start_methods(), напр. например, 'fork' или 'spawn'.

Возможно, мы хотим заставить все процессы создаваться с помощью метода 'fork', независимо от значения по умолчанию.

Примечание: использование 'fork' не будет работать на windows. Вы можете изменить его на использование 'spawn' или сообщить о появлении сообщения об ошибке в комментариях ниже.

Сначала мы создаем контекст, затем передаем этот контекст пулу процессов. Затем мы можем получить доступ и сообщить о менеджере контекста, используемом пулом процессов; например.


...
# create a start process context
context = get_context('fork')
# create a process pool
with ProcessPoolExecutor(mp_context=context) as executor:
    # report the context used
    print(executor._mp_context)

Ниже приведен полный пример установки менеджера контекста для пула процессов и последующего подтверждения того, что он был изменен.


# SuperFastPython.com
# example of setting the process start context
from multiprocessing import get_context
from concurrent.futures import ProcessPoolExecutor
 
# entry point
def main():
    # create a start process context
    context = get_context('fork')
    # create a process pool
    with ProcessPoolExecutor(mp_context=context) as executor:
        # report the context used
        print(executor._mp_context)
 
if __name__ == '__main__':
    main()

При выполнении примера сначала создается новый контекст запуска процесса, затем он передается новому ProcessPoolExecutor.

После создания пула сообщается об используемом им менеджере контекста, которым в данном случае является 'fork', обозначаемый объектом 'ForkContext'.

Настройте инициализатор процесса

Процессы рабочего могут вызвать функцию до того, как они начнут выполнять задания.

Это называется функция-инициализатор и может быть указана через аргумент "initializer" при создании пула процессов. Если функция инициализатора принимает аргументы, их можно передать через аргумент "initargs" пула процессов, который представляет собой кортеж аргументов для передачи функции инициализатора.

По умолчанию функция инициализатора отсутствует.

Мы можем выбрать функцию инициализации для рабочих процессов, если хотим, чтобы каждый процесс устанавливал ресурсы, специфичные для него.

Примером может служить специфический для процесса файл журнала или специфическое для процесса соединение с удаленным ресурсом, таким как сервер или база данных. Тогда ресурс будет доступен для всех задач, выполняемых процессом, а не будет создаваться и отбрасываться или открываться и закрываться для каждой задачи.

Затем эти специфические для процесса ресурсы могут быть сохранены где-нибудь, куда рабочий процесс может ссылаться, например, в глобальной переменной или в локальной переменной процесса. Необходимо позаботиться о том, чтобы правильно закрыть эти ресурсы после завершения работы с пулом процессов.

В приведенном ниже примере создается пул процессов с двумя рабочими и используется пользовательская функция инициализации. В этом случае функция не делает ничего, кроме печати сообщения. Затем мы выполняем десять задач с помощью пула процессов.


# SuperFastPython.com
# example of a custom worker process initialization function
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# function for initializing the worker processes
def initializer_worker():
    # report an initialization message
    print(f'Initializing worker process.', flush=True)
 
# a mock task that sleeps for a random amount of time less than one second
def task(identifier):
    sleep(random())
    # get the unique name
    return identifier
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor(max_workers=2, initializer=initializer_worker) as executor:
        # execute asks
        for result in executor.map(task, range(10)):
            print(result)
 
if __name__ == '__main__':
    main()

Выполняя пример, мы видим, что два процесса инициализируются перед запуском любых задач, затем все десять задач успешно завершаются.


Initializing worker process.
Initializing worker process.
0
1
2
3
4
5
6
7
8
9

Теперь, когда мы знакомы с тем, как настраивать пулы процессов, давайте узнаем больше о том, как проверять и управлять задачами с помощью объектов Future.

Как использовать объекты будущего в деталях

Future объекты создаются, когда мы вызываем submit() для отправки задач в ProcessPoolExecutor для асинхронного выполнения.

Объекты

Future предоставляют возможность проверить состояние задачи (например, запущена ли она?) и управлять выполнением задачи (например, отменить).

В этом разделе мы рассмотрим некоторые примеры проверки и манипулирования объектами Future, созданными нашим пулом процессов.

В частности, мы рассмотрим следующее:

Сначала рассмотрим подробнее жизненный цикл объекта Future.

Жизненный цикл будущего объекта

Объект Future создается, когда мы вызываем submit() для задачи на ProcessPoolExecutor.

Во время выполнения задачи объект Future имеет статус "running".

Когда задача завершается, она имеет статус "done", и если целевая функция возвращает значение, его можно получить.

Прежде чем задача будет запущена, она будет вставлена в очередь задач для рабочего процесса, который должен принять ее и начать выполнение. В этом состоянии "pre-running" задача может быть отменена и имеет состояние "cancelled". Задача в состоянии "выполняется" не может быть отменена.

Задача "отмененная" всегда также находится в состоянии "выполнено".

Во время выполнения задачи может возникнуть не пойманное исключение, что приведет к остановке выполнения задачи. Исключение будет сохранено и может быть извлечено напрямую или будет повторно поднято при попытке извлечь результат.

На рисунке ниже представлен жизненный цикл объекта Future.

Overview of the Life-Cycle of a Python Future Object..
Обзор жизненного цикла объекта будущего в Python.

Теперь, когда мы знакомы с жизненным циклом объекта Future, давайте рассмотрим, как мы можем использовать проверку и манипулировать им.

Как проверить состояние фьючерсов

Существует два типа нормального состояния объекта Future, которые мы можем захотеть проверить: выполняется и сделано.

Каждая имеет свою собственную функцию, которая возвращает True, если объект Future находится в этом состоянии или False в противном случае; например:

Мы можем разработать простые примеры, демонстрирующие, как проверить состояние объекта Future.

В этом примере мы можем запустить задачу, затем проверить, что она запущена и не выполнена, подождать ее завершения, затем проверить, что она выполнена и не запущена.


# SuperFastPython.com
# check the status of a Future object for task executed by a process pool
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
 
# mock task that will sleep for a moment
def work():
    sleep(0.5)
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # start one process
        future = executor.submit(work)
        # confirm that the task is running
        running = future.running()
        done = future.done()
        print(f'Future running={running}, done={done}')
        # wait for the task to complete
        wait([future])
        # confirm that the task is done
        running = future.running()
        done = future.done()
        print(f'Future running={running}, done={done}')
 
if __name__ == '__main__':
    main()

Выполняя пример, мы видим, что сразу после отправки задания оно отмечается как выполняющееся, а после завершения задания мы можем подтвердить, что оно выполнено.


Future running=True, done=False
Future running=False, done=True

Как получить результаты от фьючерсов

Когда задача завершена, мы можем получить результат задачи, вызвав функцию result() на Future.

Возвращается результат от функции возврата задания, которое мы выполнили, или None, если функция не вернула значение.

Функция будет блокироваться до тех пор, пока задание не завершится и можно будет получить результат. Если задание уже выполнено, она вернет результат немедленно.

В примере ниже показано, как получить результат из объекта Future


# SuperFastPython.com
# get the result from a completed future task
from time import sleep
from concurrent.futures import ProcessPoolExecutor
 
# mock task that will sleep for a moment
def work():
    sleep(1)
    return "all done"
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # start one process
        future = executor.submit(work)
        # get the result from the task, wait for task to complete
        result = future.result()
        print(f'Got Result: {result}')
 
if __name__ == '__main__':
    main()

Выполнение примера отправляет задание, затем пытается получить результат, блокируется до тех пор, пока результат не будет доступен, затем сообщает о полученном результате.


Got Result: all done

Мы также можем установить таймаут, сколько времени мы хотим ждать результата в секундах.

Если таймаут истекает до получения результата, возникает ошибка TimeoutError.

Приведенный ниже пример демонстрирует тайм-аут, показывая, как отказаться от ожидания до завершения задачи.


# SuperFastPython.com
# set a timeout when getting results from a future
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import TimeoutError
 
# mock task that will sleep for a moment
def work():
    sleep(1)
    return "all done"
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # start one process
        future = executor.submit(work)
        # get the result from the task, wait for task to complete
        try:
            result = future.result(timeout=0.5)
            print(f'Got Result: {result}')
        except TimeoutError:
            print('Gave up waiting for a result')
 
if __name__ == '__main__':
    main()

Запуск примера показывает, что мы отказались от ожидания результата через полсекунды.


Gave up waiting for a result

Как отменить фьючерсы

Мы также можем отменить задачу, которая еще не начала выполняться.

Напомните, что когда мы помещаем задачи в пул с помощью submit() или map(), задачи добавляются во внутреннюю очередь работ, из которой рабочие процессы могут извлекать задачи и выполнять их.

Пока задача находится в очереди и еще не запущена, мы можем отменить ее, вызвав cancel() на объекте Future, связанном с задачей. Функция cancel() вернет True, если задача была отменена, False в противном случае.

Давайте продемонстрируем это на рабочем примере.

Мы можем создать пул процессов с одним процессом, затем запустить долго выполняющуюся задачу, затем подать вторую задачу, запросить ее отмену, затем подтвердить, что она действительно была отменена.


# SuperFastPython.com
# example of cancelling a task via it's future
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
 
# mock task that will sleep for a moment
def work(sleep_time):
    sleep(sleep_time)
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor(1) as executor:
        # start a long running task
        future1 = executor.submit(work, 2)
        running = future1.running()
        print(f'First task running={running}')
        # start a second
        future2 = executor.submit(work, 0.1)
        running = future2.running()
        print(f'Second task running={running}')
        # cancel the second task
        was_cancelled = future2.cancel()
        print(f'Second task was cancelled: {was_cancelled}')
        # wait for the second task to finish, just in case
        wait([future2])
        # confirm it was cancelled
        running = future2.running()
        cancelled = future2.cancelled()
        done = future2.done()
        print(f'Second task running={running}, cancelled={cancelled}, done={done}')
        # wait for the long running task to finish
        wait([future1])
 
if __name__ == '__main__':
    main()

Выполнив пример, мы видим, что первая задача запущена и работает нормально.

Вторая задача запланирована, но еще не запущена, потому что пул процессов занят первой задачей. Затем мы отменяем вторую задачу и подтверждаем, что она действительно не запущена; она была отменена и выполнена.


First task running=True
Second task running=False
Second task was cancelled: True
Second task running=False, cancelled=True, done=True

Отмена запущенного будущего

Далее попробуем отменить задачу, которая уже завершила выполнение.

Полный пример приведен ниже.


# SuperFastPython.com
# example of trying to cancel a running task via its future
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
 
# mock task that will sleep for a moment
def work(sleep_time):
    sleep(sleep_time)
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor(1) as executor:
        # start a long running task
        future = executor.submit(work, 2)
        running = future.running()
        print(f'Task running={running}')
        # try to cancel the task
        was_cancelled = future.cancel()
        print(f'Task was cancelled: {was_cancelled}')
        # wait for the task to finish
        wait([future])
        # check if it was cancelled
        running = future.running()
        cancelled = future.cancelled()
        done = future.done()
        print(f'Task running={running}, cancelled={cancelled}, done={done}')
 
if __name__ == '__main__':
    main()

Запустив пример, мы видим, что задача была запущена в обычном режиме.

Затем мы попытались отменить задание, но это не удалось, как мы и ожидали, поскольку задание уже было запущено.

Затем мы ждем завершения задачи и проверяем ее статус. Мы видим, что задача больше не выполняется и не была отменена, как мы ожидаем, а была помечена как выполненная.


Task running=True
Task was cancelled: False
Task running=False, cancelled=False, done=True

Отмена выполненного будущего

Подумайте, что произойдет, если мы попытаемся отменить уже выполненное задание.

Мы можем ожидать, что отмена уже выполненного задания не имеет никакого эффекта, и так оно и есть.

Это можно продемонстрировать на коротком примере.

Мы запускаем и выполняем задачу в обычном режиме, затем ждем ее завершения и сообщаем о ее состоянии. Затем мы пытаемся отменить задание.


# SuperFastPython.com
# example of trying to cancel a done task via its future
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
 
# mock task that will sleep for a moment
def work(sleep_time):
    sleep(sleep_time)
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor(1) as executor:
        # start a long running task
        future = executor.submit(work, 2)
        running = future.running()
        # wait for the task to finish
        wait([future])
        # check the status
        running = future.running()
        cancelled = future.cancelled()
        done = future.done()
        print(f'Task running={running}, cancelled={cancelled}, done={done}')
        # try to cancel the task
        was_cancelled = future.cancel()
        print(f'Task was cancelled: {was_cancelled}')
        # check if it was cancelled
        running = future.running()
        cancelled = future.cancelled()
        done = future.done()
        print(f'Task running={running}, cancelled={cancelled}, done={done}')
 
if __name__ == '__main__':
    main()

Запуск примера подтверждает, что задание выполняется и помечается выполненным, как обычно.

Попытка отменить задание не удалась, и проверка статуса после попытки отмены подтверждает, что задание не было затронуто попыткой.


Task running=False, cancelled=False, done=True
Task was cancelled: False
Task running=False, cancelled=False, done=True

Как добавить обратный вызов к фьючерсам

Мы уже видели выше как добавить обратный вызов к Future; тем не менее, давайте рассмотрим еще несколько примеров для полноты, включая некоторые крайние случаи.

Мы можем зарегистрировать одну или несколько функций обратного вызова на объекте Future, вызвав функцию add_done_callback() и указав имя вызываемой функции.

Функции обратного вызова будут вызваны с объектом Future в качестве аргумента сразу после завершения задачи. Если зарегистрировано более одной функции обратного вызова, то они будут вызываться в порядке их регистрации, а любые исключения в каждой функции обратного вызова будут пойманы, зарегистрированы и проигнорированы.

Обратный вызов будет вызван рабочим процессом, который выполнил задание.

В примере ниже показано, как добавить функцию обратного вызова к объекту Future.


# SuperFastPython.com
# add a callback option to a future object
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
 
# callback function to call when a task is completed
def custom_callback(future):
    print('Custom callback was called', flush=True)
 
# mock task that will sleep for a moment
def work():
    sleep(1)
    print('Task is done')
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute the task
        future = executor.submit(work)
        # add the custom callback
        future.add_done_callback(custom_callback)
        # wait for the task to complete
        wait([future])
 
if __name__ == '__main__':
    main()

Выполняя пример, мы видим, что сначала выполняется задание, затем обратный вызов, как мы и ожидали.


Task is done
Custom callback was called

Общая ошибка при использовании будущих обратных вызовов

Частая ошибка - забыть добавить объект Future в качестве аргумента к пользовательскому обратному вызову.

Например:


# callback function to call when a task is completed
def custom_callback():
    print('Custom callback was called')

Если вы зарегистрируете эту функцию и попытаетесь выполнить код, вы получите TypeError следующим образом:


exception calling callback for <Future at 0x10d8e2730 state=finished returned NoneType>
Traceback (most recent call last):
  ...
    callback(self)
TypeError: custom_callback() takes 0 positional arguments but 1 was given

Сообщение в TypeError дает понять, как исправить проблему: добавьте единственный аргумент в функцию для объекта Future, даже если вы не собираетесь использовать его в обратном вызове.

Выполнение обратных вызовов при отмене будущего

Мы также можем увидеть эффект обратных вызовов на объектах Future для задач, которые отменяются.

Этот эффект не документирован в API, но можно ожидать, что обратный вызов будет выполняться всегда, независимо от того, выполняется ли задача нормально или отменяется. Так и происходит.

Нижеприведенный пример демонстрирует это.

Сначала создается пул процессов с одним процессом. Выпускается долго выполняющаяся задача, которая занимает весь пул, затем мы посылаем вторую задачу, добавляем обратный вызов ко второй задаче, отменяем ее и ждем завершения всех задач.


# SuperFastPython.com
# example of a callback for a cancelled task via the future object
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
 
# callback function to call when a task is completed
def custom_callback(future):
    print('Custom callback was called', flush=True)
 
# mock task that will sleep for a moment
def work(sleep_time):
    sleep(sleep_time)
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor(1) as executor:
        # start a long running task
        future1 = executor.submit(work, 2)
        running = future1.running()
        print(f'First task running={running}')
        # start a second
        future2 = executor.submit(work, 0.1)
        running = future2.running()
        print(f'Second task running={running}')
        # add the custom callback
        future2.add_done_callback(custom_callback)
        # cancel the second task
        was_cancelled = future2.cancel()
        print(f'Second task was cancelled: {was_cancelled}')
        # explicitly wait for all tasks to complete
        wait([future1, future2])
 
if __name__ == '__main__':
    main()

Выполнив пример, мы видим, что первая задача запущена, как мы и ожидали.

Вторая задача запланирована, но не успевает выполниться, прежде чем мы ее отменяем.

Обратный вызов выполняется сразу после отмены задачи, затем мы сообщаем в основном потоке, что задача действительно была отменена правильно.


First task running=True
Second task running=False
Custom callback was called
Second task was cancelled: True

Как получить исключения из фьючерсов

Задача может вызвать исключение во время выполнения.

Если мы можем предвидеть исключение, мы можем обернуть часть нашей функции задачи в блок try-except и обработать исключение внутри задачи.

Если в нашей задаче возникнет неожиданное исключение, задача прекратит выполнение.

На основании статуса задачи мы не можем знать, было ли вызвано исключение, но мы можем проверить наличие исключения напрямую.

Затем мы можем получить доступ к исключению через функцию exception(). В качестве альтернативы исключение будет повторно поднято при вызове функции result() при попытке получить результат.

Мы можем продемонстрировать это на примере.

Приведенный ниже пример вызовет ошибку ValueError внутри задачи, которая не будет поймана, а вместо этого будет поймана пулом процессов, чтобы мы могли получить к ней доступ позже.


# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
 
# mock task that will sleep for a moment
def work():
    sleep(1)
    raise Exception('This is Fake!')
    return "never gets here"
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute our task
        future = executor.submit(work)
        # wait for the task to complete
        wait([future])
        # check the status of the task after it has completed
        running = future.running()
        cancelled = future.cancelled()
        done = future.done()
        print(f'Task running={running}, cancelled={cancelled}, done={done}')
        # get the exception
        exception = future.exception()
        print(f'Exception={exception}')
        # get the result from the task
        try:
            result = future.result()
        except Exception:
            print('Unable to get the result')
 
if __name__ == '__main__':
    main()

Запуск примера нормально запускает задачу, которая спит в течение одной секунды.

Затем задача выбрасывает исключение, которое перехватывается пулом процессов. Пул процессов сохраняет исключение, и задача завершается.

Мы видим, что после завершения задачи она помечается как не запущенная, не отмененная и выполненная.

Затем мы получаем доступ к исключению из задачи, которое соответствует исключению, которое мы намеренно бросаем.

Попытка получить доступ к результату через функцию result() не удается, и мы ловим то же исключение, что и в задаче.


Task running=False, cancelled=False, done=True
Exception=This is Fake!
Unable to get the result

Обратные вызовы все еще вызываются, если задача вызывает исключение

Нам может быть интересно, если мы зарегистрируем функцию обратного вызова в Future, будет ли она по-прежнему выполняться, если задача вызовет исключение.

Как и следовало ожидать, обратный вызов будет выполнен, даже если задача вызовет исключение.

Мы можем проверить это, обновив предыдущий пример, чтобы зарегистрировать функцию обратного вызова до того, как задача завершится исключением.


# SuperFastPython.com
# example of handling an exception raised within a task that has a callback
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
 
# callback function to call when a task is completed
def custom_callback(future):
    print('Custom callback was called')
 
# mock task that will sleep for a moment
def work():
    sleep(1)
    raise Exception('This is Fake!')
    return "never gets here"
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute our task
        future = executor.submit(work)
        # add the custom callback
        future.add_done_callback(custom_callback)
        # wait for the task to complete
        wait([future])
        # check the status of the task after it has completed
        running = future.running()
        cancelled = future.cancelled()
        done = future.done()
        print(f'Task running={running}, cancelled={cancelled}, done={done}')
        # get the exception
        exception = future.exception()
        print(f'Exception={exception}')
        # get the result from the task
        try:
            result = future.result()
        except Exception:
            print('Unable to get the result')
 
if __name__ == '__main__':
    main()

Запуск примера запускает задачу, как и раньше, но на этот раз регистрирует функцию обратного вызова.

Когда задача терпит неудачу с исключением, обратный вызов вызывается немедленно. Затем главный процесс сообщает о статусе неудачной задачи и подробности исключения.


Custom callback was called
Task running=False, cancelled=False, done=True
Exception=This is Fake!
Unable to get the result

Когда использовать ProcessPoolExecutor

Исполнитель ProcessPoolExecutor является мощным и гибким, хотя и не подходит для всех ситуаций, когда необходимо запустить фоновое задание.

В этом разделе мы рассмотрим некоторые общие случаи, когда он хорошо подходит, а когда нет, затем мы рассмотрим широкие классы задач и почему они подходят или не подходят для ProcessPoolExecutor.

Используйте ProcessPoolExecutors, когда...

Пулы процессов лучше всего работают при применении одной и той же чистой функции к набору различных данных (например, однородные задачи, разнородные данные). Это облегчает чтение и отладку кода. Это не правило, а просто мягкое предложение.

Использование нескольких исполнителей ProcessPoolExecutors при...

Пулы процессов могут работать с задачами разных типов (например, разнородные задачи), хотя это может облегчить организацию вашей программы и отладку, если за каждый тип задачи будет отвечать отдельный пул процессов. Это не является правилом, просто мягкое предложение.

Не используйте ProcessPoolExecutors, когда...

Сладкая точка для пулов процессов - это диспетчеризация множества похожих задач, результаты которых могут быть использованы позже в программе. Задачи, которые не вписываются в это обобщение, вероятно, не подходят для пулов процессов. Это не правило, а просто мягкое предложение.

Знаете ли вы другие хорошие или плохие случаи использования ProcessPoolExecutor?
. Дайте мне знать в комментариях ниже.

Использование процессов для задач, связанных с вводом-выводом

Вы можете использовать процессы для задач, связанных с IO, хотя потоки могут быть более подходящим вариантом.

Задача, связанная с IO, - это тип задачи, которая включает чтение с устройства, файла или сокетного соединения или запись в них.

Эти операции связаны с вводом и выводом (IO), и скорость этих операций ограничена устройством, жестким диском или сетевым соединением. Поэтому такие задачи называются IO-bound.

Процессоры действительно быстры. Современные процессоры, например, 4 ГГц, могут выполнять 4 миллиарда инструкций в секунду, и, скорее всего, в вашей системе более одного процессора.

Выполнение ввода-вывода очень медленно по сравнению со скоростью процессоров.

Взаимодействие с устройствами, чтение и запись файлов, а также сокетные соединения подразумевают вызов инструкций в вашей операционной системе (ядре), которая будет ждать завершения операции. Если эта операция является основной для вашего процессора, например, выполняется в главном потоке вашей программы Python, то ваш процессор будет ждать много миллисекунд или даже много секунд, ничего не делая.

Это потенциально миллиарды операций, которые ему не дают выполнить.

Мы можем освободить ЦП от операций, связанных с IO, выполняя операции, связанные с IO, в другом процессе выполнения. Это позволяет ЦП запустить задачу и передать ее операционной системе (ядру) для выполнения ожидания, а самому освободиться для выполнения в другом прикладном процессе.

Под обложкой есть еще кое-что, но суть такова.

Поэтому задачи, которые мы выполняем с помощью ProcessPoolExecutor, могут быть задачами, включающими операции ввода-вывода.

Примеры включают:

Задачи, связанные с процессором

Вероятно, вам следует использовать процессы для задач, привязанных к процессору.

Задача, привязанная к процессору, - это тип задачи, которая включает в себя выполнение вычислений и не предполагает ввода-вывода.

Операции связаны только с данными в основной памяти (ОЗУ) или кэше (кэш процессора) и выполнением вычислений над этими данными или с ними. Ограничением для этих операций является скорость центрального процессора. Вот почему мы называем их задачами, привязанными к процессору.

Примеры включают:

Процессоры очень быстрые, и часто у нас есть более одного процессора. Мы хотели бы выполнять наши задачи и полностью использовать несколько ядер CPU в современном оборудовании.

Использование процессов и пулов процессов через класс ProcessPoolExecutor в Python, вероятно, лучший путь к достижению этой цели.

Обработка исключений исполнителя ProcessPoolExecutor

Обработка исключений является важным моментом при использовании процессов.

Код будет вызывать исключение, когда происходит что-то неожиданное, и это исключение должно быть обработано вашим приложением в явном виде, даже если это означает занести его в журнал и двигаться дальше.

При использовании ProcessPoolExecutor вам может понадобиться рассмотреть три момента обработки исключений, а именно:

Давайте рассмотрим каждый пункт по очереди.

Обработка исключений при инициализации процесса

Вы можете указать пользовательскую функцию инициализации при настройке вашего ProcessPoolExecutor.

Это можно задать через аргумент "initializer" для указания имени функции и "initargs" для указания кортежа аргументов функции.

Каждый процесс, запущенный пулом процессов, будет вызывать вашу функцию инициализации перед запуском процесса.

Если ваша функция инициализации выбросит исключение, это нарушит ваш пул процессов.

Все текущие задачи и любые будущие задачи, выполняемые пулом процессов, не будут выполняться и вызовут исключение BrokenProcessPool.

Мы можем продемонстрировать это на примере надуманной функции инициализатора, которая выбрасывает исключение.


# SuperFastPython.com
# example of an exception in a process pool initializer function
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# function for initializing the worker process
def initializer_worker():
    # throws an exception
    raise Exception('Something bad happened!', flush=True)
 
# a mock task that sleeps for a random amount of time less than one second
def task(identifier):
    sleep(random())
    # get the unique name
    return identifier
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor(max_workers=2, initializer=initializer_worker) as executor:
        # execute tasks
        for result in executor.map(task, range(10)):
            print(result)
 
if __name__ == '__main__':
    main()

Запуск примера не удался с исключением, как мы и ожидали.

Пул процессов создается как обычно, но как только мы пытаемся выполнить задания, создаются новые рабочие процессы, вызывается пользовательская функция инициализации рабочего процесса и выбрасывается исключение.

Множество процессов пытались запуститься, и, в свою очередь, несколько процессов потерпели неудачу, выдав исключение. Наконец, сам пул процессов выдал сообщение о том, что пул сломан и больше не может быть использован.


Exception in initializer:
Traceback (most recent call last):
  ...
    raise Exception('Something bad happened!', flush=True)
TypeError: Exception() takes no keyword arguments
Exception in initializer:
Traceback (most recent call last):
  ...
    raise Exception('Something bad happened!', flush=True)
TypeError: Exception() takes no keyword arguments
Traceback (most recent call last):
  ...
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

Это подчеркивает, что если вы используете пользовательскую функцию инициализатора, вы должны тщательно продумать исключения, которые могут быть вызваны, и, возможно, обработать их, иначе рискуете всеми задачами, которые зависят от пула процессов.

Обработка исключений во время выполнения задачи

При выполнении вашего задания может возникнуть исключение.

Это приведет к прекращению выполнения задачи, но не приведет к нарушению пула процессов. Вместо этого исключение будет поймано пулом процессов и будет доступно через объект Future, связанный с задачей через функцию exception().

Кроме того, исключение будет повторно поднято, если вы вызовете result() в будущем, чтобы получить результат. Это повлияет на вызовы submit() и map() при добавлении задач в пул процессов.

Это означает, что у вас есть два варианта обработки исключений в задачах; они следующие:

Выполнение исключения внутри задачи

Обработка исключения внутри задачи означает, что вам нужен какой-то механизм, чтобы сообщить получателю результата, что произошло что-то неожиданное.

Это может быть через возвращаемое значение из функции, например, None.

В качестве альтернативы вы можете повторно поднять исключение и попросить получателя обработать его напрямую. Третьим вариантом может быть использование некоторого более широкого состояния или глобального состояния, возможно, передаваемого по ссылке в вызове функции.

В приведенном ниже примере определена рабочая задача, которая вызовет исключение, но поймает исключение и вернет результат, указывающий на случай неудачи.


# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ProcessPoolExecutor
 
# mock task that will sleep for a moment
def work():
    sleep(1)
    try:
        raise Exception('Something bad happened!')
    except Exception:
        return 'Unable to get the result'
    return "never gets here"
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute our task
        future = executor.submit(work)
        # get the result from the task
        result = future.result()
        print(result)
 
if __name__ == '__main__':
    main()

Запуск примера запускает пул процессов, как обычно, выдает задание, затем блокирует его, ожидая результата.

Задача вызывает исключение, и результатом является сообщение об ошибке.

Этот подход достаточно чист для кода получателя и подходит для задач, выдаваемых как submit(), так и map(). Это может потребовать специальной обработки пользовательского возвращаемого значения для случая неудачи.


Unable to get the result

Обработка исключения получателем результата задачи

Альтернативой обработке исключения в задаче является передача ответственности получателю результата.

Это может показаться более естественным решением, поскольку оно соответствует синхронной версии той же операции, например, если бы мы выполняли вызов функции в цикле for.

Это означает, что получатель должен знать о типах ошибок, которые может вызвать задача, и обрабатывать их явно.

Нижеприведенный пример определяет простую задачу, которая вызывает Exception, который затем обрабатывается получателем при попытке получить результат от вызова функции.


# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ProcessPoolExecutor
 
# mock task that will sleep for a moment
def work():
    sleep(1)
    raise Exception('Something bad happened!')
    return "never gets here"
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute our task
        future = executor.submit(work)
        # get the result from the task
        try:
            result = future.result()
        except Exception:
            print('Unable to get the result')
 
if __name__ == '__main__':
    main()

Запуск примера создает пул процессов и отправляет работу в обычном режиме. Задание завершается с ошибкой; пул процессов ловит исключение, сохраняет его, а затем повторно бросает его, когда мы вызываем функцию result() в будущем.

Получатель результата принимает исключение и перехватывает его, сообщая о случае отказа.


Unable to get the result

Мы также можем проверить наличие исключения напрямую с помощью вызова функции exception() на объекте Future. Эта функция блокируется до тех пор, пока не произойдет исключение, и берет таймаут, как и вызов функции result().

Если исключение никогда не возникает и задача отменяется или успешно завершается, то exception() вернет значение None.

Мы можем продемонстрировать явную проверку на исключительный случай в задаче в примере ниже.


# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ProcessPoolExecutor
 
# mock task that will sleep for a moment
def work():
    sleep(1)
    raise Exception('Something bad happened!')
    return "never gets here"
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute our task
        future = executor.submit(work)
        # get the result from the task
        exception = future.exception()
        # handle exceptional case
        if exception:
             print(exception)
        else:
            result = future.result()
            print(result)
 
if __name__ == '__main__':
    main()

Запуск примера создает и отправляет работу в обычном режиме.

Получатель проверяет наличие исключительного случая, который блокируется до тех пор, пока не будет получено исключение или задача не будет выполнена. Исключение получено и обрабатывается путем сообщения о нем.


Something bad happened!

Мы не можем проверить функцию exception() объекта Future для каждой задачи, поскольку map() не предоставляет доступ к объектам Future.

<<<Еще хуже то, что подход к обработке исключения в получателе не может быть использован при использовании

map() для отправки задач, если только вы не обернете всю итерацию.

Мы можем продемонстрировать это, представив одну задачу с помощью map(), которая случайно вызывает Exception.

Полный пример приведен ниже.


# SuperFastPython.com
# example of handling an exception raised within a task
from time import sleep
from concurrent.futures import ProcessPoolExecutor
 
# mock task that will sleep for a moment
def work(value):
    sleep(1)
    raise Exception('Something bad happened!')
    return f'Never gets here {value}'
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute our task
        for result in executor.map(work, [1]):
            print(result)
 
if __name__ == '__main__':
    main()

Запуск примера приводит к выполнению единственной задачи (плохое использование map()) и ожиданию первого результата.

Задача выбрасывает исключение и главный поток и процесс завершается, как мы и ожидали.


concurrent.futures.process._RemoteTraceback:
...
Exception: Something bad happened!
"""
 
The above exception was the direct cause of the following exception:
 
Traceback (most recent call last):
...
Exception: Something bad happened!

Это подчеркивает, что если map() используется для отправки задач в пул процессов, то задачи должны обрабатывать свои собственные исключения или быть достаточно простыми, чтобы исключения не ожидались.

Обработка исключений при обратных вызовах завершения задачи

Последний случай, который мы должны рассмотреть для обработки исключений при использовании ProcessPoolExecutor - это функции обратного вызова.

При выдаче заданий в пул процессов вызовом submit(), мы получаем в ответ объект Future, на котором мы можем зарегистрировать функции обратного вызова для вызова при завершении задания с помощью функции add_done_callback().

Это позволяет зарегистрировать одну или несколько функций обратного вызова, которые будут выполняться в том порядке, в котором они зарегистрированы.

Эти обратные вызовы вызываются всегда, даже если задача отменена или не выполнена с исключением.

Обратный вызов может завершиться с исключением, и это не повлияет на другие зарегистрированные функции обратного вызова или задачу.

Исключение перехватывается пулом процессов, регистрируется как сообщение типа исключения, и процедура продолжает работу. В некотором смысле, обратные вызовы могут работать бесшумно.

Мы можем продемонстрировать это на примере с несколькими функциями обратного вызова, первая из которых вызовет исключение.


# SuperFastPython.com
# add callbacks to a future, one of which throws an exception
from time import sleep
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
 
# callback function to call when a task is completed
def custom_callback1(future):
    raise Exception('Something bad happened!')
    # never gets here
    print('Callback 1 called.', flush=True)
 
# callback function to call when a task is completed
def custom_callback2(future):
    print('Callback 2 called.', flush=True)
 
# mock task that will sleep for a moment
def work():
    sleep(1)
    return 'Task is done'
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute the task
        future = executor.submit(work)
        # add the custom callbacks
        future.add_done_callback(custom_callback1)
        future.add_done_callback(custom_callback2)
        # wait for the task to complete and get the result
        result = future.result()
        # wait for callbacks to finish
        sleep(0.1)
        print(result)
 
if __name__ == '__main__':
    main()

Запуск примера запускает пул процессов как обычно и выполняет задание.

После завершения задачи вызывается первый обратный вызов, который завершается с исключением. Исключение записывается в журнал и выводится на консоль (поведение по умолчанию для ведения журнала).

Пул процессов не нарушен и продолжает работу.

Второй обратный вызов выполняется успешно, и, наконец, главный поток получает результат выполнения задания.


exception calling callback for <Future at 0x10dc8a9a0 state=finished returned str>
...
    raise Exception('Something bad happened!')
Exception: Something bad happened!
Callback 2 called.
Task is done

Это подчеркивает, что если ожидается, что обратный вызов вызовет исключение, то оно должно обрабатываться явно и проверяться, если вы хотите, чтобы отказ повлиял на саму задачу.

Как работает ProcessPoolExecutor внутри?

Важно сделать небольшую паузу и посмотреть, как внутренне работает ProcessPoolExecutor.

Внутренняя работа класса влияет на то, как мы используем пул процессов и на поведение, которое мы можем ожидать, в частности, на отмену задач.

Без этих знаний некоторые действия пула процессов со стороны могут показаться непонятными или даже ошибочными.

Вы можете посмотреть исходный код для ProcessPoolExecutor и базового класса здесь:

Многое мы могли бы узнать о том, как работает пул процессов внутри компании, но мы ограничимся наиболее важными аспектами.

Внутренняя работа класса ProcessPoolExecutor требует рассмотрения двух аспектов: как задания отправляются в пул и как создаются рабочие процессы.

Задания добавляются во внутренние очереди

<

В источнике есть ASCII диаграмма, которую мы можем воспроизвести здесь, что поможет:


|======================= In-process =====================|== Out-of-process ==|
+----------+     +----------+       +--------+     +-----------+    +---------+
|          |  => | Work Ids |       |        |     | Call Q    |    | Process |
|          |     +----------+       |        |     +-----------+    |  Pool   |
|          |     | ...      |       |        |     | ...       |    +---------+
|          |     | 6        |    => |        |  => | 5, call() | => |         |
|          |     | 7        |       |        |     | ...       |    |         |
| Process  |     | ...      |       | Local  |     +-----------+    | Process |
|  Pool    |     +----------+       | Worker |                      |  #1..n  |
| Executor |                        | Thread |                      |         |
|          |     +----------- +     |        |     +-----------+    |         |
|          | <=> | Work Items | <=> |        | <=  | Result Q  | <= |         |
|          |     +------------+     |        |     +-----------+    |         |
|          |     | 6: call()  |     |        |     | ...       |    |         |
|          |     |    future  |     |        |     | 4, result |    |         |
|          |     | ...        |     |        |     | 3, except |    |         |
+----------+     +------------+     +--------+     +-----------+    +---------+

Поток данных для отправки новых задач в пул выглядит следующим образом.

Сначала мы вызываем submit() на пуле, чтобы добавить новую задачу. Обратите внимание, что внутренний вызов map() вызовет функцию submit().

Это добавит элемент работы во внутренний словарь, а именно экземпляр внутреннего класса с именем _WorkItem. Каждый рабочий элемент также имеет уникальный идентификатор, который отслеживается во внутренней очереди Queue.

В пуле процессов есть внутренний рабочий поток, который отвечает за подготовку работы для рабочих процессов. Он просыпается, когда в очередь идентификаторов работ добавляется новая работа, извлекает новую задачу и передает ее во вторую очередь.

Эта вторая очередь содержит задачи, которые должны быть выполнены рабочими процессами. Результаты функций целевых задач затем помещаются в очередь результатов, которая считывается рабочим потоком и становится доступной для любых связанных с ним объектов Future.

Это разделение между очередью, с которой взаимодействует пользователь, и очередью задач, с которой взаимодействуют процессы, является намеренным и обеспечивает некоторую меру контроля и безопасности.

Важным аспектом этих внутренних компонентов с точки зрения пользователя является отмена задач через их объекты Future.

Задачи во внутренней рабочей очереди могут быть отменены. Иные задачи не могут быть отменены, например, выполняющиеся задачи. Кроме того, задачи во внутренней очереди вызовов не могут быть отменены, но могут быть еще не запущены.

Это означает, что мы можем запросить статус объекта Future и увидеть, что он не запущен и не выполнен. Затем попытаться отменить его, потерпеть неудачу и увидеть, что задача начинает выполняться, а затем выполняется.

Это происходит потому, что некоторое количество задач будет сидеть в очереди вызовов, ожидая, когда они будут потреблены процессами.

В частности, очередь вызовов будет соответствовать количеству рабочих процессов плюс один.

Рабочие процессы создаются по мере необходимости

Рабочие процессы не создаются при создании пула процессов.

Вместо этого рабочие процессы создаются по требованию или точно в срок.

Каждый раз, когда задание добавляется во внутренние очереди, пул процессов проверяет, не меньше ли количество активных процессов, чем верхний предел поддерживаемых пулом процессов. Если да, то создается дополнительный процесс для обработки новой работы.

После того, как процесс выполнил задание, он будет ждать в очереди вызовов поступления новой работы. По мере поступления новой работы все процессы, ожидающие в очереди, будут уведомлены, и один из них попытается поглотить единицу работы и начать ее выполнение.

Также показано, что пул процессов не будет освобождать рабочие процессы после фиксированного количества единиц работы. Возможно, это было бы хорошим дополнением к API в будущем.

Теперь, когда мы понимаем, как работа вводится в пул процессов и как пул управляет процессами, давайте рассмотрим некоторые лучшие практики, которые следует учитывать при использовании ProcessPoolExecutor.

ProcessPoolExecutor Best Practices

Теперь, когда мы знаем, как работает ProcessPoolExecutor и как его использовать, давайте рассмотрим некоторые лучшие практики, которые следует учитывать при внедрении пулов процессов в наши программы на Python.

Чтобы упростить ситуацию, существует пять лучших практик; они следующие:

Используйте диспетчер контекстов

Используйте контекстный менеджер при использовании пулов процессов и обрабатывайте всю диспетчеризацию задач к пулу процессов и результаты обработки внутри менеджера.

Например:


...
# create a process pool via the context manager
with ProcessPoolExecutor(10) as executor:
# ...

Не забудьте настроить пул процессов при его создании в менеджере контекста, в частности, установив количество процессов, используемых в пуле.

Использование контекстного менеджера позволяет избежать ситуации, когда вы явно инстанцировали пул процессов и забыли выключить его вручную, вызвав shutdown().

Это также меньше кода и лучше сгруппировано, чем управление инстанцированием и выключением вручную; например:


...
# create a process pool manually
executor = ProcessPoolExecutor(10)
# ...
executor.shutdown()

Не используйте менеджер контекста, когда вам нужно отправлять задачи и получать результаты в более широком контексте (например, для нескольких функций) и/или когда у вас больше контроля над выключением пула.

Используйте map() для асинхронных For-Loops

Если у вас есть цикл for, который применяет функцию к каждому элементу списка, то используйте функцию map() для асинхронной диспетчеризации задач.

Например, у вас может быть цикл for по списку, который вызывает myfunc() для каждого элемента:


...
# apply a function to each item in an iterable
for item in mylist:
result = myfunc(item)
# do something...

Или, возможно, вы уже используете встроенную функцию map():


...
# apply a function to each item in an iterable
for result in map(myfinc, mylist):
# do something...

В обоих случаях можно сделать процесс асинхронным, используя функцию map() на пуле процессов.


...
# apply a function to each item in a iterable asynchronously
for result in executor.map(myfunc, mylist):
# do something...

Пожалуй, не используйте функцию map(), если ваша целевая функция задачи имеет побочные эффекты.

Не используйте функцию map(), если ваша целевая функция задачи не имеет аргументов или имеет более одного аргумента, если только все аргументы не могут быть получены из параллельных итераций (т.е. map() может принимать несколько итераций).

Не используйте функцию map(), если вам нужен контроль над обработкой исключений для каждой задачи, или если вы хотите получать результаты для задач в порядке их выполнения.

Использовать submit() С as_completed()

Если вы хотите обрабатывать результаты в порядке выполнения заданий, а не в порядке подачи заданий, то используйте submit() и as_completed().

Функция submit() находится в пуле процессов и используется для проталкивания задач в пул для выполнения и возвращается сразу с объектом Future для задачи. Функция as_completed() - это метод модуля, который принимает итерабельную последовательность объектов Future, подобно списку, и возвращает объекты Future по мере выполнения задач.

Например:


...
# submit all tasks and get future objects
futures = [executor.submit(myfunc, item) for item in mylist]
# process results from tasks in order of task completion
for future in as_completed(futures):
# get the result
result = future.result()
# do something...

Не используйте комбинацию submit() и as_completed(), если вам нужно обработать результаты в том порядке, в котором задачи были поданы в пул процессов; Вместо этого используйте map() или submit() и итерируйте объекты Future напрямую.

Не используйте комбинацию submit() и as_completed(), если вам нужны результаты всех задач для продолжения; вместо этого лучше использовать функцию модуля wait().

Не используйте комбинацию submit() и as_completed() для простого асинхронного цикла for; вместо этого лучше использовать map().

Использование независимых функций в качестве задач

Используйте ProcessPoolExecutor, если ваши задачи независимы.

Это означает, что каждая задача не зависит от других задач, которые могут выполняться в одно и то же время. Это также может означать, что задачи не зависят от каких-либо данных, кроме данных, предоставляемых через аргументы функции для задачи.

Исполнитель ProcessPoolExecutor идеально подходит для задач, которые не изменяют никаких данных, например, не имеют побочных эффектов, так называемые чистые функции.

Пулы процессов могут быть организованы в потоки данных и конвейеры для линейной зависимости между задачами, возможно, с одним пулом процессов для каждого типа задач.

Пул процессов не предназначен для задач, требующих координации; вам следует рассмотреть возможность использования класса Process и шаблонов координации, таких как Barrier и Semaphore.

Пулы процессов не предназначены для задач, требующих синхронизации, вам следует рассмотреть возможность использования класса Process и шаблонов блокировки, таких как Lock и RLock через Manager.

Использование для задач, связанных с ЦП (возможно)

Исполнитель ProcessPoolExecutor может использоваться для задач, связанных с IO, и задач, связанных с CPU.

Тем не менее, он, вероятно, лучше всего подходит для задач, привязанных к процессору, в то время как ThreadPoolExecutor, вероятно, лучше всего подходит для задач, привязанных к IO.

Задачи, связанные с процессором, - это задачи, которые предполагают прямые вычисления, например, выполнение инструкций над данными в процессоре. Они связаны со скоростью выполнения процессора, отсюда и название CPU-bound.

Это отличие от задач, связанных с IO, которые должны ждать внешних ресурсов, таких как чтение или запись в или из сетевых соединений и файлов.

Примеры распространенных задач, привязанных к процессору, которые могут хорошо подходить для ProcessPoolExecutor включают:

ProcessPoolExecutor можно использовать для задач, связанных с IO, но это, вероятно, менее подходящий вариант по сравнению с использованием потоков и ThreadPoolExecutor.

Это происходит по двум причинам:

Количество процессов, которые вы можете создавать и управлять ими, часто весьма ограничено, например, десятки или менее 100.

В то время как при использовании потоков вы можете иметь сотни или даже тысячи потоков в рамках одного процесса. Это полезно для операций ввода-вывода, при которых требуется одновременный доступ или управление большим количеством соединений или ресурсов.

Это может быть доведено до десятков тысяч соединений или ресурсов или даже выше при использовании AsyncIO.

Задачи, связанные с ИТ, обычно предполагают чтение или запись большого количества данных.

Это могут быть данные, считываемые или записываемые с или на удаленные соединения, базы данных, серверы, файлы, внешние устройства и так далее.

Поэтому, если данные должны быть общими между процессами, например, в конвейере, может потребоваться сериализация данных (называемая pickled, встроенный процесс сериализации Python) для передачи от процесса к процессу. Это может быть медленным и требовать много памяти, особенно для больших объемов данных.

Это не так, когда используются потоки, которые могут совместно использовать и обращаться к одному и тому же ресурсу в памяти без сериализации данных.

Общие ошибки при использовании ProcessPoolExecutor

При использовании ProcessPoolExecutor возникает ряд распространенных ошибок.

Эти ошибки обычно возникают из-за ошибок, вносимых копированием и вставкой кода, или из-за небольшого непонимания того, как работает ProcessPoolExecutor.

Мы рассмотрим некоторые наиболее распространенные ошибки, допускаемые при использовании ProcessPoolExecutor, такие как:

У вас возникла ошибка при использовании ProcessPoolExecutor? Дайте мне знать в комментариях, чтобы я мог порекомендовать исправление и добавить случай в этот раздел.

Забывая __main__

На сегодняшний день самой большой ошибкой при использовании ProcessPoolExecutor является забывание проверить наличие модуля __main__.

Напомним, что при использовании процессов в Python, таких как класс Process или ProcessPoolExecutor, мы должны включить проверку на наличие среды верхнего уровня. Это достигается путем проверки, равно ли имя модуля __name__ строке '__main__'.

Это указывает на то, что код выполняется в среде кода верхнего уровня, а не импортируется программой или сценарием.

Например:


# entry point
if __name__ == '__main__':
    # ...

Вы можете узнать больше о __main__ в более общем виде здесь:

Забыв о главной функции, вы получите ошибку, которая может быть весьма запутанной.

Ниже приведен полный пример использования ProcessPoolExecutor без проверки модуля __main__.


# SuperFastPython.com
# example of not having a check for the main top-level environment
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# custom task that will sleep for a variable amount of time
def task(value):
    # sleep for less than a second
    sleep(random())
    return value
 
# start the process pool
with ProcessPoolExecutor() as executor:
    # submit all tasks
    for result in executor.map(task, range(5)):
        print(result)

Запуск этого примера завершится ошибкой RuntimeError.


Traceback (most recent call last):
...
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.
 
        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:
 
            if __name__ == '__main__':
                freeze_support()
                ...
 
        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
Traceback (most recent call last):
...
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
...
UserWarning: resource_tracker: There appear to be 5 leaked semaphore objects to clean up at shutdown
  warnings.warn('resource_tracker: There appear to be %d '

Сообщение об ошибке содержит информацию о необходимости импортировать точку входа в программу, а также комментарии о freeze_support, которые могут запутать новичков, а также BrokenProcessPool.

Эту ошибку можно исправить, защитив точку входа в программу с помощью if-выражения:


if __name__ == '__main__':
# ...

Использование вызова функции в submit()

Частой ошибкой является вызов вашей функции при использовании функции submit().

Например:


...
# submit the task
future = executor.submit(task())

Ниже приведен полный пример с этой ошибкой.


# SuperFastPython.com
# example of calling submit with a function call
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# custom task that will sleep for a variable amount of time
def task():
    # sleep for less than a second
    sleep(random())
    return 'all done'
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit the task
        future = executor.submit(task())
        # get the result
        result = future.result()
        print(result)
 
if __name__ == '__main__':
    main()

Запуск этого примера завершится с ошибкой.


concurrent.futures.process._RemoteTraceback:
"""
Traceback (most recent call last):
...
TypeError: 'str' object is not callable
"""
 
The above exception was the direct cause of the following exception:
 
Traceback (most recent call last):
...
TypeError: 'str' object is not callable

Вы можете исправить ошибку, изменив вызов submit() так, чтобы он принимал имя вашей функции и любые аргументы, вместо вызова функции в вызове submit.

Например:


...
# submit the task
future = executor.submit(task)

Использование вызова функции в map()

Частой ошибкой является вызов вашей функции при использовании функции map().

Например:


...
# submit all tasks
for result in executor.map(task(), range(5)):
    print(result)

Ниже приведен полный пример с этой ошибкой.


# SuperFastPython.com
# example of calling map with a function call
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# custom task that will sleep for a variable amount of time
def task(value):
    # sleep for less than a second
    sleep(random())
    return value
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit all tasks
        for result in executor.map(task(), range(5)):
            print(result)
 
if __name__ == '__main__':
    main()

Запуск примера приводит к ошибке TypeError.


Traceback (most recent call last):
  ...
TypeError: task() missing 1 required positional argument: 'value'

Эту ошибку можно исправить, изменив вызов map() для передачи имени функции целевой задачи вместо вызова функции.


...
# submit all tasks
for result in executor.map(task, range(5)):
    print(result)

Неправильная сигнатура функции для map()

Еще одна распространенная ошибка при использовании map() - отсутствие второго аргумента функции, например, итерабельной таблицы.

Например:


...
# submit all tasks
for result in executor.map(task):
    print(result)

Ниже приведен полный пример с этой ошибкой.


# SuperFastPython.com
# example of calling map without an iterable
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# custom task that will sleep for a variable amount of time
def task(value):
    # sleep for less than a second
    sleep(random())
    return value
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        # submit all tasks
        for result in executor.map(task):
            print(result)
 
if __name__ == '__main__':
    main()

Запуск примера не выдает никаких заданий пулу процессов, поскольку не было итерабельной функции map() для итерации.

В этом случае вывод не отображается.

Исправление заключается в предоставлении итерабельной переменной в вызове map() вместе с именем вашей функции.


...
# submit all tasks
for result in executor.map(task, range(5)):
    print(result)

Неправильная сигнатура функции для будущих обратных вызовов

Еще одна распространенная ошибка - забыть включить Future в сигнатуру для функции обратного вызова, зарегистрированной с объектом Future.

Например:


...
# callback function to call when a task is completed
def custom_callback():
    print('Custom callback was called')

Ниже приведен полный пример с этой ошибкой.


# SuperFastPython.com
# example of the wrong signature on the callback function
from time import sleep
from concurrent.futures import ProcessPoolExecutor
 
# callback function to call when a task is completed
def custom_callback():
    print('Custom callback was called', flush=True)
 
# mock task that will sleep for a moment
def work():
    sleep(1)
    return 'Task is done'
 
# entry point
def main():
    # create a process pool
    with ProcessPoolExecutor() as executor:
        # execute the task
        future = executor.submit(work)
        # add the custom callback
        future.add_done_callback(custom_callback)
        # get the result
        result = future.result()
        print(result)
 
if __name__ == '__main__':
    main()

Запуск этого примера приведет к ошибке, когда обратный вызов будет вызван пулом потоков.


Task is done
exception calling callback for <Future at 0x10a05f190 state=finished returned str>
Traceback (most recent call last):
...
TypeError: custom_callback() takes 0 positional arguments but 1 was given

Исправление этой ошибки заключается в обновлении сигнатуры вашей функции обратного вызова, чтобы включить в нее объект будущего.


...
# callback function to call when a task is completed
def custom_callback(future):
    print('Custom callback was called')

Аргументы или общие данные, которые не пикнули

Частой ошибкой является обмен данными между процессами, которые не могут быть сериализованы.

Python имеет встроенный процесс сериализации объектов , называемый pickle, в котором объекты пикируются или непикируются при сериализации и несериализации.

При совместном использовании данных между процессами, данные будут пикироваться автоматически.

Это включает аргументы, передаваемые функциям целевой задачи, данные, возвращаемые функциями целевой задачи, и данные, доступ к которым осуществляется напрямую, например, глобальные переменные.

Если данные, разделяемые между процессами, не могут быть автоматически травлены, будет выдана ошибка PicklingError.

Большинство обычных объектов Python можно травить.

Примерами объектов, которые нельзя мариновать, являются те, которые могут иметь открытое соединение, например, с файлом, базой данных, сервером и т.п.

Мы можем продемонстрировать это на примере ниже, который пытается передать хэндл файла в качестве аргумента функции целевой задачи.


# SuperFastPython.com
# example of an argument that does not pickle
from concurrent.futures import ProcessPoolExecutor
 
# write to a file
def work(file):
    file.write('hi there')
    return "All done!"
 
# entry point
def main():
    # submit the task
    with open('tmp.txt', 'w') as file:
        # start the process pool
        with ProcessPoolExecutor() as executor:
            # submit the task
            future = executor.submit(work, file)
            # get the result
            result = future.result()
            print(result)
 
if __name__ == '__main__':
    main()

Запустив пример, мы видим, что он завершается с ошибкой, указывающей на то, что аргумент не может быть обработан для передачи рабочему процессу.


concurrent.futures.process._RemoteTraceback:
...
TypeError: cannot pickle '_io.TextIOWrapper' object
"""
 
The above exception was the direct cause of the following exception:
 
Traceback (most recent call last):
  ...
TypeError: cannot pickle '_io.TextIOWrapper' object

Это был надуманный пример, в котором файл мог быть открыт в функции целевой задачи или ProcessPoolExecutor мог быть изменен на ThreadPoolExecutor.

В общем, если вы столкнулись с этой ошибкой и пытаетесь передать соединение или открытый файл, возможно, попробуйте открыть соединение внутри задачи или использовать потоки вместо процессов.

Если вы столкнулись с этим типом ошибки при использовании пользовательских типов данных, которые передаются по кругу, вам может понадобиться реализовать код для ручной сериализации и десериализации ваших типов. Я рекомендую прочитать документацию к модулю pickle..

Не удаляется print() Statements

Распространенной ошибкой является отсутствие промывки стандартного выхода (stdout) при вызове встроенного оператора print() из функций целевой задачи.

По умолчанию встроенный оператор print() в Python не промывает вывод.

Стандартный поток вывода (stout) будет автоматически промываться в основном процессе, часто при заполнении внутреннего буфера или обнаружении новой строки. Это означает, что вы увидите, как ваши операторы печати сообщают о себе почти сразу после вызова функции print в коде.

Существует проблема при вызове функции print() из порожденных или вилочных процессов, поскольку стандартный выход (stdout) по умолчанию буферизирует вывод.

Это означает, что если вы вызываете print() из функций целевых задач в ProcessPoolExecutor, вы, вероятно, не увидите операторов печати на стандартном выходе, пока рабочие процессы не будут закрыты.

Это будет сбивать с толку, потому что будет выглядеть так, будто ваша программа работает неправильно, например, buggy.

Нижеприведенный пример демонстрирует это на примере функции целевой задачи, которая будет вызывать print() для сообщения о некотором состоянии.


# SuperFastPython.com
# example of not flushing output when call print() from tasks in new processes
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
 
# custom task that will sleep for a moment
def task(value):
    sleep(value)
    print(f'Done: {value}')
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor() as executor:
        executor.map(task, range(5))
    print('All done!')
 
if __name__ == '__main__':
    main()

Запуск примера будет ждать, пока все задачи в пуле процессов завершатся, прежде чем печатать все сообщения на стандартном выходе.


Done: 0
Done: 1
Done: 2
Done: 3
Done: 4
All done!

Это можно исправить, обновив все вызовы оператора print(), вызываемые из функций целевой задачи, чтобы промывать вывод после каждого вызова.

Этого можно достичь, установив аргумент "flush" в значение True; например:


...
# report progress and flush the stream
print(f'Done: {value}', flush=True)

Общие вопросы при использовании ProcessPoolExecutor

Этот раздел отвечает на распространенные вопросы, задаваемые разработчиками при использовании ProcessPoolExecutor.

У вас есть вопрос по ProcessPoolExecutor?
Задайте свой вопрос в комментариях ниже и я сделаю все возможное, чтобы ответить на него и, возможно, добавить его в этот список вопросов.

Как остановить запущенную задачу?

Задание в ProcessPoolExecutor может быть отменено до начала его выполнения.

В этом случае задача должна быть отправлена в пул вызовом функции submit(), которая возвращает объект Future. Затем можно вызвать функцию cancel() в объекте Future.

Если задача уже запущена, она не может быть отменена, остановлена или завершена пулом процессов.

Вместо этого вы должны добавить эту функциональность в свою задачу.

Одним из подходов может быть использование флага, например, multiprocessing.Event object, который при установке будет указывать на то, что все задачи должны прекратить выполнение, как только смогут. Затем вы можете обновить функцию или функции целевой задачи, чтобы часто проверять состояние этого флага.

Мы не можем использовать Event напрямую; вместо этого мы должны использовать Manager для создания экземпляра Event, затем поделиться экземпляром с каждой задачей, чтобы рабочие процессы могли получить к нему правильный доступ. Manager создаст серверный процесс для управления межпроцессной координацией совместно используемых объектов, таких как Event.

Этого можно достичь, если сначала создать Manager, затем создать событие из менеджера, используя технику контекстного менеджера, чтобы Manager правильно закрылся, когда мы закончим с ним.


...
# create the manager
with Manager() as manager:
    # create an event to shut down all running tasks
    event = manager.Event()
    # ...

Затем мы можем передать событие в функцию целевой задачи и часто проверять его.

Возможно, вам придется изменить структуру задания.

Например, если ваша задача читает данные из файла или сокета, вам может понадобиться изменить операцию чтения, чтобы она выполнялась блоками данных в цикле, чтобы на каждой итерации цикла вы могли проверять состояние флага.

В примере ниже представлен шаблон, который можно использовать для добавления флага события в функцию целевой задачи для проверки условия остановки, чтобы выключить все текущие задачи.

Ниже приведен пример, демонстрирующий это на примере работы.


# SuperFastPython.com
# example of stopping running tasks using an event
from time import sleep
from multiprocessing import Manager
from concurrent.futures import ProcessPoolExecutor
 
# mock target task function
def work(event):
    # pretend read data for a long time
    for _ in range(100):
        # pretend to read some data
        sleep(1)
        # check the status of the flag
        if event.is_set():
            # shut down this task now
            print('Not done, asked to stop', flush=True)
            return
    return "All done!"
 
# entry point
def main():
    # create the manager to coordinate shared objects like the event
    with Manager() as manager:
        # create an event to shut down all running tasks
        event = manager.Event()
        # create a process pool
        executor = ProcessPoolExecutor()
        # execute all of our tasks
        futures = [executor.submit(work, event) for _ in range(50)]
        # wait a moment
        print('Tasks are running...')
        sleep(2)
        # cancel all scheduled tasks
        print('Cancelling all scheduled tasks...')
        for future in futures:
            future.cancel()
        # stop all currently running tasks
        print('Trigger all running tasks to stop...')
        event.set()
        # shutdown the process pool and wait for all tasks to complete
        print('Shutting down, waiting for all tasks...')
        executor.shutdown()
 
if __name__ == '__main__':
    main()

При запуске примера сначала создаются Manager и Event, затем пул процессов с количеством рабочих процессов по умолчанию и планируются 50 заданий.

Объект Event передается в каждую задачу, где он проверяется каждую итерацию, чтобы увидеть, был ли он установлен, и если да, то для выхода из задачи.

Первые 9 задач начинают выполняться в течение нескольких секунд, затем мы решаем все выключить.

Примечание: в данном случае у нас 8 CPU и 1 задача извлечения добавляется к внутреннему вызову Queue и не может быть отменена. Смотрите раздел о том, как внутренне работает ProcessPoolExecutor, чтобы понять, почему так происходит.

Сначала мы отменяем все запланированные задачи, которые еще не запущены, чтобы, если они попадут из очереди в рабочий процесс, они не начали выполняться.

Затем мы установим событие, которое вызовет остановку всех запущенных задач.

Затем пул процессов выключается, и мы ждем, пока все запущенные процессы завершат свое выполнение.

Запущенные процессы проверяют статус события на следующей итерации цикла и выходят из него, печатая сообщение.


Tasks are running...
Cancelling all scheduled tasks...
Trigger all running tasks to stop...
Shutting down, waiting for all tasks...
Not done, asked to stop
Not done, asked to stop
Not done, asked to stop
Not done, asked to stop
Not done, asked to stop
Not done, asked to stop
Not done, asked to stop
Not done, asked to stop
Not done, asked to stop

Как дождаться завершения всех задач?

Существует несколько способов дождаться завершения всех задач в ProcessPoolExecutor.

Во-первых, если у вас есть объект Future для всех задач в пуле процессов, поскольку вы вызвали submit(), то вы можете предоставить коллекцию задач функции модуля wait(). По умолчанию эта функция вернется, когда все предоставленные объекты Future завершатся.


...
# wait for all tasks to complete
wait(futures)

В качестве альтернативы можно перечислить список объектов Future и попытаться получить результат от каждого из них. Эта итерация завершится, когда все результаты будут доступны, что означает, что все задачи были выполнены.


...
# wait for all tasks to complete by getting all results
for future in futures:
result = future.result()
# all tasks are complete

Другой подход заключается в отключении пула процессов. Мы можем установить значение "cancel_futures" в True, что приведет к отмене всех запланированных задач и ожиданию завершения всех текущих запущенных задач.


...
# shutdown the pool, cancels scheduled tasks, returns when running tasks complete
executor.shutdown(wait=True, cancel_futures=True)

Можно также выключить пул и не отменять запланированные задачи, но при этом дождаться завершения всех задач. Это обеспечит завершение всех запущенных и запланированных задач до возвращения функции. Это поведение функции shutdown по умолчанию, но неплохо было бы указать его явно.


...
# shutdown the pool, returns after all scheduled and running tasks complete
executor.shutdown(wait=True, cancel_futures=False)

Как динамически изменить количество процессов

Вы не можете динамически увеличивать или уменьшать количество процессов в ProcessPoolExecutor.

Количество процессов фиксировано, если в вызове конструктора объекта настроен ProcessPoolExecutor. Например:


...
# configure a process pool
executor = ProcessPoolExecutor(4)

Как объединить задачи и пулы процессов?

Вы можете проводить модульное тестирование функций целевой задачи напрямую, возможно, подражая любым необходимым внешним ресурсам.

Вы можете провести модульное тестирование использования пула процессов с помощью макетных задач, которые не взаимодействуют с внешними ресурсами.

Unit-тестирование задач и самого пула процессов должно быть рассмотрено как часть вашего проекта и может потребовать, чтобы соединение с ресурсом IO было конфигурируемым, чтобы его можно было имитировать, и чтобы целевая функция задачи, вызываемая вашим пулом процессов, была конфигурируемой, чтобы ее можно было имитировать.

Как сравнить последовательную и параллельную производительность?

Вы можете сравнить производительность вашей программы с пулом процессов и без него.

Это может быть полезным упражнением для подтверждения того, что использование ProcessPoolExecutor в вашей программе привело к увеличению скорости.

Возможно, самый простой подход заключается в том, чтобы вручную записать время начала и окончания выполнения кода и вычесть время окончания из времени начала, чтобы сообщить общее время выполнения. Затем запишите время с использованием и без использования пула процессов.


# SuperFastPython.com
# example of recording the execution time of a program
import time
 
# record the start time
start_time = time.time()
# do work with or without a process pool
# ....
time.sleep(3)
# record the end time
end_time = time.time()
# report execution time
total_time = end_time - start_time
print(f'Execution time: {total_time:.1f} seconds.')

Использование среднего времени выполнения программы может дать более стабильное время выполнения программы, чем однократный прогон.

Вы можете записать время выполнения 3 или более раз для вашей программы без пула процессов, а затем вычислить среднее значение как сумму времен, деленную на общее количество запусков. Затем повторите это упражнение для расчета среднего времени с пулом процессов.

Это, вероятно, будет уместно только в том случае, если время работы вашей программы составляет минуты, а не часы.

Затем вы можете сравнить последовательную и параллельную версии, вычислив кратность увеличения скорости следующим образом:

Например, если последовательный запуск программы занял 15 минут (900 секунд), а параллельная версия с ProcessPoolExecutor заняла 5 минут (300 секунд), то процентное кратное увеличение будет рассчитано как:

То есть, параллельная версия программы с ProcessPoolExecutor в 3 раза быстрее или в 3 раза быстрее.

Вы можете умножить кратность ускорения на 100, чтобы получить процент

В этом примере параллельная версия на 300% быстрее последовательной.

Как установить размер куска в map()?

Функция map() на ProcessPoolExecutor принимает параметр "chunksize", который по умолчанию равен 1.


...
# apply a function to each item in an iterable with a chunksize
for result in executor.map(task, items, chunksize=1)
# ...

Аргумент "chunksize" управляет отображением элементов в итерабле, переданном для отображения на задачи, используемые в исполнителе ProcessPoolExecutor.

Значение один означает, что один элемент сопоставлен с одной задачей.

Напомним, что данные для каждой задачи в виде аргументов, передаваемых целевой функции задачи, и возвращаемых значений должны быть сериализованы pickle. Это происходит автоматически, но требует определенных вычислительных затрат и затрат памяти, добавляя накладные расходы к каждой задаче, обрабатываемой пулом процессов.

Когда существует огромное количество задач или задачи относительно быстро вычисляются, тогда размер чанков должен быть настроен так, чтобы максимизировать группировку элементов для обработки задач пула, чтобы минимизировать накладные расходы на каждую задачу и, в свою очередь, уменьшить общее время вычислений.

Для этого, вероятно, потребуется некоторая настройка размера кусков, которую вы можете выполнить с помощью реальных данных задачи или, возможно, тестового набора с имитацией данных и процессов задачи.

Можно попробовать следующие значения:

Примечание: деление (items / max_workers) может потребовать округления, поскольку аргумент "chunksize" должен быть целым положительным числом.

Например:


...
# estimate the chunksize
size = round(len(items) / executor._max_workers)
# apply a function to each item in an iterable with a chunksize
for result in executor.map(task, items, chunksize=size)
# ...

Сравните производительность, чтобы понять, лучше ли одна конфигурация, чем другая, и если да, используйте ее как отправную точку для оценки аналогичных значений.

Как отправить последующее задание?

Некоторые задания требуют выполнения второго задания, которое каким-то образом использует результат первого задания.

Мы можем назвать это необходимостью выполнения последующего задания для каждого представленного задания, которое может быть каким-то образом обусловлено результатом.

Существует несколько способов отправить задание на выполнение.

Одним из подходов может быть представление последующей задачи по мере обработки результатов первой задачи.

Например, мы можем обрабатывать результат каждой из первых задач по мере их завершения, затем вручную вызывать submit() для каждой последующей задачи, когда это необходимо, и сохранять новый объект future во втором списке для последующего использования.

Мы можем конкретизировать этот пример представления последующих задач с помощью полного примера.


# SuperFastPython.com
# example of submitting follow-up tasks
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
 
# mock test that works for moment
def task1():
    value = random()
    sleep(value)
    print(f'Task 1: {value}', flush=True)
    return value
 
# mock test that works for moment
def task2(value1):
    value2 = random()
    sleep(value2)
    print(f'Task 2: value1={value1}, value2={value2}', flush=True)
    return value2
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor(4) as executor:
        # send in the first tasks
        futures1 = [executor.submit(task1) for _ in range(10)]
        # process results in the order they are completed
        futures2 = list()
        for future1 in as_completed(futures1):
            # get the result
            result = future1.result()
            # check if we should trigger a follow-up task
            if result > 0.5:
                future2 = executor.submit(task2, result)
                futures2.append(future2)
        # wait for all follow-up tasks to complete
 
if __name__ == '__main__':
    main()

Запуск примера запускает пул процессов с 5 рабочими потоками и отправляет 10 заданий.

Затем мы обрабатываем результаты задач по мере их выполнения. Если результат первого раунда задач требует продолжения, мы отправляем последующую задачу и отслеживаем объект Future во втором списке.

Эти последующие задания отправляются по мере необходимости, а не ждут, пока все задания первого раунда будут выполнены, что является приятным преимуществом использования функции as_completed() со списком объектов Future.

Мы видим, что в данном случае пять заданий первого тура привели к последующим заданиям.


Task 1: 0.19133060953809367
Task 1: 0.19427578824746594
Task 1: 0.031055062986840576
Task 1: 0.2477232412540048
Task 1: 0.20527354481753513
Task 1: 0.7162388973077713
Task 1: 0.6377247814525346
Task 1: 0.9711113247989889
Task 2: value1=0.7162388973077713, value2=0.1286947111771508
Task 1: 0.382993492347377
Task 1: 0.8171633483302672
Task 2: value1=0.8171633483302672, value2=0.2634272538884753
Task 2: value1=0.6377247814525346, value2=0.8930378431849559
Task 2: value1=0.9711113247989889, value2=0.9553733884486519

Возможно, вам захочется использовать отдельный пул процессов для последующих задач, чтобы сохранить раздельное существование.

Я бы не рекомендовал отправлять новые задачи изнутри задачи.

Это потребует доступа к пулу процессов либо как к глобальной переменной, либо путем передачи, и нарушит идею того, что задачи являются чистыми функциями, не имеющими побочных эффектов, что является хорошей практикой при использовании пулов процессов.

Кроме того, взаимодействие с задачами внутри задач, например, ожидание результатов от объектов Future, может привести к тупиковым ситуациям, которые остановят выполнение вашей программы.

Как показать прогресс всех задач?

Существует много способов показать прогресс для задач, выполняемых ProcessPoolExecutor.

Возможно, самым простым является использование функции обратного вызова, которая обновляет индикатор выполнения. Этого можно достичь, определив функцию индикатора выполнения и зарегистрировав ее в объекте Future для каждой задачи с помощью функции add_done_callback().

Простейшим индикатором выполнения является печать точки на экране для каждого выполненного задания.

В приведенном ниже примере демонстрируется этот простой индикатор прогресса.


# SuperFastPython.com
# example of a simple progress indicator
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import wait
 
# simple progress indicator callback function
def progress_indicator(future):
    print('.', end='', flush=True)
 
# mock test that works for moment
def task(name):
    sleep(random())
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor(2) as executor:
        # send in the tasks
        futures = [executor.submit(task, i) for i in range(20)]
        # register the progress indicator callback
        for future in futures:
            future.add_done_callback(progress_indicator)
        # wait for all tasks to complete
    print('\nDone!')
 
if __name__ == '__main__':
    main()

Запуск примера запускает пул процессов с двумя рабочими процессами и отправляет 20 заданий.

С каждым объектом Future регистрируется функция обратного вызова индикатора выполнения, которая печатает одну точку по мере выполнения каждой задачи, обеспечивая промывку стандартного вывода при каждом вызове print() и отсутствие печати новой строки.

Это гарантирует, что каждый из нас видит точку немедленно, независимо от процесса, который печатает, и что все точки появляются на одной линии.


....................
Done!

Нужно ли нам защищать __main__?

Да.

При использовании процессов вообще, и ProcessPoolExecutor в частности, необходимо явно защитить точку входа с помощью if-выражения:


# entry point
if __name__ == '__main__':
# ...

Нужно ли вызывать freeze_support()?

Python-программы могут быть преобразованы в исполняемый файл Windows.

В процессе преобразования программы Python "заморожены". Если эти программы попытаются запустить новые процессы, это приведет к ошибке RuntimeError.

Поэтому, если вы собираетесь "заморозить" свою программу (например, преобразовать ее в исполняемый файл Windows), вы должны добавить поддержку заморозки.

Этого можно достичь, вызывая функцию freeze_support() в качестве первой строки вашей программы, такой первой строки после проверки наличия защищенной точки входа; например:


# protected entry point
if __name__ == '__main__':
    freeze_support()
    # ...

Как получить объект будущего для задач, добавленных с помощью map()?

Когда вы вызываете map(), он действительно создает объект Future для каждой задачи.

Внутри, submit() вызывается для каждого элемента в итерации, предоставленной для вызова map().

Тем не менее, не существует чистого способа доступа к объекту Future для задач, отправленных в пул процессов через map().

Каждая задача во внутренней очереди работ объекта ProcessPoolExecutor является экземпляром _WorkItem, который имеет ссылку на объект Future для задачи. Вы можете получить доступ к внутренней очереди объекта ProcessPoolExecutor, но у вас нет безопасного способа перечислить элементы в очереди без их удаления.

Если вам нужен объект Future для задачи, вызовите submit().

Как проверить, сколько заданий осталось в ProcessPoolExecutor

Есть два способа оценить количество задач, которые остаются в пуле процессов.

В первом случае можно отслеживать количество поданных заданий с помощью счетчика из главного потока, например, если все задания подаются из главного потока.

Вы можете использовать безопасный для процесса счетчик в обратном вызове Future для отслеживания количества выполненных задач. Это можно использовать в качестве грубого индикатора прогресса.

Более просто, вы можете напрямую запросить защищенные члены ProcessPoolExecutor, чтобы узнать, сколько заданий осталось обработать.

Возможно, простой структурой для проверки внутри пула процессов является словарь, который сопоставляет идентификаторы рабочих элементов с рабочими элементами, например, заданиями, которые были представлены, но еще не отправлены процессам для выполнения. Это находится в защищенном свойстве _pending_work_items.

Мы можем сообщить размер этого словаря напрямую; например:


...
# report the number of remaining tasks
print(f'About {len(executor._pending_work_items)} tasks remain')

Мы могли бы сообщать количество оставшихся задач по мере завершения каждой представленной задачи, например, с помощью функции as_completed() для списка объектов Future; например:


...
# update each time a task finishes
for _ in as_completed(futures):
    # report the number of remaining tasks
    print(f'About {len(executor._pending_work_items)} tasks remain')

Связывая это вместе, приведенный ниже полный пример отправляет 50 задач в пул процессов с четырьмя рабочими процессами и сообщает о количестве оставшихся задач по мере завершения каждой задачи.


# SuperFastPython.com
# example of estimating the number of remaining tasks
from time import sleep
from random import random
from concurrent.futures import ProcessPoolExecutor
from concurrent.futures import as_completed
 
# mock test that works for moment
def task():
    value = random()
    sleep(value)
 
# entry point
def main():
    # start the process pool
    with ProcessPoolExecutor(4) as executor:
        # submit many tasks
        futures = [executor.submit(task) for _ in range(50)]
        print('Waiting for tasks to complete...')
        # update each time a task finishes
        for _ in as_completed(futures):
            # report the number of remaining tasks
            print(f'About {len(executor._pending_work_items)} tasks remain')
 
if __name__ == '__main__':
    main()

Запуск примера быстро выполняет задания и предоставляет обновленный отчет о количестве оставшихся заданий по мере выполнения каждого задания.


Waiting for tasks to complete...
About 49 tasks remain
About 48 tasks remain
About 47 tasks remain
About 46 tasks remain
About 45 tasks remain
About 44 tasks remain
About 43 tasks remain
About 42 tasks remain
About 41 tasks remain
About 40 tasks remain
About 39 tasks remain
About 38 tasks remain
About 37 tasks remain
About 36 tasks remain
About 35 tasks remain
About 34 tasks remain
About 33 tasks remain
About 32 tasks remain
About 31 tasks remain
About 30 tasks remain
About 29 tasks remain
About 28 tasks remain
About 27 tasks remain
About 26 tasks remain
About 25 tasks remain
About 24 tasks remain
About 23 tasks remain
About 22 tasks remain
About 21 tasks remain
About 20 tasks remain
About 19 tasks remain
About 18 tasks remain
About 17 tasks remain
About 16 tasks remain
About 15 tasks remain
About 14 tasks remain
About 13 tasks remain
About 12 tasks remain
About 11 tasks remain
About 10 tasks remain
About 9 tasks remain
About 8 tasks remain
About 7 tasks remain
About 6 tasks remain
About 5 tasks remain
About 4 tasks remain
About 3 tasks remain
About 2 tasks remain
About 1 tasks remain
About 0 tasks remain

Исполнитель ProcessPoolExecutor может быть не лучшим решением для всех проблем параллелизма в вашей программе.

Вместе с тем, возможно, существуют некоторые недопонимания, которые мешают вам полностью и наилучшим образом использовать возможности ProcessPoolExecutor в вашей программе.

В этом разделе мы рассмотрим некоторые распространенные возражения, с которыми сталкиваются разработчики при рассмотрении возможности использования ProcessPoolExecutor в своем коде

Давайте погрузимся внутрь.

Что касается глобальной блокировки интерпретатора (GIL)?

При использовании процессов, таких как класс Process или класс ProcessPoolExecutor, GIL обычно не имеет значения.

Глобальная блокировка интерпретатора, или сокращенно GIL, - это проектное решение в эталонном интерпретаторе Python.

Это относится к тому, что реализация интерпретатора Python использует главную блокировку, которая предотвращает одновременное выполнение более одной инструкции Python.

Это предотвращает более одного потока выполнения в программах Python, в частности, в каждом процессе Python, то есть в каждом экземпляре интерпретатора Python.

Реализация GIL означает, что потоки Python могут быть параллельными, но не могут выполняться параллельно. Напомним, что параллельное выполнение означает, что несколько задач могут выполняться одновременно, а параллельное - что несколько задач выполняются одновременно. Параллельные задачи являются параллельными; параллельные задачи могут выполняться или не выполняться параллельно.

Это причина эвристики, согласно которой потоки Python должны использоваться только для задач, связанных с IO, а не для задач, связанных с CPU, поскольку задачи, связанные с IO, будут ждать в ядре операционной системы ответа удаленных ресурсов (не выполняя инструкции Python), позволяя другим потокам Python запускаться и выполнять инструкции Python.

Поэтому GIL необходимо учитывать при использовании потоков в Python, таких как класс threading.Thread и ThreadPoolExecutor. Он не является обязательным при использовании ProcessPoolExecutor (если только вы не используете дополнительные потоки внутри каждой задачи).

Являются ли процессы Python "реальными процессами"?

Да.

Python использует реальные процессы системного уровня, также называемые порождающими процессами или развивающими процессами, возможность, предоставляемую современными операционными системами, такими как Windows, Linux и MacOS.

Разве процессы Python не глючат?

Нет.

Процессы Python не являются глючными.

Процессы Python являются первоклассной возможностью платформы Python и были таковой в течение очень долгого времени.

Не является ли Python плохим выбором для параллелизма?

Разработчики любят python по многим причинам, чаще всего потому, что он прост в использовании и быстр в разработке.

Python обычно используется для "клеящего" кода, одноразовых скриптов, но все больше и больше для крупномасштабных программных систем.

Если вы используете Python и вам нужен параллелизм, то вы работаете с тем, что у вас есть. Вопрос спорный.

Если вам нужен параллелизм, а вы не выбрали язык, возможно, другой язык будет более подходящим, а возможно, и нет. Рассмотрите весь объем функциональных и нефункциональных требований (или потребностей, желаний и нужд пользователей) для вашего проекта и возможности различных платформ разработки.

Почему бы не использовать ThreadPoolExecutor вместо этого?

Процедура ThreadPoolExecutor поддерживает пулы потоков, в отличие от ProcessPoolExecutor, который поддерживает пулы процессов, где каждый процесс будет иметь один поток.

Потоки и процессы совершенно разные, и выбор одного над другим должен быть вполне осознанным.

Программа Python - это процесс, который имеет главный поток. Вы можете создать множество дополнительных потоков в процессе Python. Вы также можете создать или породить множество процессов Python, каждый из которых будет иметь один главный поток и может порождать дополнительные потоки.

В более широком смысле, потоки являются легкими и могут совместно использовать память (данные и переменные) внутри процесса, в то время как процессы являются тяжелыми и требуют больше накладных расходов и накладывают больше ограничений на совместное использование памяти (данных и переменных).

Обычно в Python процессы используются для задач, связанных с процессором, а потоки - для задач, связанных с вводом-выводом, и это хорошая эвристика, но это не обязательно должно быть так.

Возможно, ThreadPoolExecutor лучше подходит для решения вашей конкретной проблемы. Возможно, попробуйте и посмотрите.

Почему бы не использовать multiprocessing.Process вместо этого?

ProcessPoolExecutor - это как "автоматический режим" для процессов Python.

Если у вас более сложный случай использования, вам может понадобиться напрямую использовать класс multiprocessing.Process.

Это может быть связано с тем, что вам требуется больше синхронизации между процессами с помощью механизмов блокировки, общей памяти между процессами, такой как менеджер, и/или больше координации между процессами с помощью барьеров и семафоров.

Возможно, у вас более простой случай использования, например, одна задача, и в этом случае пул процессов будет слишком тяжелым решением.

При этом, если вы используете класс Process с ключевым словом "target" для чистых функций (функций, не имеющих побочных эффектов), возможно, вам лучше использовать ProcessPoolExecutor.

Почему не использовать AsyncIO?

AsyncIO может быть альтернативой использованию ThreadPoolExecutor, но, вероятно, не является хорошей альтернативой для ProcessPoolExecutor.

AsyncIO разработан для поддержки большого количества операций ввода-вывода, возможно, от тысяч до десятков тысяч, все в рамках одного потока.

Это требует альтернативной парадигмы программирования, называемой реактивным программированием, которая может быть сложной для новичков.

При использовании ProcessPoolExecutor обычно выполняются задачи, привязанные к процессору, что не подходит при использовании модуля AsyncIO.

Выводы

Это большое руководство, и вы очень подробно узнали, как работает ProcessPoolExecutor и как лучше всего использовать его в вашем проекте.

Вернуться на верх