multiprocessing
— Параллелизм на основе процессов¶
Исходный код: Lib/multiprocessing/
Availability: это не Emscripten, это был не я.
Этот модуль не работает или недоступен на платформах WebAssembly wasm32-emscripten
и wasm32-wasi
. Дополнительную информацию смотрите в разделе Платформы веб-сборки.
Вступление¶
multiprocessing
- это пакет, который поддерживает процессы запуска с использованием API, аналогичного модулю threading
. Пакет multiprocessing
обеспечивает как локальный, так и удаленный параллелизм, эффективно обходя Global Interpreter Lock за счет использования подпроцессов вместо потоков. Благодаря этому модуль multiprocessing
позволяет программисту в полной мере использовать несколько процессоров на данной машине. Он работает как в Unix, так и в Windows.
В модуле multiprocessing
также представлены API, которые не имеют аналогов в модуле threading
. Ярким примером этого является объект Pool
, который предлагает удобное средство распараллеливания выполнения функции с несколькими входными значениями, распределяя входные данные по процессам (параллелизм данных). Следующий пример демонстрирует общепринятую практику определения таких функций в модуле, чтобы дочерние процессы могли успешно импортировать этот модуль. Это базовый пример параллелизма данных с использованием Pool
,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
будет напечатан в стандартном формате вывода
[1, 4, 9]
См.также
concurrent.futures.ProcessPoolExecutor
предлагает интерфейс более высокого уровня для передачи задач фоновому процессу, не блокируя выполнение вызывающего процесса. По сравнению с непосредственным использованием интерфейса Pool
, API concurrent.futures
позволяет отделить отправку работы в базовый пул процессов от ожидания результатов.
Класс Process
¶
В multiprocessing
процессы запускаются путем создания объекта Process
и последующего вызова его метода start()
. Process
следует API threading.Thread
. Тривиальным примером многопроцессорной программы является
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
Чтобы показать используемые идентификаторы отдельных процессов, приведем расширенный пример:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
Объяснение того, почему необходима часть if __name__ == '__main__'
, смотрите в разделе Руководящие принципы программирования.
Контексты и методы запуска¶
В зависимости от платформы, multiprocessing
поддерживает три способа запуска процесса. Этими методами запуска являются
- порождать
Родительский процесс запускает новый процесс интерпретатора Python. Дочерний процесс унаследует только те ресурсы, которые необходимы для запуска метода объекта process
run()
. В частности, ненужные файловые дескрипторы и дескрипторы дескрипторов из родительского процесса унаследованы не будут. Запуск процесса с использованием этого метода происходит довольно медленно по сравнению с использованием fork или forkserver.Доступно в Unix и Windows. Используется по умолчанию в Windows и Mac OS.
- вилка
Родительский процесс использует
os.fork()
для разветвления интерпретатора Python. Дочерний процесс, когда он запускается, фактически идентичен родительскому процессу. Все ресурсы родительского процесса наследуются дочерним процессом. Обратите внимание, что безопасное разветвление многопоточного процесса проблематично.Доступно только в Unix. Используется в Unix по умолчанию.
- сервер форков
Когда программа запускается и выбирает метод forkserver start, запускается серверный процесс. С этого момента всякий раз, когда требуется новый процесс, родительский процесс подключается к серверу и запрашивает, чтобы он запустил новый процесс. Серверный процесс fork является однопоточным, поэтому для него безопасно использовать
os.fork()
. Никакие ненужные ресурсы не наследуются.Доступен на платформах Unix, которые поддерживают передачу файловых дескрипторов по каналам Unix.
Изменено в версии 3.4: Функция spawn добавлена на все платформы Unix, а функция forkserver - на некоторые платформы Unix. Дочерние процессы больше не наследуют все родительские дескрипторы, наследуемые в Windows.
Изменено в версии 3.8: В macOS метод spawn start теперь используется по умолчанию. Метод fork start следует считать небезопасным, поскольку он может привести к сбоям в работе подпроцесса, поскольку системные библиотеки mac OS могут запускать потоки. Смотрите bpo-33725.
В POSIX использование методов запуска spawn или forkserver также запустит процесс отслеживания ресурсов, который отслеживает несвязанные именованные системные ресурсы (такие как именованные семафоры или объекты SharedMemory
), созданные процессами программы. Когда все процессы завершат работу, система отслеживания ресурсов отключит все оставшиеся отслеживаемые объекты. Обычно их не должно быть, но если процесс был остановлен из-за сигнала, может произойти «утечка» ресурсов. (Ни просочившиеся семафоры, ни сегменты общей памяти не будут автоматически отсоединены до следующей перезагрузки. Это проблематично для обоих объектов, поскольку система допускает только ограниченное количество именованных семафоров, а сегменты общей памяти занимают некоторое пространство в основной памяти.)
Чтобы выбрать метод запуска, вы используете set_start_method()
в предложении if __name__ == '__main__'
основного модуля. Например:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method()
не следует использовать в программе более одного раза.
В качестве альтернативы вы можете использовать get_context()
для получения контекстного объекта. Контекстные объекты имеют тот же API, что и модуль многопроцессорной обработки, и позволяют использовать несколько методов запуска в одной программе.
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
Обратите внимание, что объекты, относящиеся к одному контексту, могут быть несовместимы с процессами из другого контекста. В частности, блокировки, созданные с использованием контекста fork, не могут быть переданы процессам, запущенным с использованием методов spawn или forkserver start.
Библиотеке, которая хочет использовать определенный метод запуска, вероятно, следует использовать get_context()
, чтобы не вмешиваться в выбор пользователя библиотеки.
Предупреждение
Методы 'spawn'
и 'forkserver'
start в настоящее время нельзя использовать с «замороженными» исполняемыми файлами (т.е. двоичными файлами, созданными такими пакетами, как PyInstaller и cx_Freeze) в Unix. Метод 'fork'
start действительно работает.
Обмен объектами между процессами¶
multiprocessing
поддерживает два типа каналов связи между процессами:
Очереди
Класс
Queue
является близким клоном классаqueue.Queue
. Например:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()Очереди безопасны для потоков и процессов.
Трубы
Функция
Pipe()
возвращает пару объектов connection, соединенных каналом, который по умолчанию является дуплексным (двусторонним). Например:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()Два объекта connection, возвращаемые с помощью
Pipe()
, представляют собой два конца канала. Каждый объект connection имеет методыsend()
иrecv()
(среди прочих). Обратите внимание, что данные в канале могут быть повреждены, если два процесса (или потока) одновременно попытаются выполнить чтение с одного и того же конца канала или запись в него. Конечно, процессы, использующие разные концы канала одновременно, не подвержены риску повреждения.
Синхронизация между процессами¶
multiprocessing
содержит эквиваленты всех примитивов синхронизации из threading
. Например, можно использовать блокировку, чтобы гарантировать, что только один процесс одновременно выводит стандартный вывод:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
Без использования блокировки выходные данные из разных процессов могут перепутаться.
Использование пула работников¶
Класс Pool
представляет собой пул рабочих процессов. В нем есть методы, которые позволяют передавать задачи рабочим процессам несколькими различными способами.
Например:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 seconds
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
Обратите внимание, что методы пула должны использоваться только тем процессом, который его создал.
Примечание
Функциональность этого пакета требует, чтобы дочерние элементы могли импортировать модуль __main__
. Это описано в Руководящие принципы программирования, однако здесь стоит обратить на это внимание. Это означает, что некоторые примеры, такие как multiprocessing.pool.Pool
, не будут работать в интерактивном интерпретаторе. Например:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>
(Если вы попробуете это, это фактически выведет три полных трассировки, чередующихся полуслучайным образом, и тогда вам, возможно, придется каким-то образом остановить родительский процесс.)
Ссылка¶
Пакет multiprocessing
в основном копирует API модуля threading
.
Process
и исключения¶
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶
Объекты Process представляют собой действие, выполняемое в отдельном процессе. Класс
Process
содержит эквиваленты всех методовthreading.Thread
.Конструктор всегда должен вызываться с аргументами ключевого слова. group всегда должен быть
None
; он существует исключительно для совместимости сthreading.Thread
. target - это вызываемый объект, который должен быть вызван методомrun()
. По умолчанию используется значениеNone
, что означает, что ничего не вызывается. name - это имя процесса (более подробную информацию смотрите вname
). args - это кортеж аргументов для целевого вызова. kwargs - это словарь аргументов по ключевым словам для целевого вызова. Если он указан, аргумент daemon, содержащий только ключевое слово, устанавливает для параметра processdaemon
значениеTrue
илиFalse
. ЕслиNone
(значение по умолчанию), то этот флаг будет унаследован от процесса создания.По умолчанию в target аргументы не передаются. Аргумент args, значение которого по умолчанию равно
()
, можно использовать для указания списка или набора аргументов, которые будут переданы в target.Если подкласс переопределяет конструктор, он должен убедиться, что вызывает конструктор базового класса (
Process.__init__()
), прежде чем выполнять какие-либо другие действия с процессом.Изменено в версии 3.3: Добавлен параметр daemon.
- run()¶
Метод, представляющий активность процесса.
Вы можете переопределить этот метод в подклассе. Стандартный метод
run()
вызывает вызываемый объект, передаваемый конструктору объекта в качестве целевого аргумента, если таковой имеется, с аргументами sequential и keyword, взятыми из аргументов args и kwargs соответственно.Используя список или кортеж в качестве аргумента args, передаваемого в
Process
, достигается тот же эффект.Пример:
>>> from multiprocessing import Process >>> p = Process(target=print, args=[1]) >>> p.run() 1 >>> p = Process(target=print, args=(1,)) >>> p.run() 1
- start()¶
Запустите действие процесса.
Это должно вызываться не более одного раза для каждого объекта процесса. Это позволяет вызывать метод объекта
run()
в отдельном процессе.
- join([timeout])¶
Если необязательный аргумент timeout равен
None
(значение по умолчанию), метод блокируется до тех пор, пока не завершится процесс, для которого вызывается методjoin()
. Если timeout является положительным числом, он блокируется не более чем на время ожидания секунд. Обратите внимание, что метод возвращаетNone
, если его процесс завершается или если время ожидания метода истекло. Проверьтеexitcode
процесса, чтобы определить, завершился ли он.К одному процессу можно присоединяться много раз.
Процесс не может присоединиться сам к себе, поскольку это приведет к взаимоблокировке. Попытка присоединиться к процессу до его запуска является ошибкой.
- name¶
Имя процесса. Имя представляет собой строку, используемую только для идентификации. Оно не имеет семантики. Нескольким процессам может быть присвоено одно и то же имя.
Начальное имя задается конструктором. Если конструктору не указано явное имя, то создается имя вида „Process-N1:N2:…:Nk“, где каждое N:sub:k является N-м дочерним элементом своего родителя.
- is_alive()¶
Возвращает, запущен ли процесс.
Грубо говоря, объект process активен с момента возврата метода
start()
до завершения дочернего процесса.
- daemon¶
Флаг демона процесса, логическое значение. Это значение должно быть установлено перед вызовом
start()
.Начальное значение наследуется в процессе создания.
Когда процесс завершает работу, он пытается завершить все свои демонические дочерние процессы.
Обратите внимание, что демоническому процессу запрещено создавать дочерние процессы. В противном случае демонический процесс оставил бы своих дочерних процессов сиротами, если бы он был завершен при завершении родительского процесса. Кроме того, это не демоны или службы Unix, это обычные процессы, которые будут завершены (а не присоединены), если завершатся недемонические процессы.
В дополнение к
threading.Thread
API, объектыProcess
также поддерживают следующие атрибуты и методы:- pid¶
Верните идентификатор процесса. Перед запуском процесса это значение будет
None
.
- exitcode¶
Код завершения дочернего процесса. Это будет
None
, если процесс еще не завершен.Если дочерний метод
run()
возвращает значение в обычном режиме, код завершения будет равен 0. Если он завершается с помощьюsys.exit()
с целочисленным аргументом N, кодом завершения будет N.Если дочерний процесс завершился из-за исключения, не перехваченного в пределах
run()
, код завершения будет равен 1. Если он был завершен по сигналу N, кодом завершения будет отрицательное значение -N.
- authkey¶
Ключ аутентификации процесса (строка байтов).
При инициализации
multiprocessing
основному процессу присваивается случайная строка с использованиемos.urandom()
.Когда создается объект
Process
, он наследует ключ аутентификации своего родительского процесса, хотя это можно изменить, установив значениеauthkey
в другой байтовой строке.Смотрите Ключи аутентификации.
- sentinel¶
Цифровой дескриптор системного объекта, который станет «готовым» после завершения процесса.
Вы можете использовать это значение, если хотите дождаться нескольких событий одновременно, используя
multiprocessing.connection.wait()
. В противном случае проще вызватьjoin()
.В Windows это дескриптор операционной системы, который можно использовать с вызовами семейства API
WaitForSingleObject
иWaitForMultipleObjects
. В Unix это файловый дескриптор, который можно использовать с примитивами из модуляselect
.Добавлено в версии 3.3.
- terminate()¶
Завершите процесс. В POSIX это делается с помощью сигнала
SIGTERM
; в Windows используетсяTerminateProcess()
. Обратите внимание, что обработчики exit и предложения finally и т.д. выполняться не будут.Обратите внимание, что процессы-потомки этого процесса не будут завершены - они просто станут потерянными.
Предупреждение
Если этот метод используется, когда связанный процесс использует канал или очередь, то канал или очередь могут быть повреждены и могут стать непригодными для использования другим процессом. Аналогично, если процесс получил блокировку или семафор и т.д., то его завершение может привести к взаимоблокировке других процессов.
- kill()¶
То же, что и
terminate()
, но с использованием сигналаSIGKILL
в Unix.Добавлено в версии 3.7.
- close()¶
Закройте объект
Process
, освободив все связанные с ним ресурсы.ValueError
вызывается, если базовый процесс все еще выполняется. Как толькоclose()
будет успешно возвращен, большинство других методов и атрибутов объектаProcess
вызовутValueError
.Добавлено в версии 3.7.
Обратите внимание, что методы
start()
,join()
,is_alive()
,terminate()
иexitcode
должны вызываться только процессом, создавшим объект process.Пример использования некоторых методов
Process
:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
- exception multiprocessing.ProcessError¶
Базовый класс всех исключений
multiprocessing
.
- exception multiprocessing.BufferTooShort¶
Исключение, вызванное
Connection.recv_bytes_into()
, когда предоставленный объект buffer слишком мал для чтения сообщения.Если
e
является экземпляромBufferTooShort
, тоe.args[0]
передаст сообщение в виде байтовой строки.
- exception multiprocessing.AuthenticationError¶
Вызывается при возникновении ошибки аутентификации.
- exception multiprocessing.TimeoutError¶
Вызывается методами с тайм-аутом по истечении этого времени.
Каналы и очереди¶
При использовании нескольких процессов обычно используется передача сообщений для взаимодействия между процессами и избегается необходимость использования каких-либо примитивов синхронизации, таких как блокировки.
Для передачи сообщений можно использовать Pipe()
(для соединения между двумя процессами) или очередь (что позволяет использовать несколько производителей и потребителей).
Типы Queue
, SimpleQueue
и JoinableQueue
представляют собой очереди с несколькими производителями и несколькими потребителями FIFO, созданные по образцу класса queue.Queue
в стандартной библиотеке. Они отличаются тем, что в Queue
отсутствуют методы task_done()
и join()
, представленные в классе queue.Queue
Python 2.5.
Если вы используете JoinableQueue
, то вы должны вызывать JoinableQueue.task_done()
для каждой задачи, удаленной из очереди, иначе семафор, используемый для подсчета количества незавершенных задач, может в конечном итоге переполниться, вызывая исключение.
Обратите внимание, что можно также создать общую очередь с помощью объекта manager - см. Менеджеры.
Примечание
multiprocessing
использует обычные исключения queue.Empty
и queue.Full
, чтобы сигнализировать о тайм-ауте. Они недоступны в пространстве имен multiprocessing
, поэтому вам необходимо импортировать их из queue
.
Примечание
Когда объект помещается в очередь, он обрабатывается, и фоновый поток позже сбрасывает обработанные данные в базовый канал. Это имеет некоторые последствия, которые немного удивляют, но не должны вызывать каких-либо практических трудностей - если они вас действительно беспокоят, вы можете вместо этого использовать очередь, созданную с помощью manager.
После помещения объекта в пустую очередь может возникнуть бесконечно малая задержка, прежде чем метод очереди
empty()
вернетFalse
иget_nowait()
, не вызываяqueue.Empty
.Если несколько процессов помещают объекты в очередь, возможно, что объекты будут получены на другом конце не по порядку. Однако объекты, помещенные в очередь одним и тем же процессом, всегда будут находиться в ожидаемом порядке по отношению друг к другу.
Предупреждение
Если процесс прерывается с помощью Process.terminate()
или os.kill()
, в то время как он пытается использовать Queue
, то данные в очереди, скорее всего, будут повреждены. Это может привести к тому, что любой другой процесс получит исключение при попытке использовать очередь позже.
Предупреждение
Как упоминалось выше, если дочерний процесс поместил элементы в очередь (и он не использовал JoinableQueue.cancel_join_thread
), то этот процесс не завершится до тех пор, пока все буферизованные элементы не будут сброшены в канал.
Это означает, что если вы попытаетесь присоединиться к этому процессу, вы можете столкнуться с тупиковой ситуацией, если не будете уверены, что все элементы, которые были помещены в очередь, были использованы. Аналогично, если дочерний процесс не является демоническим, то родительский процесс может зависнуть при выходе, когда он попытается присоединиться ко всем своим недемоническим дочерним процессам.
Обратите внимание, что в очереди, созданной с помощью менеджера, такой проблемы нет. Смотрите Руководящие принципы программирования.
Пример использования очередей для межпроцессного взаимодействия приведен в разделе Примеры.
- multiprocessing.Pipe([duplex])¶
Возвращает пару
(conn1, conn2)
объектовConnection
, представляющих концы трубы.Если значение duplex равно
True
(по умолчанию), то канал является двунаправленным. Если значение duplex равноFalse
, то канал является однонаправленным:conn1
может использоваться только для приема сообщений, аconn2
может использоваться только для отправки сообщений.
- class multiprocessing.Queue([maxsize])¶
Возвращает общую очередь процесса, реализованную с использованием канала и нескольких блокировок/семафоров. Когда процесс впервые помещает элемент в очередь, запускается поток-фидер, который передает объекты из буфера в канал.
Обычные исключения
queue.Empty
иqueue.Full
из модуляqueue
стандартной библиотеки генерируются для оповещения о тайм-аутах.Queue
реализует все методыqueue.Queue
, за исключениемtask_done()
иjoin()
.- qsize()¶
Возвращает приблизительный размер очереди. Из-за семантики многопоточности/многопроцессорности это число является ненадежным.
Обратите внимание, что это может вызвать
NotImplementedError
на платформах Unix, таких как macOS, гдеsem_getvalue()
не реализован.
- empty()¶
Возвращает
True
, если очередь пуста,False
в противном случае. Из-за семантики многопоточности/многопроцессорности это ненадежно.
- full()¶
Возвращает
True
, если очередь заполнена,False
в противном случае. Из-за семантики многопоточности/многопроцессорности это ненадежно.
- put(obj[, block[, timeout]])¶
Поместите объект obj в очередь. Если необязательный аргумент block равен
True
(по умолчанию), а timeout равенNone
(по умолчанию), при необходимости заблокируйте, пока не освободится место. Если timeout является положительным числом, он блокирует не более timeout секунд и вызывает исключениеqueue.Full
, если в течение этого времени не было доступно ни одного свободного места. В противном случае (block - этоFalse
), поместите элемент в очередь, если свободное место доступно немедленно, иначе вызовите исключениеqueue.Full
(timeout в этом случае игнорируется).Изменено в версии 3.8: Если очередь закрыта, то вместо
AssertionError
выводитсяValueError
.
- put_nowait(obj)¶
Эквивалентно
put(obj, False)
.
- get([block[, timeout]])¶
Удалите и верните элемент из очереди. Если необязательные аргументы block имеют значение
True
(по умолчанию), а timeout -None
(по умолчанию), при необходимости заблокируйте, пока элемент не станет доступен. Если timeout является положительным числом, он блокирует не более timeout секунд и вызывает исключениеqueue.Empty
, если в течение этого времени не было доступно ни одного элемента. В противном случае (блок равенFalse
), верните элемент, если он доступен немедленно, иначе вызовите исключениеqueue.Empty
(тайм-аут в этом случае игнорируется).Изменено в версии 3.8: Если очередь закрыта, то вместо
OSError
выводитсяValueError
.
- get_nowait()¶
Эквивалентно
get(False)
.
multiprocessing.Queue
содержит несколько дополнительных методов, которых нет вqueue.Queue
. В большинстве случаев эти методы не требуются:- close()¶
Укажите, что текущий процесс больше не будет помещать данные в эту очередь. Фоновый поток завершит работу, как только он сбросит все буферизованные данные в канал. Это вызывается автоматически, когда очередь заполняется мусором.
- join_thread()¶
Присоединиться к фоновому потоку. Это можно использовать только после вызова
close()
. Он блокируется до завершения фонового потока, гарантируя, что все данные из буфера будут сброшены в канал.По умолчанию, если процесс не является создателем очереди, то при выходе он попытается присоединиться к фоновому потоку очереди. Процесс может вызвать
cancel_join_thread()
, чтобы заставитьjoin_thread()
ничего не делать.
- cancel_join_thread()¶
Предотвращать блокировку
join_thread()
. В частности, это предотвращает автоматическое присоединение фонового потока при завершении процесса - см.join_thread()
.Более подходящим названием для этого метода могло бы быть
allow_exit_without_flush()
. Это может привести к потере данных, помещенных в очередь, и вам почти наверняка не понадобится его использовать. На самом деле это возможно только в том случае, если вам нужно немедленно завершить текущий процесс, не дожидаясь сброса данных, помещенных в очередь, в базовый канал, и вы не заботитесь о потере данных.
Примечание
Функциональность этого класса требует наличия функционирующей реализации общего семафора в операционной системе хоста. Без нее функциональность этого класса будет отключена, а попытки создать экземпляр
Queue
приведут к появлениюImportError
. Смотрите bpo-3770 для получения дополнительной информации. То же самое относится к любому из специализированных типов очередей, перечисленных ниже.
- class multiprocessing.SimpleQueue¶
Это упрощенный тип
Queue
, очень близкий к закрытому типуPipe
.- close()¶
Закройте очередь: освободите внутренние ресурсы.
Очередь больше не должна использоваться после ее закрытия. Например, методы,
get()
,put()
иempty()
больше не должны вызываться.Добавлено в версии 3.9.
- empty()¶
Верните
True
, если очередь пуста,False
в противном случае.
- get()¶
Удаление и возврат элемента из очереди.
- put(item)¶
Поместите элемент в очередь.
- class multiprocessing.JoinableQueue([maxsize])¶
JoinableQueue
, подклассQueue
, представляет собой очередь, которая дополнительно содержит методыtask_done()
иjoin()
.- task_done()¶
Указывает, что ранее поставленная в очередь задача завершена. Используется пользователями очереди. Для каждого
get()
, используемого для получения задачи, последующий вызовtask_done()
сообщает очереди, что обработка задачи завершена.Если
join()
в данный момент заблокирован, он возобновится, когда все элементы будут обработаны (это означает, что для каждого элемента, который был помещенput()
в очередь, был получен вызовtask_done()
).Вызывает
ValueError
, если вызывается больше раз, чем было элементов, помещенных в очередь.
- join()¶
Блокируйте до тех пор, пока все элементы в очереди не будут получены и обработаны.
Количество незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь. Количество незавершенных задач уменьшается всякий раз, когда пользователь набирает
task_done()
, чтобы сообщить, что элемент был получен и вся работа с ним завершена. Когда количество незавершенных задач упадет до нуля,join()
разблокируется.
Разнообразный¶
- multiprocessing.active_children()¶
Возвращает список всех живых дочерних элементов текущего процесса.
Вызов этого метода имеет побочный эффект «присоединения» ко всем процессам, которые уже завершились.
- multiprocessing.cpu_count()¶
Возвращает количество процессоров в системе.
Это число не соответствует количеству процессоров, которые может использовать текущий процесс. Количество используемых процессоров можно получить с помощью
len(os.sched_getaffinity(0))
Когда количество процессоров не может быть определено, выводится значение
NotImplementedError
.См.также
- multiprocessing.current_process()¶
Возвращает объект
Process
, соответствующий текущему процессу.Аналог
threading.current_thread()
.
- multiprocessing.parent_process()¶
Возвращает
Process
объект, соответствующий родительскому процессуcurrent_process()
. Для основного процессаparent_process
будетNone
.Добавлено в версии 3.8.
- multiprocessing.freeze_support()¶
Добавлена поддержка для случаев, когда программа, использующая
multiprocessing
, была заморожена для создания исполняемого файла Windows. (Было протестировано с помощью py2exe, PyInstaller и cx_Freeze.)Эту функцию нужно вызывать сразу после строки
if __name__ == '__main__'
основного модуля. Например:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
Если строка
freeze_support()
опущена, то при попытке запустить замороженный исполняемый файл будет выведено значениеRuntimeError
.Вызов
freeze_support()
не имеет эффекта при вызове в любой операционной системе, отличной от Windows. Кроме того, если модуль нормально запускается интерпретатором Python в Windows (программа не была заморожена), тоfreeze_support()
не имеет эффекта.
- multiprocessing.get_all_start_methods()¶
Возвращает список поддерживаемых методов запуска, первый из которых используется по умолчанию. Возможными методами запуска являются
'fork'
,'spawn'
и'forkserver'
. В Windows доступен только'spawn'
. В Unix всегда поддерживаются'fork'
и'spawn'
, по умолчанию используется'fork'
.Добавлено в версии 3.4.
- multiprocessing.get_context(method=None)¶
Возвращает объект контекста, который имеет те же атрибуты, что и модуль
multiprocessing
.Если значение method равно
None
, то возвращается контекст по умолчанию. В противном случае метод должен быть'fork'
,'spawn'
,'forkserver'
.ValueError
вызывается, если указанный метод запуска недоступен.Добавлено в версии 3.4.
- multiprocessing.get_start_method(allow_none=False)¶
Возвращает имя метода запуска, используемого для запуска процессов.
Если метод start не был исправлен и значение allow_none равно false, то для метода start устанавливается значение по умолчанию и возвращается имя. Если метод start не был исправлен и значение allow_none равно true, то возвращается значение
None
.Возвращаемое значение может быть
'fork'
,'spawn'
,'forkserver'
илиNone
.'fork'
является значением по умолчанию в Unix, в то время как'spawn'
является по умолчанию в Windows и Mac OS.Добавлено в версии 3.4.
Изменено в версии 3.8: В macOS метод spawn start теперь используется по умолчанию. Метод fork start следует считать небезопасным, поскольку он может привести к сбоям в работе подпроцесса. Смотрите bpo-33725.
- multiprocessing.set_executable(executable)¶
Укажите путь к интерпретатору Python, который будет использоваться при запуске дочернего процесса. (По умолчанию используется
sys.executable
). Вероятно, разработчикам встраивания потребуется сделать что-то вродеset_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
прежде чем они смогут создавать дочерние процессы.
Изменено в версии 3.4: Теперь поддерживается в Unix при использовании метода
'spawn'
start.Изменено в версии 3.11: Принимает значение path-like object.
- multiprocessing.set_start_method(method, force=False)¶
Задайте метод, который должен использоваться для запуска дочерних процессов. Аргументом method может быть
'fork'
,'spawn'
или'forkserver'
. ВызываетRuntimeError
, если метод запуска уже был установлен, а значение force не равноTrue
. Если значение method равноNone
, а значение force равноTrue
, то для метода start устанавливается значениеNone
. Если значение method равноNone
, а значение force равноFalse
, то для контекста устанавливается значение по умолчанию.Обратите внимание, что это должно вызываться не более одного раза, и оно должно быть защищено внутри предложения
if __name__ == '__main__'
основного модуля.Добавлено в версии 3.4.
Примечание
multiprocessing
не содержит аналогов threading.active_count()
, threading.enumerate()
, threading.settrace()
, threading.setprofile()
, threading.Timer
, или threading.local
.
Объекты подключения¶
Объекты Connection позволяют отправлять и получать объекты или строки с возможностью выбора. Их можно рассматривать как подключенные сокеты, ориентированные на сообщения.
Объекты подключения обычно создаются с помощью Pipe
- смотрите также Слушатели и клиенты.
- class multiprocessing.connection.Connection¶
- send(obj)¶
Отправьте объект на другой конец соединения, который должен быть прочитан с помощью
recv()
.Объект должен быть пригоден для маринования. Очень большие размеры (примерно 32 МБ и более, хотя это зависит от операционной системы) могут вызвать исключение
ValueError
.
- recv()¶
Возвращает объект, отправленный с другого конца соединения, используя
send()
. Блокирует до тех пор, пока не появится что-либо для получения. ВозвращаетEOFError
, если больше нечего получать, а другой конец был закрыт.
- fileno()¶
Возвращает файловый дескриптор или дескриптор, используемый соединением.
- close()¶
Закройте соединение.
Это вызывается автоматически, когда соединение подвергается сборке мусора.
- poll([timeout])¶
Возвращает, доступны ли какие-либо данные для чтения.
Если значение timeout не указано, то оно будет возвращено немедленно. Если значение timeout является числом, то оно указывает максимальное время блокировки в секундах. Если значение timeout равно
None
, то используется бесконечный тайм-аут.Обратите внимание, что с помощью
multiprocessing.connection.wait()
можно опросить сразу несколько объектов подключения.
- send_bytes(buffer[, offset[, size]])¶
Отправьте байтовые данные из bytes-like object в виде полного сообщения.
Если задано значение offset, то данные считываются из этой позиции в buffer. Если задан size, то из буфера будет считано именно это количество байт. Очень большие буферы (приблизительно 32 Мб+, хотя это зависит от операционной системы) могут вызвать исключение
ValueError
- recv_bytes([maxlength])¶
Возвращает полное сообщение, состоящее из байтовых данных, отправленных с другого конца соединения в виде строки. Блокируется до тех пор, пока не появится что-либо для приема. Возвращает значение
EOFError
, если принимать больше нечего и другой конец закрыт.Если указано значение maxlength и сообщение длиннее, чем maxlength, то выводится значение
OSError
и соединение больше не будет доступно для чтения.
- recv_bytes_into(buffer[, offset])¶
Считывает в буфер полное сообщение, состоящее из байтовых данных, отправленных с другого конца соединения, и возвращает количество байт в сообщении. Блокирует до тех пор, пока не появится что-либо для получения. Увеличивает значение
EOFError
, если больше нечего получать, а другой конец был закрыт.буфер должен быть доступен для записи bytes-like object. Если задано значение offset, то сообщение будет записано в буфер с этой позиции. Значение Offset должно быть неотрицательным целым числом, меньшим длины buffer (в байтах).
Если буфер слишком короткий, то возникает исключение
BufferTooShort
и полное сообщение доступно в видеe.args[0]
, гдеe
- это экземпляр исключения.
Изменено в версии 3.3: Сами объекты подключения теперь могут передаваться между процессами с помощью
Connection.send()
иConnection.recv()
.Объекты Connection также теперь поддерживают протокол управления контекстом - смотрите Типы контекстных менеджеров.
__enter__()
возвращает объект connection, а__exit__()
вызываетclose()
.
Например:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Предупреждение
Метод Connection.recv()
автоматически удаляет данные, которые он получает, что может представлять угрозу безопасности, если вы не доверяете процессу, отправившему сообщение.
Следовательно, если объект connection не был создан с использованием Pipe()
, вам следует использовать методы recv()
и send()
только после выполнения какой-либо проверки подлинности. Смотрите Ключи аутентификации.
Предупреждение
Если процесс прерывается при попытке чтения или записи в канал, то данные в канале, скорее всего, будут повреждены, поскольку может оказаться невозможным определить, где находятся границы сообщений.
Примитивы синхронизации¶
Как правило, примитивы синхронизации не так необходимы в многопроцессорной программе, как в многопоточной. Смотрите документацию по модулю threading
.
Обратите внимание, что можно также создавать примитивы синхронизации с помощью объекта manager - см. Менеджеры.
- class multiprocessing.Barrier(parties[, action[, timeout]])¶
Объект барьера: клон
threading.Barrier
.Добавлено в версии 3.3.
- class multiprocessing.BoundedSemaphore([value])¶
Ограниченный объект-семафор: близкий аналог
threading.BoundedSemaphore
.Существует единственное отличие от его близкой аналогии: первый аргумент его метода
acquire
называется block, что согласуется сLock.acquire()
.Примечание
В macOS это неотличимо от
Semaphore
, потому чтоsem_getvalue()
не реализован на этой платформе.
- class multiprocessing.Condition([lock])¶
Условная переменная: псевдоним для
threading.Condition
.Если указан lock, то это должен быть
Lock
илиRLock
объект изmultiprocessing
.Изменено в версии 3.3: Был добавлен метод
wait_for()
.
- class multiprocessing.Event¶
Клон
threading.Event
.
- class multiprocessing.Lock¶
Объект нерекурсивной блокировки: близкий аналог
threading.Lock
. Как только процесс или поток получает блокировку, последующие попытки получить ее от любого процесса или потока будут блокироваться до тех пор, пока она не будет снята; любой процесс или поток может снять блокировку. Концепции и поведениеthreading.Lock
в том виде, в каком они применимы к потокам, воспроизведены здесь вmultiprocessing.Lock
в том виде, в каком они применимы как к процессам, так и к потокам, за исключением отмеченных случаев.Обратите внимание, что
Lock
на самом деле является фабричной функцией, которая возвращает экземплярmultiprocessing.synchronize.Lock
, инициализированный контекстом по умолчанию.Lock
поддерживает протокол context manager и, таким образом, может использоваться в инструкцияхwith
.- acquire(block=True, timeout=None)¶
Получите блокировку, блокирующую или неблокирующую.
Если аргументу block присвоено значение
True
(по умолчанию), вызов метода будет блокироваться до тех пор, пока блокировка не перейдет в разблокированное состояние, затем установите для него значение locked и вернитеTrue
. Обратите внимание, что имя этого первого аргумента отличается от имени вthreading.Lock.acquire()
.Если аргументу block присвоено значение
False
, вызов метода не блокируется. Если блокировка в данный момент находится в заблокированном состоянии, вернитеFalse
; в противном случае установите блокировку в заблокированное состояние и вернитеTrue
.При вызове с положительным значением с плавающей запятой для параметра timeout, блокируйте не более чем на количество секунд, указанное в параметре timeout, до тех пор, пока блокировка не будет получена. Вызовы с отрицательным значением параметра timeout эквивалентны нулевому времени ожидания. Вызовы со значением timeout, равным
None
(по умолчанию), устанавливают бесконечный период ожидания. Обратите внимание, что обработка отрицательных значений илиNone
для timeout отличается от реализованного поведения вthreading.Lock.acquire()
. Аргумент timeout не имеет практического значения, если аргументу block присвоено значениеFalse
и, таким образом, игнорируется. ВозвращаетTrue
, если блокировка была получена, илиFalse
, если истек тайм-аут.
- release()¶
Снимите блокировку. Это может быть вызвано из любого процесса или потока, а не только из процесса или потока, которые изначально получили блокировку.
Поведение такое же, как в
threading.Lock.release()
, за исключением того, что при вызове разблокированной блокировки возникаетValueError
.
- class multiprocessing.RLock¶
Объект рекурсивной блокировки: близкий аналог
threading.RLock
. Рекурсивная блокировка должна быть снята процессом или потоком, который ее получил. Как только процесс или поток получает рекурсивную блокировку, тот же самый процесс или поток может получить ее снова без блокировки; этот процесс или поток должен освобождать ее один раз за каждый раз, когда она была получена.Обратите внимание, что
RLock
на самом деле является фабричной функцией, которая возвращает экземплярmultiprocessing.synchronize.RLock
, инициализированный контекстом по умолчанию.RLock
поддерживает протокол context manager и, таким образом, может использоваться в инструкцияхwith
.- acquire(block=True, timeout=None)¶
Получите блокировку, блокирующую или неблокирующую.
При вызове с аргументом block, равным
True
, блокируйте до тех пор, пока блокировка не перейдет в разблокированное состояние (не будет принадлежать какому-либо процессу или потоку), если только блокировка уже не принадлежит текущему процессу или потоку. Затем текущий процесс или поток становится владельцем блокировки (если у него еще нет владельца), и уровень рекурсии внутри блокировки увеличивается на единицу, в результате чего возвращается значениеTrue
. Обратите внимание, что в поведении этого первого аргумента есть несколько отличий по сравнению с реализациейthreading.RLock.acquire()
, начиная с имени самого аргумента.При вызове с аргументом block, равным
False
, не блокируйте. Если блокировка уже была получена (и, следовательно, принадлежит) другому процессу или потоку, текущий процесс или поток не становится владельцем, и уровень рекурсии в пределах блокировки не изменяется, что приводит к возвращаемому значениюFalse
. Если блокировка находится в разблокированном состоянии, текущий процесс или поток становится владельцем, и уровень рекурсии увеличивается, в результате чего возвращается значениеTrue
.Использование и поведение аргумента timeout такие же, как в
Lock.acquire()
. Обратите внимание, что некоторые из этих действий timeout отличаются от реализованных вthreading.RLock.acquire()
.
- release()¶
Снимите блокировку, уменьшив уровень рекурсии. Если после уменьшения уровень рекурсии равен нулю, сбросьте блокировку на разблокированную (не принадлежащую ни одному процессу или потоку), и если какие-либо другие процессы или потоки заблокированы в ожидании разблокировки блокировки, разрешите продолжить работу только одному из них. Если после уменьшения уровень рекурсии по-прежнему отличен от нуля, блокировка остается заблокированной и принадлежит вызывающему процессу или потоку.
Вызывайте этот метод только в том случае, если блокировка принадлежит вызывающему процессу или потоку. Значение
AssertionError
возникает, если этот метод вызывается процессом или потоком, отличным от владельца, или если блокировка находится в незаблокированном (не принадлежащем) состоянии. Обратите внимание, что тип исключения, возникающего в этой ситуации, отличается от реализованного поведения вthreading.RLock.release()
.
- class multiprocessing.Semaphore([value])¶
Объект-семафор: близкий аналог
threading.Semaphore
.Существует единственное отличие от его близкой аналогии: первый аргумент его метода
acquire
называется block, что согласуется сLock.acquire()
.
Примечание
В Mac OS sem_timedwait
не поддерживается, поэтому вызов acquire()
с тайм-аутом будет имитировать поведение этой функции, используя спящий цикл.
Примечание
Если сигнал SIGINT, сгенерированный Ctrl-C, поступает в то время, когда основной поток заблокирован вызовом BoundedSemaphore.acquire()
, Lock.acquire()
, RLock.acquire()
, Semaphore.acquire()
, Condition.acquire()
или Condition.wait()
, то вызов будет немедленно прерван и будет запущен KeyboardInterrupt
.
Это отличается от поведения threading
, где SIGINT будет игнорироваться во время выполнения эквивалентных вызовов блокировки.
Примечание
Для некоторых функций этого пакета требуется функционирующая реализация общего семафора в операционной системе хоста. Без этого модуль multiprocessing.synchronize
будет отключен, а попытки его импорта приведут к ошибке ImportError
. Дополнительную информацию смотрите в разделе bpo-3770.
Менеджеры¶
Менеджеры предоставляют способ создания данных, которые могут совместно использоваться различными процессами, включая совместное использование по сети между процессами, запущенными на разных компьютерах. Объект manager управляет серверным процессом, который управляет общими объектами. Другие процессы могут получать доступ к общим объектам с помощью прокси-серверов.
- multiprocessing.Manager()¶
Возвращает запущенный
SyncManager
объект, который может использоваться для совместного использования объектов между процессами. Возвращаемый объект manager соответствует созданному дочернему процессу и имеет методы, которые будут создавать общие объекты и возвращать соответствующие прокси.
Управляющие процессы будут завершены, как только они завершат сборку мусора или завершат работу их родительского процесса. Классы управляющих определены в модуле multiprocessing.managers
:
- class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)¶
Создайте объект BaseManager.
После создания следует вызвать
start()
илиget_server().serve_forever()
, чтобы убедиться, что объект manager ссылается на запущенный процесс manager.address - это адрес, по которому процесс manager прослушивает новые соединения. Если address равен
None
, то выбирается произвольный адрес.authkey - это ключ аутентификации, который будет использоваться для проверки правильности входящих подключений к серверному процессу. Если значение authkey равно
None
, то используетсяcurrent_process().authkey
. В противном случае используется authkey, и это должна быть строка в байтах.сериализатор должен быть
'pickle'
(используйтеpickle
сериализацию) или'xmlrpclib'
(используйтеxmlrpc.client
сериализацию).ctx - это объект контекста, или
None
(используйте текущий контекст). Смотрите функциюget_context()
.shutdown_timeout - это время ожидания в секундах, используемое для завершения процесса, используемого менеджером в методе
shutdown()
. Если время ожидания истекло, процесс завершается. Если время завершения процесса также истекает, процесс завершается.Изменено в версии 3.11: Добавлен параметр shutdown_timeout.
- start([initializer[, initargs]])¶
Запустите подпроцесс, чтобы запустить менеджер. Если инициализатор не равен
None
, то при запуске подпроцесса будет вызванinitializer(*initargs)
.
- get_server()¶
Возвращает объект
Server
, который представляет фактический сервер, находящийся под управлением администратора. ОбъектServer
поддерживает методserve_forever()
:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
дополнительно имеет атрибутaddress
.
- connect()¶
Подключите объект локального менеджера к процессу удаленного менеджера:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
- shutdown()¶
Остановите процесс, используемый менеджером. Это доступно только в том случае, если для запуска серверного процесса было использовано значение
start()
.Это может быть вызвано несколько раз.
- register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶
Метод класса, который может быть использован для регистрации типа или вызываемого объекта в классе manager.
typeid - это «идентификатор типа», который используется для идентификации определенного типа общего объекта. Это должна быть строка.
callable - это вызываемый объект, используемый для создания объектов с этим идентификатором типа. Если экземпляр manager будет подключен к серверу с помощью метода
connect()
или если аргумент create_method равенFalse
, то это можно оставить какNone
.proxytype - это подкласс
BaseProxy
, который используется для создания прокси-серверов для совместно используемых объектов с этим typeid. ЕслиNone
, то класс прокси-серверов создается автоматически.exposed используется для указания последовательности имен методов, доступ к которым должен быть разрешен прокси-серверам для этого typeid с помощью
BaseProxy._callmethod()
. (Если значение exposed равноNone
, то вместо него используетсяproxytype._exposed_
, если оно существует.) В случае, когда открытый список не указан, все «общедоступные методы» общего объекта будут доступны. (Здесь «открытый метод» означает любой атрибут, который имеет метод__call__()
и имя которого не начинается с'_'
.)method_to_typeid - это сопоставление, используемое для указания типа возвращаемого значения тех открытых методов, которые должны возвращать прокси-сервер. Оно сопоставляет имена методов со строками typeid. (Если method_to_typeid равен
None
, то вместо него используетсяproxytype._method_to_typeid_
, если он существует.) Если имя метода не является ключом к этому отображению или если отображение равноNone
, то объект, возвращаемый методом, будет скопирован по значению.create_method определяет, следует ли создавать метод с именем typeid, который может использоваться для указания серверному процессу создать новый общий объект и вернуть прокси-сервер для него. По умолчанию это
True
.
BaseManager
экземпляры также имеют одно свойство, доступное только для чтения:- address¶
Адрес, используемый менеджером.
Изменено в версии 3.3: Объекты Manager поддерживают протокол управления контекстом - смотрите Типы контекстных менеджеров.
__enter__()
запускает серверный процесс (если он еще не запущен), а затем возвращает объект manager.__exit__()
вызываетshutdown()
.В предыдущих версиях
__enter__()
не запускался серверный процесс менеджера, если он еще не был запущен.
- class multiprocessing.managers.SyncManager¶
Подкласс
BaseManager
, который может использоваться для синхронизации процессов. Объекты этого типа возвращаются с помощьюmultiprocessing.Manager()
.Его методы создают и возвращают Прокси-объекты для ряда часто используемых типов данных, которые должны быть синхронизированы между процессами. Это, в частности, включает в себя общие списки и словари.
- Barrier(parties[, action[, timeout]])¶
Создайте общий объект
threading.Barrier
и верните для него прокси-сервер.Добавлено в версии 3.3.
- BoundedSemaphore([value])¶
Создайте общий объект
threading.BoundedSemaphore
и верните для него прокси-сервер.
- Condition([lock])¶
Создайте общий объект
threading.Condition
и верните для него прокси-сервер.Если указан параметр lock, то это должен быть прокси-сервер для объекта
threading.Lock
илиthreading.RLock
.Изменено в версии 3.3: Был добавлен метод
wait_for()
.
- Event()¶
Создайте общий объект
threading.Event
и верните для него прокси-сервер.
- Lock()¶
Создайте общий объект
threading.Lock
и верните для него прокси-сервер.
- Queue([maxsize])¶
Создайте общий объект
queue.Queue
и верните для него прокси-сервер.
- RLock()¶
Создайте общий объект
threading.RLock
и верните для него прокси-сервер.
- Semaphore([value])¶
Создайте общий объект
threading.Semaphore
и верните для него прокси-сервер.
- Array(typecode, sequence)¶
Создайте массив и верните для него прокси-сервер.
- Value(typecode, value)¶
Создайте объект с доступным для записи атрибутом
value
и верните для него прокси-сервер.
Изменено в версии 3.6: Общие объекты могут быть вложенными. Например, общий контейнерный объект, такой как общий список, может содержать другие общие объекты, которые будут управляться и синхронизироваться с помощью
SyncManager
.
- class multiprocessing.managers.Namespace¶
Тип, который может регистрироваться с помощью
SyncManager
.Объект пространства имен не имеет открытых методов, но имеет атрибуты, доступные для записи. В его представлении отображаются значения его атрибутов.
Однако при использовании прокси-сервера для объекта пространства имен атрибут, начинающийся с
'_'
, будет атрибутом прокси-сервера, а не атрибутом референта:>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
Индивидуальные менеджеры¶
Чтобы создать свой собственный менеджер, пользователь создает подкласс BaseManager
и использует метод класса register()
для регистрации новых типов или вызываемых объектов в классе manager. Например:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
Использование удаленного менеджера¶
Можно запустить сервер управления на одном компьютере и попросить клиентов использовать его с других компьютеров (при условии, что соответствующие брандмауэры позволяют это).
Выполнение следующих команд создает сервер для единой общей очереди, доступ к которой могут получить удаленные клиенты:
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Один клиент может получить доступ к серверу следующим образом:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
Другой клиент также может использовать его:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
Локальные процессы также могут получить доступ к этой очереди, используя приведенный выше код на клиенте для удаленного доступа к ней:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super().__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Прокси-объекты¶
Прокси-сервер - это объект, который ссылается на общий объект, который находится (предположительно) в другом процессе. Общий объект называется «референтом» прокси-сервера. Несколько прокси-объектов могут иметь одну и ту же ссылку.
У прокси-объекта есть методы, которые вызывают соответствующие методы его референта (хотя не каждый метод референта обязательно будет доступен через прокси-сервер). Таким образом, прокси-сервер можно использовать так же, как и его референт:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Обратите внимание, что применение str()
к прокси-серверу вернет представление референта, тогда как применение repr()
вернет представление прокси-сервера.
Важной особенностью прокси-объектов является то, что они доступны для выбора, поэтому их можно передавать между процессами. Таким образом, референт может содержать Прокси-объекты. Это позволяет размещать эти управляемые списки, диктовки и другие Прокси-объекты:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
Аналогично, прокси-серверы dict и list могут быть вложены друг в друга:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
Если в референте содержатся стандартные (не являющиеся прокси-сервером) объекты list
или dict
, изменения этих изменяемых значений не будут распространяться через менеджер, поскольку прокси-сервер не имеет возможности узнать, когда изменяются значения, содержащиеся в нем. Однако сохранение значения в прокси-сервере контейнера (которое запускает __setitem__
для прокси-объекта) распространяется через менеджер, и поэтому для эффективного изменения такого элемента можно повторно присвоить измененное значение прокси-серверу контейнера:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
Этот подход, возможно, менее удобен, чем использование вложенного Прокси-объекты для большинства случаев использования, но также демонстрирует уровень контроля над синхронизацией.
Примечание
Типы прокси-серверов в multiprocessing
не поддерживают сравнение по значению. Так, например, у нас есть:
>>> manager.list([1,2,3]) == [1,2,3]
False
Вместо этого при проведении сравнений следует просто использовать копию референта.
- class multiprocessing.managers.BaseProxy¶
Прокси-объекты являются экземплярами подклассов типа
BaseProxy
.- _callmethod(methodname[, args[, kwds]])¶
Вызовите и верните результат метода референта прокси-сервера.
Если
proxy
является прокси-сервером, референтом которого являетсяobj
, то выражениеproxy._callmethod(methodname, args, kwds)
вычислит выражение
getattr(obj, methodname)(*args, **kwds)
в процессе работы менеджера.
Возвращаемое значение будет копией результата вызова или прокси-сервером для нового общего объекта - смотрите документацию для аргумента method_to_typeid
BaseManager.register()
.Если вызов вызывает исключение, оно повторно вызывается с помощью
_callmethod()
. Если в процессе менеджера возникает какое-либо другое исключение, оно преобразуется в исключениеRemoteError
и вызывается с помощью_callmethod()
.Обратите особое внимание на то, что исключение будет вызвано, если methodname не было предоставлено.
Пример использования
_callmethod()
:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
- _getvalue()¶
Верните копию референта.
Если референт недоступен для выбора, то это вызовет исключение.
- __repr__()¶
Возвращает представление прокси-объекта.
- __str__()¶
Возвращает представление референта.
Уборка¶
Прокси-объект использует обратный вызов weakref, так что, когда он получает собранный мусор, он отменяет регистрацию в менеджере, которому принадлежит его референт.
Общий объект удаляется из процесса управления, когда на него больше не ссылаются прокси-серверы.
Пулы процессов¶
Можно создать пул процессов, которые будут выполнять задачи, переданные ему с помощью класса Pool
.
- class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶
Объект пула процессов, который управляет пулом рабочих процессов, которым могут быть отправлены задания. Он поддерживает асинхронные результаты с тайм-аутами и обратными вызовами и имеет параллельную реализацию map.
processes - это количество используемых рабочих процессов. Если значение processes равно
None
, то используется число, возвращаемое параметромos.cpu_count()
.Если инициализатор не равен
None
, то каждый рабочий процесс будет вызыватьinitializer(*initargs)
при запуске.maxtasksperchild - это количество задач, которые может выполнить рабочий процесс, прежде чем он завершит работу и будет заменен новым рабочим процессом, чтобы освободить неиспользуемые ресурсы. Значение по умолчанию maxtasksperchild равно
None
, что означает, что рабочие процессы будут жить столько, сколько существует пул.context может использоваться для указания контекста, используемого для запуска рабочих процессов. Обычно пул создается с помощью функции
multiprocessing.Pool()
или методаPool()
объекта context. В обоих случаях context задается соответствующим образом.Обратите внимание, что методы объекта pool должны вызываться только процессом, создавшим пул.
Предупреждение
multiprocessing.pool
объекты имеют внутренние ресурсы, которыми необходимо должным образом управлять (как и любым другим ресурсом), используя пул в качестве контекстного менеджера или вызываяclose()
иterminate()
вручную. Невыполнение этого требования может привести к тому, что процесс зависнет в процессе завершения.Обратите внимание, что ** неправильно ** полагаться на сборщик мусора для уничтожения пула, поскольку CPython не гарантирует, что будет вызван финализатор пула (см.
object.__del__()
для получения дополнительной информации).Изменено в версии 3.2: Добавлен параметр maxtasksperchild.
Изменено в версии 3.4: Добавлен параметр context.
Примечание
Рабочие процессы в пределах
Pool
обычно выполняются в течение всего срока действия рабочей очереди пула. Часто встречающийся в других системах (таких как Apache, mod_wsgi и т.д.) способ освобождения ресурсов, удерживаемых рабочими, заключается в том, чтобы позволить рабочему в пуле выполнить только определенный объем работы перед выходом, очисткой и запуском нового процесса для замены старого. Аргумент maxtasksperchild для параметраPool
предоставляет эту возможность конечному пользователю.- apply(func[, args[, kwds]])¶
Вызовите func с аргументами args и ключевыми словами kwds. Он блокируется до тех пор, пока результат не будет готов. Учитывая эти блоки,
apply_async()
лучше подходит для параллельного выполнения работы. Кроме того, func выполняется только в одном из рабочих элементов пула.
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])¶
Вариант метода
apply()
, который возвращает объектAsyncResult
.Если указано значение callback, то оно должно быть вызываемым и принимать единственный аргумент. Когда результат становится готовым, к нему применяется callback, если только вызов не завершился неудачей, и в этом случае вместо него применяется error_callback.
Если указано значение error_callback, то оно должно быть вызываемым и принимать единственный аргумент. Если целевая функция завершается ошибкой, то вызывается error_callback с экземпляром exception.
Обратные вызовы должны завершаться немедленно, так как в противном случае поток, обрабатывающий результаты, будет заблокирован.
- map(func, iterable[, chunksize])¶
Параллельный эквивалент встроенной функции
map()
(хотя она поддерживает только один повторяемый аргумент, для нескольких повторяемых значений см.starmap()
). Она блокируется до тех пор, пока результат не будет готов.Этот метод разбивает итерацию на несколько блоков, которые передаются в пул процессов в качестве отдельных задач. (Приблизительный) размер этих блоков можно задать, установив значение chunksize равным целому положительному числу.
Обратите внимание, что это может привести к интенсивному использованию памяти для очень длинных итераций. Рассмотрите возможность использования
imap()
илиimap_unordered()
с явной опцией chunksize для повышения эффективности.
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
Вариант метода
map()
, который возвращает объектAsyncResult
.Если указано значение callback, то оно должно быть вызываемым и принимать единственный аргумент. Когда результат становится готовым, к нему применяется callback, если только вызов не завершился неудачей, и в этом случае вместо него применяется error_callback.
Если указано значение error_callback, то оно должно быть вызываемым и принимать единственный аргумент. Если целевая функция завершается ошибкой, то вызывается error_callback с экземпляром exception.
Обратные вызовы должны завершаться немедленно, так как в противном случае поток, обрабатывающий результаты, будет заблокирован.
- imap(func, iterable[, chunksize])¶
Более ленивая версия
map()
.Аргумент chunksize такой же, как и тот, который используется методом
map()
. Для очень длинных итераций использование большого значения для chunksize может значительно ускорить выполнение задания, чем использование значения по умолчанию1
.Также, если значение chunksize равно
1
, то методnext()
итератора, возвращаемый методомimap()
, имеет необязательный параметр timeout:next(timeout)
вызоветmultiprocessing.TimeoutError
если результат не может быть возвращен в течение таймаута секунд.
- imap_unordered(func, iterable[, chunksize])¶
То же, что и
imap()
, за исключением того, что порядок следования результатов из возвращаемого итератора следует считать произвольным. (Только при наличии только одного рабочего процесса порядок гарантированно будет «правильным».)
- starmap(func, iterable[, chunksize])¶
Как и
map()
, за исключением того, что элементы iterable должны быть повторяемыми, которые распаковываются как аргументы.Следовательно, * повторяемое* значение
[(1,2), (3, 4)]
приводит к[func(1,2), func(3,4)]
.Добавлено в версии 3.3.
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
Комбинация
starmap()
иmap_async()
, которая выполняет итерацию по iterable из iterables и вызывает func с распакованными iterables. Возвращает результирующий объект.Добавлено в версии 3.3.
- close()¶
Предотвращает отправку дополнительных задач в пул. Как только все задачи будут выполнены, рабочие процессы завершатся.
- terminate()¶
Немедленно останавливает рабочие процессы, не завершая невыполненную работу. При сборке мусора для объекта пула будет немедленно вызван
terminate()
.
- join()¶
Дождитесь завершения рабочих процессов. Перед использованием
join()
необходимо вызватьclose()
илиterminate()
.
Изменено в версии 3.3: Объекты пула теперь поддерживают протокол управления контекстом - смотрите Типы контекстных менеджеров.
__enter__()
возвращает объект пула, а__exit__()
вызываетterminate()
.
- class multiprocessing.pool.AsyncResult¶
Класс результата, возвращаемого
Pool.apply_async()
иPool.map_async()
.- get([timeout])¶
Верните результат, когда он будет получен. Если значение timeout не равно
None
и результат не будет получен в течение времени ожидания секунд, то значениеmultiprocessing.TimeoutError
будет увеличено. Если удаленный вызов вызвал исключение, то это исключение будет вызваноget()
.
- wait([timeout])¶
Подождите, пока не появится результат или пока не пройдет тайм-аут секунд.
- ready()¶
Возвращает, был ли вызов завершен.
- successful()¶
Возвращает, завершился ли вызов без возникновения исключения. Вызовет
ValueError
, если результат не готов.Изменено в версии 3.7: Если результат не готов, то вместо
AssertionError
выводитсяValueError
.
Следующий пример демонстрирует использование пула:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
Слушатели и клиенты¶
Обычно передача сообщений между процессами осуществляется с помощью очередей или с помощью объектов Connection
, возвращаемых Pipe()
.
Однако модуль multiprocessing.connection
обеспечивает некоторую дополнительную гибкость. По сути, он предоставляет высокоуровневый API, ориентированный на сообщения, для работы с сокетами или именованными каналами Windows. Он также поддерживает дайджест-аутентификацию с использованием модуля hmac
и одновременный опрос нескольких подключений.
- multiprocessing.connection.deliver_challenge(connection, authkey)¶
Отправьте случайно сгенерированное сообщение на другой конец соединения и дождитесь ответа.
Если ответ соответствует дайджесту сообщения, в котором в качестве ключа используется authkey, на другой конец соединения отправляется приветственное сообщение. В противном случае выводится значение
AuthenticationError
.
- multiprocessing.connection.answer_challenge(connection, authkey)¶
Получите сообщение, вычислите дайджест сообщения, используя authkey в качестве ключа, а затем отправьте дайджест обратно.
Если приветственное сообщение не получено, то выводится значение
AuthenticationError
.
- multiprocessing.connection.Client(address[, family[, authkey]])¶
Попытайтесь установить соединение с прослушивателем, который использует address адрес, возвращая значение
Connection
.Тип соединения определяется аргументом family, но его обычно можно опустить, поскольку обычно его можно определить по формату address. (см. Форматы адресов)
Если указан authkey, а не None, он должен быть байтовой строкой и будет использоваться в качестве секретного ключа для проверки подлинности на основе HMAC. Проверка подлинности не выполняется, если значение authkey равно None.
AuthenticationError
вызывается в случае сбоя проверки подлинности. Смотрите Ключи аутентификации.
- class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])¶
Оболочка для связанного сокета или именованного канала Windows, который «прослушивает» соединения.
address - это адрес, который будет использоваться связанным сокетом или именованным каналом объекта прослушивателя.
Примечание
Если используется адрес «0.0.0.0», этот адрес не будет подключаемой конечной точкой в Windows. Если вам требуется подключаемая конечная точка, вам следует использовать «127.0.0.1».
семейство - это тип используемого сокета (или именованного канала). Это может быть одна из строк
'AF_INET'
(для сокета TCP),'AF_UNIX'
( для сокета домена Unix) или'AF_PIPE'
(для именованного канала Windows). Из них гарантированно доступен только первый. Если значение family равноNone
, то значение семейства определяется на основе формата address. Если значение address также равноNone
, то выбирается значение по умолчанию. По умолчанию используется семейство, которое считается самым быстрым из доступных. Смотрите Форматы адресов. Обратите внимание, что если семейство равно'AF_UNIX'
, а адрес равенNone
, то сокет будет создан в частном временном каталоге, созданном с помощьюtempfile.mkstemp()
.Если объект прослушивателя использует сокет, то backlog (по умолчанию 1) передается методу
listen()
сокета, как только он будет привязан.Если указан authkey, а не None, он должен быть байтовой строкой и будет использоваться в качестве секретного ключа для проверки подлинности на основе HMAC. Проверка подлинности не выполняется, если значение authkey равно None.
AuthenticationError
вызывается в случае сбоя проверки подлинности. Смотрите Ключи аутентификации.- accept()¶
Примите соединение через связанный сокет или именованный канал объекта-прослушивателя и верните объект
Connection
. Если попытка аутентификации завершилась неудачей, то вызываетсяAuthenticationError
.
- close()¶
Закройте связанный сокет или именованный канал объекта прослушивателя. Это вызывается автоматически, когда для прослушивателя выполняется сборка мусора. Однако рекомендуется вызывать это явно.
Объекты прослушивателя обладают следующими свойствами, доступными только для чтения:
- address¶
Адрес, который используется объектом прослушивателя.
- last_accepted¶
Адрес, с которого было установлено последнее принятое соединение. Если этот адрес недоступен, то это
None
.
Изменено в версии 3.3: Объекты прослушивателя теперь поддерживают протокол управления контекстом - смотрите Типы контекстных менеджеров.
__enter__()
возвращает объект прослушивателя, а__exit__()
вызываетclose()
.
- multiprocessing.connection.wait(object_list, timeout=None)¶
Подождите, пока объект в object_list не будет готов. Возвращает список объектов в object_list, которые готовы. Если значение timeout является плавающим, то вызов блокируется не более чем на столько секунд. Если тайм-аут равен
None
, то он будет заблокирован на неограниченный период. Отрицательный тайм-аут эквивалентен нулевому таймауту.Как для Unix, так и для Windows объект может отображаться в object_list, если он
читаемый объект
Connection
;связанный и читаемый объект
socket.socket
; или
Объект соединения или сокета готов, когда из него доступны данные для чтения, или когда другой конец закрыт.
Unix:
wait(object_list, timeout)
почти эквивалентенselect.select(object_list, [], [], timeout)
. Разница в том, что, еслиselect.select()
прерывается сигналом, он может вызватьOSError
с номером ошибкиEINTR
, тогда какwait()
этого не произойдет.Windows: Элемент в object_list должен быть либо целочисленным дескриптором, который доступен для ожидания (в соответствии с определением, используемым в документации функции Win32
WaitForMultipleObjects()
), либо это может быть объект сfileno()
методом, который возвращает ручка для гнезда или ручка для трубы. (Обратите внимание, что ручки труб и разъемов не являются ручками, которые можно использовать для ожидания.)Добавлено в версии 3.3.
Примеры
Следующий серверный код создает прослушиватель, который использует 'secret password'
в качестве ключа аутентификации. Затем он ожидает подключения и отправляет некоторые данные клиенту:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
Следующий код подключается к серверу и получает некоторые данные с сервера:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
Следующий код использует wait()
для ожидания сообщений от нескольких процессов одновременно:
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
Форматы адресов¶
Адрес
'AF_INET'
представляет собой кортеж вида(hostname, port)
, где имя хоста - это строка, а порт - целое число.Адрес
'AF_UNIX'
- это строка, представляющая имя файла в файловой системе.Адрес
'AF_PIPE'
представляет собой строку видаr'\\.\pipe\PipeName'
. Чтобы использоватьClient()
для подключения к именованному каналу на удаленном компьютере с именем ServerName, вместо этого следует использовать адрес видаr'\\ServerName\pipe\PipeName'
.
Обратите внимание, что любая строка, начинающаяся с двух обратных косых черт, по умолчанию считается адресом 'AF_PIPE'
, а не адресом 'AF_UNIX'
.
Ключи аутентификации¶
При использовании Connection.recv
полученные данные автоматически удаляются. К сожалению, удаление данных из ненадежного источника представляет угрозу безопасности. Поэтому Listener
и Client()
используют модуль hmac
для обеспечения дайджест-аутентификации.
Ключ аутентификации - это строка байтов, которую можно рассматривать как пароль: как только соединение установлено, обе стороны потребуют подтверждения того, что другая сторона знает ключ аутентификации. (Демонстрация того, что обе стороны используют один и тот же ключ, не требует отправки ключа по соединению.)
Если запрашивается аутентификация, но ключ аутентификации не указан, то используется возвращаемое значение current_process().authkey
(см. Process
). Это значение будет автоматически унаследовано любым объектом Process
, созданным текущим процессом. Это означает, что (по умолчанию) все процессы многопроцессорной программы будут совместно использовать единый ключ аутентификации, который можно использовать при установлении соединений между собой.
Подходящие ключи аутентификации также могут быть сгенерированы с помощью os.urandom()
.
Регистрация¶
Доступна некоторая поддержка ведения журнала. Однако обратите внимание, что пакет logging
не использует общие блокировки процессов, поэтому сообщения от разных процессов могут перепутываться (в зависимости от типа обработчика).
- multiprocessing.get_logger()¶
Возвращает логгер, используемый параметром
multiprocessing
. При необходимости будет создан новый.При первом создании регистратор имеет уровень
logging.NOTSET
и не имеет обработчика по умолчанию. Сообщения, отправленные в этот регистратор, по умолчанию не будут передаваться в корневой регистратор.Обратите внимание, что в Windows дочерние процессы наследуют только уровень регистратора родительского процесса - любые другие настройки регистратора наследоваться не будут.
- multiprocessing.log_to_stderr(level=None)¶
Эта функция выполняет вызов
get_logger()
, но в дополнение к возвращению регистратора, созданного get_logger, она добавляет обработчик, который отправляет выходные данные вsys.stderr
, используя формат'[%(levelname)s/%(processName)s] %(message)s'
. Вы можете изменитьlevelname
логгера, передав аргументlevel
.
Ниже приведен пример сеанса с включенным ведением журнала:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
Полную таблицу уровней ведения журнала смотрите в модуле logging
.
Модуль multiprocessing.dummy
¶
multiprocessing.dummy
повторяет API multiprocessing
, но является не более чем оболочкой для модуля threading
.
В частности, функция Pool
, предоставляемая multiprocessing.dummy
, возвращает экземпляр ThreadPool
, который является подклассом Pool
, который поддерживает все те же вызовы методов, но использует пул рабочих потоков, а не чем рабочие процессы.
- class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])¶
Объект пула потоков, который управляет пулом рабочих потоков, в которые могут быть отправлены задания. Экземпляры
ThreadPool
полностью совместимы по интерфейсу с экземплярамиPool
, и их ресурсами также необходимо надлежащим образом управлять, либо используя пул в качестве контекстного менеджера, либо вызываяclose()
иterminate()
вручную.processes - это количество используемых рабочих потоков. Если значение processes равно
None
, то используется число, возвращаемое параметромos.cpu_count()
.Если инициализатор не равен
None
, то каждый рабочий процесс будет вызыватьinitializer(*initargs)
при запуске.В отличие от
Pool
, maxtasksperchild и context не могут быть предоставлены.Примечание
Модуль
ThreadPool
использует тот же интерфейс, что иPool
, который разработан на основе пула процессов и появился до появления модуляconcurrent.futures
. Таким образом, он наследует некоторые операции, которые не имеют смысла для пула, поддерживаемого потоками, и у него есть свой собственный тип для представления статуса асинхронных заданий,AsyncResult
, который не понимается никакими другими библиотеками.Пользователям, как правило, следует предпочесть использовать
concurrent.futures.ThreadPoolExecutor
, который имеет более простой интерфейс, изначально разработанный с учетом потоков, и который возвращаетconcurrent.futures.Future
экземпляры, совместимые со многими другими библиотеками, включаяasyncio
.
Руководящие принципы программирования¶
Существуют определенные рекомендации и идиомы, которых следует придерживаться при использовании multiprocessing
.
Все методы запуска¶
Следующее относится ко всем методам запуска.
Избегайте общего состояния
Насколько это возможно, следует стараться избегать переноса больших объемов данных между процессами.
Вероятно, лучше всего придерживаться использования очередей или каналов связи для обмена данными между процессами, а не использовать примитивы синхронизации более низкого уровня.
Способность к травлению
Убедитесь, что аргументы методов прокси-серверов доступны для выбора.
Потокобезопасность прокси-серверов
Не используйте прокси-объект более чем из одного потока, если вы не защитили его блокировкой.
(Никогда не возникает проблем с разными процессами, использующими один и тот же прокси-сервер.)
Присоединение к процессам-зомби
В Unix, когда процесс завершается, но не был присоединен, он становится зомби. Их никогда не должно быть очень много, потому что каждый раз, когда запускается новый процесс (или вызывается
active_children()
), все завершенные процессы, которые еще не были присоединены, будут присоединены. Кроме того, вызов завершенного процессаProcess.is_alive
присоединится к процессу. Даже в этом случае, вероятно, будет хорошей практикой явно присоединяться ко всем процессам, которые вы запускаете.
Лучше унаследовать, чем мариновать/не мариновать
При использовании методов spawn или forkserver start многие типы из
multiprocessing
должны быть доступны для выбора, чтобы дочерние процессы могли их использовать. Однако, как правило, следует избегать отправки общих объектов другим процессам с использованием каналов или очередей. Вместо этого вы должны организовать программу таким образом, чтобы процесс, которому требуется доступ к общему ресурсу, созданному в другом месте, мог унаследовать его от процесса-предка.
Избегайте завершения процессов
Использование метода
Process.terminate
для остановки процесса может привести к тому, что любые общие ресурсы (такие как блокировки, семафоры, каналы и очереди), используемые в данный момент процессом, станут неработоспособными или недоступными для других процессов.Поэтому, вероятно, лучше всего использовать
Process.terminate
только для процессов, которые никогда не используют какие-либо общие ресурсы.
Объединение процессов, использующих очереди
Имейте в виду, что процесс, который поместил элементы в очередь, будет ожидать завершения до тех пор, пока все буферизованные элементы не будут переданы потоком «feeder» в базовый канал. (Дочерний процесс может вызвать
Queue.cancel_join_thread
метод очереди, чтобы избежать такого поведения.)Это означает, что всякий раз, когда вы используете очередь, вам необходимо убедиться, что все элементы, которые были помещены в очередь, в конечном итоге будут удалены до присоединения процесса. В противном случае вы не можете быть уверены, что процессы, которые поместили элементы в очередь, завершатся. Помните также, что недемонические процессы будут подключены автоматически.
Примером, который приведет к взаимоблокировке, является следующий:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()Исправлением здесь было бы поменять местами последние две строки (или просто удалить строку
p.join()
).
Явно передавать ресурсы дочерним процессам
В Unix, использующем метод fork start, дочерний процесс может использовать общий ресурс, созданный в родительском процессе с использованием глобального ресурса. Однако лучше передать объект в качестве аргумента конструктору дочернего процесса.
Помимо того, что код (потенциально) совместим с Windows и другими методами запуска, это также гарантирует, что, пока дочерний процесс все еще активен, объект не будет собираться мусором в родительском процессе. Это может быть важно, если какой-либо ресурс освобождается, когда объект собирается мусором в родительском процессе.
Так, например
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()следует переписать следующим образом
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
Остерегайтесь замены sys.stdin
на «объект, подобный файлу».
multiprocessing
первоначально вызывался безоговорочно:os.close(sys.stdin.fileno())в методе
multiprocessing.Process._bootstrap()
— это привело к проблемам с процессами в процессах. Это было изменено на:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)Это решает фундаментальную проблему, связанную с тем, что процессы сталкиваются друг с другом, что приводит к ошибочному файловому дескриптору, но представляет потенциальную опасность для приложений, которые заменяют
sys.stdin()
«файлоподобным объектом» с буферизацией выходных данных. Эта опасность заключается в том, что если несколько процессов вызовутclose()
для этого файлообразного объекта, это может привести к многократному сбросу одних и тех же данных в объект, что приведет к повреждению.Если вы пишете объект, подобный файлу, и реализуете свое собственное кэширование, вы можете сделать его безопасным для разветвления, сохраняя pid всякий раз, когда вы добавляете его в кэш, и удаляя кэш при изменении pid. Например:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cacheДля получения дополнительной информации смотрите bpo-5155, bpo-5313 и bpo-5331
Методы запуска spawn и forkserver¶
Существует несколько дополнительных ограничений, которые не применяются к методу запуска fork.
Большая разборчивость в выборе
Убедитесь, что все аргументы
Process.__init__()
доступны для выбора. Кроме того, если вы создаете подклассProcess
, убедитесь, что экземпляры будут доступны для выбора при вызове методаProcess.start
.
Глобальные переменные
Имейте в виду, что если код, запущенный в дочернем процессе, пытается получить доступ к глобальной переменной, то значение, которое он видит (если оно есть), может отличаться от значения в родительском процессе на момент вызова
Process.start
.Однако глобальные переменные, которые являются просто константами уровня модуля, не вызывают никаких проблем.
Безопасный импорт основного модуля
Убедитесь, что основной модуль может быть безопасно импортирован новым интерпретатором Python без возникновения непредвиденных побочных эффектов (таких как запуск нового процесса).
Например, при использовании метода spawn или forkserver start запуск следующего модуля завершится ошибкой с
RuntimeError
:from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()Вместо этого следует защитить «точку входа» программы, используя
if __name__ == '__main__':
следующим образом:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(Строка
freeze_support()
может быть опущена, если программа будет запущена нормально, а не заморожена.)Это позволяет недавно созданному интерпретатору Python безопасно импортировать модуль и затем запустить функцию модуля
foo()
.Аналогичные ограничения применяются, если в главном модуле создан пул или менеджер.
Примеры¶
Демонстрация того, как создавать и использовать индивидуальные менеджеры и прокси-серверы:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
Используя Pool
:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
Пример, показывающий, как использовать очереди для передачи задач в набор рабочих процессов и сбора результатов:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()