multiprocessing
— Параллелизм на основе процессов¶
Исходный код: Lib/multiprocessing/.
Введение¶
multiprocessing
- это пакет, поддерживающий порождение процессов с помощью API, аналогичного модулю threading
. Пакет multiprocessing
предлагает локальный и удаленный параллелизм, эффективно обходя Global Interpreter Lock путем использования подпроцессов вместо потоков. Благодаря этому модуль multiprocessing
позволяет программисту полностью задействовать несколько процессоров на данной машине. Он работает как на Unix, так и на Windows.
Модуль multiprocessing
также вводит API, не имеющие аналогов в модуле threading
. Ярким примером этого является объект Pool
, который предлагает удобное средство распараллеливания выполнения функции по нескольким входным значениям, распределяя входные данные между процессами (параллелизм данных). Следующий пример демонстрирует обычную практику определения таких функций в модуле, чтобы дочерние процессы могли успешно импортировать этот модуль. Этот базовый пример параллелизма данных с использованием Pool
,
from multiprocessing import Pool
def f(x):
return x*x
if __name__ == '__main__':
with Pool(5) as p:
print(p.map(f, [1, 2, 3]))
выведет на стандартный вывод
[1, 4, 9]
См.также
concurrent.futures.ProcessPoolExecutor
предлагает интерфейс более высокого уровня для передачи задач в фоновый процесс без блокирования выполнения вызывающего процесса. По сравнению с непосредственным использованием интерфейса Pool
, API concurrent.futures
позволяет отделить передачу работы в пул базовых процессов от ожидания результатов.
Класс Process
¶
В multiprocessing
процессы порождаются путем создания объекта Process
и последующего вызова его метода start()
. Process
следует API threading.Thread
. Тривиальным примером многопроцессной программы является
from multiprocessing import Process
def f(name):
print('hello', name)
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
Чтобы показать отдельные идентификаторы процессов, вот расширенный пример:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
Для объяснения того, зачем нужна часть if __name__ == '__main__'
, смотрите Руководство по программированию.
Контексты и методы запуска¶
В зависимости от платформы, multiprocessing
поддерживает три способа запуска процесса. Этими способами запуска являются
- spawn
Родительский процесс запускает новый процесс интерпретатора Python. Дочерний процесс наследует только те ресурсы, которые необходимы для выполнения метода
run()
объекта process. В частности, ненужные дескрипторы файлов и хэндлы от родительского процесса не будут унаследованы. Запуск процесса с помощью этого метода довольно медленный по сравнению с использованием fork или forkserver.Доступен на Unix и Windows. По умолчанию в Windows и macOS.
- fork
Родительский процесс использует команду
os.fork()
для форка интерпретатора Python. Дочерний процесс, когда он запускается, фактически идентичен родительскому процессу. Все ресурсы родительского процесса наследуются дочерним процессом. Обратите внимание, что безопасное форкирование многопоточного процесса проблематично.Доступно только на Unix. По умолчанию в Unix.
- forkserver
Когда программа запускается и выбирает метод запуска forkserver, запускается серверный процесс. С этого момента всякий раз, когда требуется новый процесс, родительский процесс соединяется с сервером и запрашивает у него форк нового процесса. Процесс fork server является однопоточным, поэтому для него безопасно использовать
os.fork()
. Никакие ненужные ресурсы не наследуются.Доступен на платформах Unix, которые поддерживают передачу файловых дескрипторов через Unix pipes.
Изменено в версии 3.8: В macOS метод запуска spawn теперь используется по умолчанию. Метод запуска fork следует считать небезопасным, поскольку он может привести к сбою подпроцесса. См. bpo-33725.
Изменено в версии 3.4: spawn добавлен на всех платформах unix, а forkserver добавлен для некоторых платформ unix. Дочерние процессы больше не наследуют все ручки, наследуемые родителями, в Windows.
На Unix при использовании методов запуска spawn или forkserver также запускается процесс resource tracker, который отслеживает несвязанные именованные системные ресурсы (такие как именованные семафоры или объекты SharedMemory
), созданные процессами программы. Когда все процессы завершатся, программа отслеживания ресурсов отсоединяет все оставшиеся отслеживаемые объекты. Обычно их не должно быть, но если процесс был убит сигналом, могут остаться «утечки» ресурсов. (Ни утечка семафоров, ни сегменты общей памяти не будут автоматически отсоединены до следующей перезагрузки. Это проблематично для обоих объектов, поскольку система позволяет использовать только ограниченное количество именованных семафоров, а сегменты разделяемой памяти занимают некоторое пространство в основной памяти).
Для выбора метода запуска используется set_start_method()
в пункте if __name__ == '__main__'
главного модуля. Например:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method()
не должен использоваться более одного раза в программе.
В качестве альтернативы можно использовать get_context()
для получения контекстного объекта. Контекстные объекты имеют тот же API, что и модуль мультипроцессинга, и позволяют использовать несколько методов запуска в одной программе.
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
Обратите внимание, что объекты, относящиеся к одному контексту, могут быть несовместимы с процессами для другого контекста. В частности, блокировки, созданные с помощью контекста fork, не могут быть переданы процессам, запущенным с помощью методов spawn или forkserver start.
Библиотека, которая хочет использовать определенный метод запуска, вероятно, должна использовать get_context()
, чтобы не вмешиваться в выбор пользователя библиотеки.
Предупреждение
Методы запуска 'spawn'
и 'forkserver'
в настоящее время не могут быть использованы с «замороженными» исполняемыми файлами (т.е. двоичными файлами, созданными пакетами типа PyInstaller и cx_Freeze) на Unix. Метод запуска 'fork'
работает.
Обмен объектами между процессами¶
multiprocessing
поддерживает два типа канала связи между процессами:
Квоты
Класс
Queue
является почти клономqueue.Queue
. Например:from multiprocessing import Process, Queue def f(q): q.put([42, None, 'hello']) if __name__ == '__main__': q = Queue() p = Process(target=f, args=(q,)) p.start() print(q.get()) # prints "[42, None, 'hello']" p.join()Очереди безопасны для потоков и процессов.
Трубы
Функция
Pipe()
возвращает пару объектов соединения, соединенных трубой, которая по умолчанию является дуплексной (двусторонней). Например:from multiprocessing import Process, Pipe def f(conn): conn.send([42, None, 'hello']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(target=f, args=(child_conn,)) p.start() print(parent_conn.recv()) # prints "[42, None, 'hello']" p.join()Два объекта соединения, возвращаемые методом
Pipe()
, представляют собой два конца трубы. Каждый объект соединения имеет методыsend()
иrecv()
(среди прочих). Обратите внимание, что данные в трубе могут быть повреждены, если два процесса (или потока) пытаются одновременно читать из или записывать в один и тот же конец трубы. Конечно, нет никакого риска повреждения от процессов, использующих разные концы трубы в одно и то же время.
Синхронизация между процессами¶
multiprocessing
содержит эквиваленты всех примитивов синхронизации из threading
. Например, можно использовать блокировку, чтобы гарантировать, что только один процесс одновременно печатает на стандартный вывод:
from multiprocessing import Process, Lock
def f(l, i):
l.acquire()
try:
print('hello world', i)
finally:
l.release()
if __name__ == '__main__':
lock = Lock()
for num in range(10):
Process(target=f, args=(lock, num)).start()
Без использования блокировки выходные данные различных процессов могут смешаться.
Использование резерва работников¶
Класс Pool
представляет собой пул рабочих процессов. Он имеет методы, которые позволяют перегружать задачи на рабочие процессы несколькими различными способами.
Например:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 secs
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
Обратите внимание, что методы пула должны использоваться только тем процессом, который его создал.
Примечание
Функциональность внутри этого пакета требует, чтобы модуль __main__
был импортируемым дочерними модулями. Это описано в Руководство по программированию, однако стоит указать на это здесь. Это означает, что некоторые примеры, такие как multiprocessing.pool.Pool
, не будут работать в интерактивном интерпретаторе. Например:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
(Если вы попробуете это сделать, то на самом деле будет выведено три полных трассировки, чередующихся полуслучайным образом, и тогда вам, возможно, придется как-то остановить родительский процесс).
Ссылка¶
Пакет multiprocessing
в основном повторяет API модуля threading
.
Process
и исключения¶
-
class
multiprocessing.
Process
(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶ Объекты процесса представляют деятельность, которая выполняется в отдельном процессе. Класс
Process
имеет эквиваленты всех методовthreading.Thread
.Конструктор всегда должен вызываться с аргументами в виде ключевых слов. group всегда должна быть
None
; она существует исключительно для совместимости сthreading.Thread
. target - это вызываемый объект, который будет вызван методомrun()
. По умолчанию это значение равноNone
, что означает, что ничего не вызывается. name - это имя процесса (подробнее см.name
). args - кортеж аргументов для целевого вызова. kwargs - словарь аргументов с ключевыми словами для целевого вызова. Если указано, то аргумент daemon, содержащий только ключевое слово, устанавливает флаг процессаdaemon
в значениеTrue
илиFalse
. ЕслиNone
(по умолчанию), то этот флаг будет унаследован от создающего процесса.По умолчанию никакие аргументы не передаются target.
Если подкласс переопределяет конструктор, он должен убедиться, что вызывает конструктор базового класса (
Process.__init__()
), прежде чем делать что-либо еще с процессом.Изменено в версии 3.3: Добавлен аргумент daemon.
-
run
()¶ Метод, представляющий деятельность процесса.
Вы можете переопределить этот метод в подклассе. Стандартный метод
run()
вызывает вызываемый объект, переданный в конструктор объекта в качестве целевого аргумента, если таковой имеется, с последовательными и ключевыми аргументами, взятыми из аргументов args и kwargs соответственно.
-
start
()¶ Запустите активность процесса.
Этот вызов должен выполняться не более одного раза для каждого объекта процесса. Он организует вызов метода
run()
объекта в отдельном процессе.
-
join
([timeout])¶ Если необязательный аргумент timeout равен
None
(по умолчанию), метод блокируется до тех пор, пока процесс, чей методjoin()
вызывается, не завершится. Если timeout - положительное число, метод блокирует не более timeout секунд. Обратите внимание, что метод возвращаетNone
, если процесс завершается или если метод завершает работу. Проверьтеexitcode
процесса, чтобы определить, завершился ли он.К процессу можно присоединяться много раз.
Процесс не может присоединиться к самому себе, поскольку это приведет к тупиковой ситуации. Ошибкой является попытка присоединиться к процессу до того, как он был запущен.
-
name
¶ Имя процесса. Имя - это строка, используемая только для целей идентификации. Оно не имеет семантики. Одно и то же имя может быть присвоено нескольким процессам.
Начальное имя задается конструктором. Если конструктору не предоставлено явное имя, то строится имя вида „Process-N1:N2:…:Nk“, где каждый Nk является N-м дочерним элементом своего родителя.
-
is_alive
()¶ Возвращает, жив ли процесс.
Грубо говоря, объект процесса жив с момента возврата метода
start()
до завершения дочернего процесса.
-
daemon
¶ Флаг демона процесса, булево значение. Он должен быть установлен до вызова
start()
.Начальное значение наследуется от процесса создания.
Когда процесс завершается, он пытается завершить все свои дочерние демонические процессы.
Обратите внимание, что демоническому процессу не разрешается создавать дочерние процессы. В противном случае демонический процесс оставит своих детей сиротами, если он будет завершен при выходе из родительского процесса. Кроме того, это не демоны или службы Unix, это обычные процессы, которые будут завершены (и не присоединятся), если завершатся не демонические процессы.
В дополнение к API
threading.Thread
, объектыProcess
также поддерживают следующие атрибуты и методы:-
pid
¶ Возвращает идентификатор процесса. До порождения процесса это будет
None
.
-
exitcode
¶ Код завершения дочернего процесса. Это будет
None
, если процесс еще не завершился.Если дочерний метод
run()
вернулся нормально, код выхода будет 0. Если он завершился черезsys.exit()
с целочисленным аргументом N, код выхода будет N.Если дочерняя программа завершилась из-за исключения, не пойманного в пределах
run()
, код выхода будет равен 1. Если он был завершен по сигналу N, код выхода будет отрицательным значением -N.
-
authkey
¶ Ключ аутентификации процесса (байтовая строка).
При инициализации
multiprocessing
главному процессу присваивается случайная строка с помощьюos.urandom()
.При создании объекта
Process
он наследует ключ аутентификации своего родительского процесса, хотя это можно изменить, установивauthkey
в другую байтовую строку.См. Ключи аутентификации.
-
sentinel
¶ Числовой хэндл системного объекта, который станет «готовым» после завершения процесса.
Вы можете использовать это значение, если хотите ждать сразу несколько событий, используя
multiprocessing.connection.wait()
. В противном случае вызовjoin()
является более простым.В Windows это дескриптор ОС, используемый с помощью вызовов API семейства
WaitForSingleObject
иWaitForMultipleObjects
. В Unix это дескриптор файла, используемый с примитивами модуляselect
.Добавлено в версии 3.3.
-
terminate
()¶ Завершить процесс. В Unix это делается с помощью сигнала
SIGTERM
; в Windows используется сигналTerminateProcess()
. Обратите внимание, что обработчики выхода, предложения finally и т.д. не будут выполнены.Обратите внимание, что процессы-потомки этого процесса не будут завершены - они просто станут бесхозными.
Предупреждение
Если этот метод используется, когда связанный процесс использует канал или очередь, то канал или очередь могут быть повреждены и стать непригодными для использования другими процессами. Аналогично, если процесс получил блокировку или семафор и т.д., то его завершение может привести к тупику других процессов.
-
kill
()¶ То же самое, что и
terminate()
, но с использованием сигналаSIGKILL
на Unix.Добавлено в версии 3.7.
-
close
()¶ Закрывает объект
Process
, освобождая все связанные с ним ресурсы. ВызовValueError
происходит, если основной процесс все еще запущен. После успешного возвратаclose()
большинство других методов и атрибутов объектаProcess
будут вызыватьValueError
.Добавлено в версии 3.7.
Обратите внимание, что методы
start()
,join()
,is_alive()
,terminate()
иexitcode
должны вызываться только процессом, создавшим объект процесса.Пример использования некоторых методов
Process
:>>> import multiprocessing, time, signal >>> p = multiprocessing.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
-
-
exception
multiprocessing.
ProcessError
¶ Базовый класс всех исключений
multiprocessing
.
-
exception
multiprocessing.
BufferTooShort
¶ Исключение, вызванное
Connection.recv_bytes_into()
, когда предоставленный объект буфера слишком мал для прочитанного сообщения.Если
e
является экземпляромBufferTooShort
, тоe.args[0]
выдаст сообщение в виде байтовой строки.
-
exception
multiprocessing.
AuthenticationError
¶ Возникает при ошибке аутентификации.
-
exception
multiprocessing.
TimeoutError
¶ Вызывается методами с таймаутом, когда таймаут истекает.
Трубы и очереди¶
При использовании нескольких процессов обычно используется передача сообщений для связи между процессами, что позволяет избежать использования примитивов синхронизации, таких как блокировки.
Для передачи сообщений можно использовать Pipe()
(для соединения между двумя процессами) или очередь (которая позволяет использовать несколько производителей и потребителей).
Типы Queue
, SimpleQueue
и JoinableQueue
представляют собой многопроизводственные, многопотребительские FIFO очереди, созданные по образцу класса queue.Queue
в стандартной библиотеке. Они отличаются тем, что в Queue
отсутствуют методы task_done()
и join()
, введенные в класс queue.Queue
в Python 2.5.
Если вы используете JoinableQueue
, то вы должны вызывать JoinableQueue.task_done()
для каждой задачи, удаленной из очереди, иначе семафор, используемый для подсчета количества незавершенных задач, может в конечном итоге переполниться, вызвав исключение.
Обратите внимание, что можно также создать общую очередь с помощью объекта-менеджера - см. Менеджеры.
Примечание
multiprocessing
использует обычные исключения queue.Empty
и queue.Full
для сигнализации тайм-аута. Они недоступны в пространстве имен multiprocessing
, поэтому их необходимо импортировать из queue
.
Примечание
Когда объект помещается в очередь, он пикируется, и фоновый поток позже сбрасывает пикированные данные в нижележащую трубу. Это имеет некоторые последствия, которые немного удивляют, но не должны вызывать никаких практических трудностей - если они вас действительно беспокоят, вы можете вместо этого использовать очередь, созданную с помощью manager.
После помещения объекта в пустую очередь может произойти бесконечно малая задержка, прежде чем метод
empty()
очереди вернетсяFalse
иget_nowait()
сможет вернуться без возникновенияqueue.Empty
.Если несколько процессов ставят объекты в очередь, возможно, что объекты будут получены на другом конце не по порядку. Однако объекты, выставляемые в очередь одним и тем же процессом, всегда будут располагаться в ожидаемом порядке по отношению друг к другу.
Предупреждение
Если процесс убит с помощью Process.terminate()
или os.kill()
, когда он пытается использовать Queue
, то данные в очереди могут быть повреждены. Это может привести к тому, что любой другой процесс получит исключение при попытке использовать очередь в дальнейшем.
Предупреждение
Как упоминалось выше, если дочерний процесс поместил элементы в очередь (и не использовал JoinableQueue.cancel_join_thread
), то этот процесс не завершится, пока все буферизированные элементы не будут смыты в трубу.
Это означает, что если вы попытаетесь присоединиться к этому процессу, вы можете получить тупик, если не будете уверены, что все элементы, которые были поставлены в очередь, были потреблены. Аналогично, если дочерний процесс не является демоническим, то родительский процесс может зависнуть при выходе, когда он попытается присоединиться ко всем своим недемоническим дочерним процессам.
Обратите внимание, что очередь, созданная с помощью менеджера, не имеет этой проблемы. См. раздел Руководство по программированию.
Пример использования очередей для межпроцессного взаимодействия приведен в Примеры.
-
multiprocessing.
Pipe
([duplex])¶ Возвращает пару
(conn1, conn2)
объектовConnection
, представляющих концы трубы.Если duplex имеет значение
True
(по умолчанию), то труба является двунаправленной. Если duplex равенFalse
, то труба будет однонаправленной:conn1
может использоваться только для приема сообщений, аconn2
может использоваться только для отправки сообщений.
-
class
multiprocessing.
Queue
([maxsize])¶ Возвращает общую очередь процесса, реализованную с помощью трубы и нескольких блокировок/семафоров. Когда процесс впервые помещает элемент в очередь, запускается поток фидера, который передает объекты из буфера в трубу.
Обычные исключения
queue.Empty
иqueue.Full
из модуляqueue
стандартной библиотеки поднимаются для сигнализации тайм-аута.Queue
реализует все методыqueue.Queue
, кромеtask_done()
иjoin()
.-
qsize
()¶ Возвращает приблизительный размер очереди. Из-за семантики многопоточности/многопроцессорности это число не является надежным.
Обратите внимание, что это может вызвать ошибку
NotImplementedError
на платформах Unix, таких как macOS, гдеsem_getvalue()
не реализована.
-
empty
()¶ Возвращает
True
, если очередь пуста,False
в противном случае. Из-за семантики многопоточности/многопроцессорности это ненадежно.
-
full
()¶ Возвращает
True
, если очередь заполнена,False
в противном случае. Из-за семантики многопоточности/многопроцессорности это ненадежно.
-
put
(obj[, block[, timeout]])¶ Поместить obj в очередь. Если дополнительный аргумент block равен
True
(по умолчанию) и timeout равенNone
(по умолчанию), то при необходимости блокируется до тех пор, пока не появится свободный слот. Если timeout - положительное число, то блокировка длится не более timeout секунд и вызывает исключениеqueue.Full
, если в течение этого времени свободный слот не был доступен. В противном случае (block равноFalse
), поместите элемент в очередь, если свободный слот доступен немедленно, иначе вызовите исключениеqueue.Full
(timeout в этом случае игнорируется).Изменено в версии 3.8: Если очередь закрыта, вместо
ValueError
выдаетсяAssertionError
.
-
put_nowait
(obj)¶ Эквивалентно
put(obj, False)
.
-
get
([block[, timeout]])¶ Удалить и вернуть элемент из очереди. Если опциональные args block равно
True
(по умолчанию) и timeout равноNone
(по умолчанию), то при необходимости блокируется, пока элемент не будет доступен. Если timeout - положительное число, то блокируется не более timeout секунд и вызывает исключениеqueue.Empty
, если в течение этого времени ни один элемент не был доступен. В противном случае (блок равенFalse
), возвращает элемент, если он доступен немедленно, иначе вызывает исключениеqueue.Empty
(timeout в этом случае игнорируется).Изменено в версии 3.8: Если очередь закрыта, вместо
ValueError
выдаетсяOSError
.
-
get_nowait
()¶ Эквивалентно
get(False)
.
multiprocessing.Queue
имеет несколько дополнительных методов, которых нет вqueue.Queue
. Эти методы обычно не нужны для большинства кода:-
close
()¶ Указывает, что текущий процесс больше не будет помещать данные в эту очередь. Фоновый поток завершится, как только передаст все буферизованные данные в трубу. Это вызывается автоматически, когда очередь собирается в мусор.
-
join_thread
()¶ Присоединиться к фоновому потоку. Это можно использовать только после вызова
close()
. Он блокирует выполнение до выхода из фонового потока, гарантируя, что все данные в буфере были выведены в трубу.По умолчанию, если процесс не является создателем очереди, то при выходе он попытается присоединиться к фоновому потоку очереди. Процесс может вызвать
cancel_join_thread()
, чтобы заставитьjoin_thread()
ничего не делать.
-
cancel_join_thread
()¶ Предотвратить блокировку
join_thread()
. В частности, это предотвращает автоматическое присоединение фонового потока при завершении процесса - см.join_thread()
.Более подходящим названием для этого метода может быть
allow_exit_without_flush()
. Он, скорее всего, приведет к потере данных из очереди, и почти наверняка вам не понадобится его использовать. Он действительно нужен только в том случае, если вам нужно, чтобы текущий процесс завершился немедленно, не дожидаясь, пока данные из очереди будут переданы в базовый канал, и вас не волнует потеря данных.
Примечание
Функциональность этого класса требует наличия функционирующей реализации общего семафора в операционной системе хоста. При отсутствии таковой функциональность этого класса будет отключена, а попытки инстанцировать
Queue
приведут к появлениюImportError
. Дополнительную информацию см. в разделе bpo-3770. То же самое справедливо для любого из специализированных типов очередей, перечисленных ниже.-
-
class
multiprocessing.
SimpleQueue
¶ Это упрощенный тип
Queue
, очень близкий к заблокированномуPipe
.-
close
()¶ Закрыть очередь: освободить внутренние ресурсы.
Очередь не должна больше использоваться после ее закрытия. Например, методы
get()
,put()
иempty()
больше не должны вызываться.Добавлено в версии 3.9.
-
empty
()¶ Возвращает
True
, если очередь пуста,False
в противном случае.
-
get
()¶ Удаление и возврат элемента из очереди.
-
put
(item)¶ Поместите элемент в очередь.
-
-
class
multiprocessing.
JoinableQueue
([maxsize])¶ JoinableQueue
, подклассQueue
- это очередь, которая дополнительно имеет методыtask_done()
иjoin()
.-
task_done
()¶ Указывает, что ранее поставленная в очередь задача завершена. Используется потребителями очереди. Для каждого
get()
, используемого для получения задачи, последующий вызовtask_done()
сообщает очереди, что обработка задачи завершена.Если
join()
в настоящее время блокируется, он возобновится, когда все элементы будут обработаны (это означает, что вызовtask_done()
был получен для каждого элемента, который былput()
в очереди).Вызывает ошибку
ValueError
, если вызывается большее количество раз, чем было помещено элементов в очередь.
-
join
()¶ Блокировать, пока все элементы в очереди не будут получены и обработаны.
Счетчик незавершенных задач увеличивается всякий раз, когда в очередь добавляется элемент. Счетчик уменьшается всякий раз, когда потребитель вызывает
task_done()
, чтобы указать, что элемент был получен и вся работа над ним завершена. Когда количество незавершенных задач падает до нуля,join()
разблокируется.
-
Разное¶
-
multiprocessing.
active_children
()¶ Возвращает список всех живых детей текущего процесса.
Вызов этого вызова имеет побочный эффект «присоединения» к любым процессам, которые уже завершились.
-
multiprocessing.
cpu_count
()¶ Возвращает количество центральных процессоров в системе.
Это число не эквивалентно количеству CPU, которые может использовать текущий процесс. Количество используемых ЦП можно получить с помощью
len(os.sched_getaffinity(0))
.Если количество CPU не может быть определено, выдается сообщение
NotImplementedError
.См.также
-
multiprocessing.
current_process
()¶ Возвращает объект
Process
, соответствующий текущему процессу.Аналог
threading.current_thread()
.
-
multiprocessing.
parent_process
()¶ Возвращает объект
Process
, соответствующий родительскому процессуcurrent_process()
. Для главного процессаparent_process
будетNone
.Добавлено в версии 3.8.
-
multiprocessing.
freeze_support
()¶ Добавьте поддержку для случая, когда программа, использующая
multiprocessing
, была заморожена для создания исполняемого файла Windows. (Проверено с помощью py2exe, PyInstaller и cx_Freeze).Вызывать эту функцию нужно сразу после строки
if __name__ == '__main__'
основного модуля. Например:from multiprocessing import Process, freeze_support def f(): print('hello world!') if __name__ == '__main__': freeze_support() Process(target=f).start()
Если строка
freeze_support()
опущена, то попытка запустить замороженный исполняемый файл приведет к ошибкеRuntimeError
.Вызов
freeze_support()
не имеет эффекта при вызове на любой операционной системе, отличной от Windows. Кроме того, если модуль нормально выполняется интерпретатором Python на Windows (программа не была заморожена), тоfreeze_support()
не имеет никакого эффекта.
-
multiprocessing.
get_all_start_methods
()¶ Возвращает список поддерживаемых методов запуска, первый из которых используется по умолчанию. Возможными методами запуска являются
'fork'
,'spawn'
и'forkserver'
. В Windows доступен только'spawn'
. На Unix всегда поддерживаются'fork'
и'spawn'
, при этом по умолчанию используется'fork'
.Добавлено в версии 3.4.
-
multiprocessing.
get_context
(method=None)¶ Возвращает объект контекста, который имеет те же атрибуты, что и модуль
multiprocessing
.Если method равен
None
, то возвращается контекст по умолчанию. В противном случае method должен быть'fork'
,'spawn'
,'forkserver'
. Если указанный метод запуска недоступен, выдается сообщениеValueError
.Добавлено в версии 3.4.
-
multiprocessing.
get_start_method
(allow_none=False)¶ Возвращает имя метода запуска, используемого для запуска процессов.
Если метод запуска не был зафиксирован и allow_none равно false, то метод запуска фиксируется по умолчанию и возвращается имя. Если метод запуска не был зафиксирован и allow_none равно true, то возвращается
None
.Возвращаемое значение может быть
'fork'
,'spawn'
,'forkserver'
илиNone
.'fork'
является значением по умолчанию в Unix, а'spawn'
является значением по умолчанию в Windows и macOS.
Изменено в версии 3.8: В macOS метод запуска spawn теперь используется по умолчанию. Метод запуска fork следует считать небезопасным, поскольку он может привести к сбою подпроцесса. См. bpo-33725.
Добавлено в версии 3.4.
-
multiprocessing.
set_executable
(executable)¶ Установите путь к интерпретатору Python, который будет использоваться при запуске дочернего процесса. (По умолчанию используется
sys.executable
). Встраиваемым программам, вероятно, потребуется сделать что-то вродеset_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
прежде чем они смогут создавать дочерние процессы.
Изменено в версии 3.4: Теперь поддерживается на Unix, когда используется метод запуска
'spawn'
.
-
multiprocessing.
set_start_method
(method)¶ Установите метод, который должен использоваться для запуска дочерних процессов. method может быть
'fork'
,'spawn'
или'forkserver'
.Обратите внимание, что это должно быть вызвано не более одного раза и должно быть защищено внутри пункта
if __name__ == '__main__'
главного модуля.Добавлено в версии 3.4.
Примечание
multiprocessing
не содержит аналогов threading.active_count()
, threading.enumerate()
, threading.settrace()
, threading.setprofile()
, threading.Timer
или threading.local
.
Объекты соединения¶
Объекты соединений позволяют отправлять и получать picklable объекты или строки. Их можно рассматривать как ориентированные на сообщения подключенные сокеты.
Объекты соединений обычно создаются с помощью Pipe
- см. также Слушатели и клиенты.
-
class
multiprocessing.connection.
Connection
¶ -
send
(obj)¶ Отправьте на другой конец соединения объект, который должен быть прочитан с помощью
recv()
.Объект должен быть picklable. Очень большие pickle (примерно 32 MiB+, хотя это зависит от ОС) могут вызвать исключение
ValueError
.
-
recv
()¶ Возвращает объект, отправленный с другого конца соединения с помощью
send()
. Блокируется до тех пор, пока не будет что получать. ВызываетEOFError
, если нечего принимать и другой конец был закрыт.
-
fileno
()¶ Возвращает дескриптор файла или дескриптор, используемый соединением.
-
close
()¶ Закройте соединение.
Это вызывается автоматически, когда соединение собирается в мусор.
-
poll
([timeout])¶ Возвращает, есть ли данные, доступные для чтения.
Если timeout не указан, то возврат происходит немедленно. Если timeout - это число, то оно определяет максимальное время блокировки в секундах. Если timeout равен
None
, то используется бесконечный таймаут.Обратите внимание, что несколько объектов соединения могут быть опрошены одновременно с помощью
multiprocessing.connection.wait()
.
-
send_bytes
(buffer[, offset[, size]])¶ Отправка байтовых данных из bytes-like object в виде полного сообщения.
Если задано offset, то данные считываются из этой позиции в буфере. Если указан size, то из буфера будет считано столько байт. Очень большие буферы (примерно 32 MiB+, хотя это зависит от ОС) могут вызвать исключение
ValueError
-
recv_bytes
([maxlength])¶ Возвращает полное сообщение байтовых данных, отправленных с другого конца соединения, в виде строки. Блокируется до тех пор, пока не будет что принимать. Вызывает
EOFError
, если нечего принимать и другой конец закрыт.Если указано maxlength и сообщение длиннее, чем maxlength, то будет выдано сообщение
OSError
и соединение больше не будет доступно для чтения.
-
recv_bytes_into
(buffer[, offset])¶ Считывает в буфер полное сообщение байтовых данных, отправленное с другого конца соединения, и возвращает количество байтов в сообщении. Блокируется до тех пор, пока не будет что получать. Вызывает
EOFError
, если нечего принимать и другой конец был закрыт.buffer должен быть записываемым bytes-like object. Если задано смещение, то сообщение будет записано в буфер с этой позиции. Смещение должно быть неотрицательным целым числом, меньшим, чем длина буфера (в байтах).
Если буфер слишком короткий, то возникает исключение
BufferTooShort
и полное сообщение доступно какe.args[0]
, гдеe
- экземпляр исключения.
Изменено в версии 3.3: Сами объекты соединений теперь можно передавать между процессами с помощью
Connection.send()
иConnection.recv()
.Добавлено в версии 3.3: Объекты соединения теперь поддерживают протокол управления контекстом - см. Типы контекстного менеджера.
__enter__()
возвращает объект соединения, а__exit__()
вызываетclose()
.-
Например:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Предупреждение
Метод Connection.recv()
автоматически распикировывает полученные данные, что может представлять угрозу безопасности, если вы не доверяете процессу, отправившему сообщение.
Поэтому, если объект соединения не был создан с помощью Pipe()
, вы должны использовать методы recv()
и send()
только после выполнения некоторой аутентификации. См. раздел Ключи аутентификации.
Предупреждение
Если процесс будет убит, когда он пытается читать или писать в трубу, то данные в трубе, скорее всего, будут повреждены, поскольку невозможно будет определить, где проходят границы сообщений.
Примитивы синхронизации¶
Как правило, примитивы синхронизации не так необходимы в многопроцессной программе, как в многопоточной. См. документацию по модулю threading
.
Обратите внимание, что можно также создавать примитивы синхронизации с помощью объекта-менеджера - см. Менеджеры.
-
class
multiprocessing.
Barrier
(parties[, action[, timeout]])¶ Объект барьера: клон
threading.Barrier
.Добавлено в версии 3.3.
-
class
multiprocessing.
BoundedSemaphore
([value])¶ Ограниченный объект семафора: близкий аналог
threading.BoundedSemaphore
.Единственное отличие от близкого аналога: первый аргумент метода
acquire
имеет имя block, как это соответствуетLock.acquire()
.Примечание
На macOS это неотличимо от
Semaphore
, посколькуsem_getvalue()
на этой платформе не реализовано.
-
class
multiprocessing.
Condition
([lock])¶ Переменная условия: псевдоним для
threading.Condition
.Если указан lock, то это должен быть объект
Lock
илиRLock
изmultiprocessing
.Изменено в версии 3.3: Был добавлен метод
wait_for()
.
-
class
multiprocessing.
Event
¶ Клон
threading.Event
.
-
class
multiprocessing.
Lock
¶ Объект нерекурсивной блокировки: близкий аналог
threading.Lock
. Если процесс или поток получил блокировку, последующие попытки получить ее от любого процесса или потока будут блокироваться до тех пор, пока она не будет освобождена; любой процесс или поток может освободить ее. Концепции и поведениеthreading.Lock
применительно к потокам повторяются здесь вmultiprocessing.Lock
применительно либо к процессам, либо к потокам, за исключением отмеченного.Обратите внимание, что
Lock
на самом деле является фабричной функцией, которая возвращает экземплярmultiprocessing.synchronize.Lock
, инициализированный контекстом по умолчанию.Lock
поддерживает протокол context manager и поэтому может использоваться в утвержденияхwith
.-
acquire
(block=True, timeout=None)¶ Получение блокировки, блокирующей или неблокирующей.
Если аргумент block установлен в
True
(по умолчанию), вызов метода будет блокироваться до тех пор, пока замок не будет находиться в разблокированном состоянии, затем установит его в заблокированное состояние и вернетTrue
. Обратите внимание, что имя этого первого аргумента отличается от имени аргумента вthreading.Lock.acquire()
.Если аргумент block установлен в
False
, вызов метода не блокируется. Если замок в данный момент находится в заблокированном состоянии, вернитеFalse
; в противном случае переведите замок в заблокированное состояние и вернитеTrue
.При вызове с положительным значением с плавающей точкой для timeout, блокируется не более чем на количество секунд, указанное timeout, до тех пор, пока блокировка не может быть получена. Вызовы с отрицательным значением timeout эквивалентны нулевому timeout. Вызовы со значением timeout
None
(по умолчанию) устанавливают период тайм-аута бесконечным. Обратите внимание, что обработка отрицательных илиNone
значений для timeout отличается от реализованного поведения вthreading.Lock.acquire()
. Аргумент timeout не имеет практических последствий, если аргумент block установлен вFalse
и поэтому игнорируется. ВозвращаетTrue
, если блокировка была получена, илиFalse
, если истек период тайм-аута.
-
release
()¶ Освободить блокировку. Эта функция может быть вызвана из любого процесса или потока, а не только из процесса или потока, который первоначально получил блокировку.
Поведение такое же, как и в
threading.Lock.release()
, за исключением того, что при вызове на разблокированной блокировке возникает ошибкаValueError
.
-
-
class
multiprocessing.
RLock
¶ Объект рекурсивной блокировки: близкий аналог
threading.RLock
. Рекурсивная блокировка должна быть освобождена процессом или потоком, который ее приобрел. После того как процесс или поток приобрел рекурсивную блокировку, тот же процесс или поток может приобрести ее снова без блокировки; этот процесс или поток должен освободить ее один раз за каждый раз, когда она была приобретена.Обратите внимание, что
RLock
на самом деле является фабричной функцией, которая возвращает экземплярmultiprocessing.synchronize.RLock
, инициализированный контекстом по умолчанию.RLock
поддерживает протокол context manager и поэтому может использоваться в утвержденияхwith
.-
acquire
(block=True, timeout=None)¶ Получение блокировки, блокирующей или неблокирующей.
При вызове с аргументом block, установленным в
True
, блокируется до тех пор, пока блокировка не перейдет в разблокированное состояние (не будет принадлежать ни одному процессу или потоку), если только блокировка уже не принадлежит текущему процессу или потоку. Затем текущий процесс или поток получает право собственности на блокировку (если он еще не владеет ею), а уровень рекурсии внутри блокировки увеличивается на единицу, в результате чего возвращается значениеTrue
. Обратите внимание, что в поведении этого первого аргумента есть несколько отличий от реализацииthreading.RLock.acquire()
, начиная с имени самого аргумента.При вызове с аргументом block, установленным в
False
, не блокировать. Если блокировка уже была приобретена (и, следовательно, принадлежит) другим процессом или потоком, текущий процесс или поток не принимает права собственности и уровень рекурсии в блокировке не изменяется, в результате чего возвращается значениеFalse
. Если блокировка находится в разблокированном состоянии, текущий процесс или поток получает право собственности, а уровень рекурсии увеличивается, в результате чего возвращается значениеTrue
.Использование и поведение аргумента timeout такие же, как и в
Lock.acquire()
. Обратите внимание, что некоторые из этих поведений timeout отличаются от реализованных вthreading.RLock.acquire()
.
-
release
()¶ Освободите блокировку, уменьшив уровень рекурсии. Если после декремента уровень рекурсии равен нулю, сбросьте блокировку на разблокированную (не принадлежащую ни одному процессу или потоку), и если другие процессы или потоки заблокированы в ожидании разблокировки блокировки, разрешите продолжить выполнение только одному из них. Если после декремента уровень рекурсии все еще ненулевой, блокировка остается заблокированной и принадлежит вызывающему процессу или потоку.
Вызывайте этот метод только тогда, когда вызывающий процесс или поток владеет блокировкой. Если этот метод вызывается не владельцем, а процессом или потоком, или если блокировка находится в разблокированном (не принадлежащем владельцу) состоянии, то возникает исключение
AssertionError
. Обратите внимание, что тип исключения, возникающего в этой ситуации, отличается от реализованного поведения вthreading.RLock.release()
.
-
-
class
multiprocessing.
Semaphore
([value])¶ Объект семафора: близкий аналог
threading.Semaphore
.Единственное отличие от близкого аналога: первый аргумент метода
acquire
имеет имя block, как это соответствуетLock.acquire()
.
Примечание
В macOS функция sem_timedwait
не поддерживается, поэтому вызов acquire()
с таймаутом будет эмулировать поведение этой функции с помощью спящего цикла.
Примечание
Если сигнал SIGINT, генерируемый командой Ctrl-C, поступает в то время, когда основной поток заблокирован вызовом BoundedSemaphore.acquire()
, Lock.acquire()
, RLock.acquire()
, Semaphore.acquire()
, Condition.acquire()
или Condition.wait()
, то вызов будет немедленно прерван и будет вызван сигнал KeyboardInterrupt
.
Это отличается от поведения threading
, где SIGINT будет игнорироваться, пока выполняются эквивалентные блокирующие вызовы.
Примечание
Некоторые функции этого пакета требуют наличия в операционной системе хоста работающей реализации общего семафора. При отсутствии таковой модуль multiprocessing.synchronize
будет отключен, а попытки импортировать его приведут к ошибке ImportError
. Дополнительную информацию см. в разделе bpo-3770.
Менеджеры¶
Менеджеры обеспечивают способ создания данных, которые могут совместно использоваться различными процессами, включая обмен по сети между процессами, запущенными на разных машинах. Объект менеджера управляет серверным процессом, который управляет общими объектами. Другие процессы могут получить доступ к общим объектам, используя прокси.
-
multiprocessing.
Manager
()¶ Возвращает объект started
SyncManager
, который может быть использован для совместного использования объектов между процессами. Возвращаемый объект менеджера соответствует порожденному дочернему процессу и имеет методы, которые будут создавать общие объекты и возвращать соответствующие прокси.
Процессы менеджера будут завершены, как только они будут собраны в мусор или их родительский процесс завершится. Классы менеджеров определены в модуле multiprocessing.managers
:
-
class
multiprocessing.managers.
BaseManager
([address[, authkey]])¶ Создайте объект BaseManager.
После создания необходимо вызвать
start()
илиget_server().serve_forever()
, чтобы убедиться, что объект менеджера относится к запущенному процессу менеджера.адрес - это адрес, по которому процесс менеджера прослушивает новые соединения. Если address равен
None
, то выбирается произвольный адрес.authkey - это ключ аутентификации, который будет использоваться для проверки валидности входящих соединений с серверным процессом. Если authkey равен
None
, то используетсяcurrent_process().authkey
. В противном случае используется authkey, который должен быть байтовой строкой.-
start
([initializer[, initargs]])¶ Запуск подпроцесса для запуска менеджера. Если initializer не
None
, то при запуске подпроцесс вызоветinitializer(*initargs)
.
-
get_server
()¶ Возвращает объект
Server
, представляющий реальный сервер под управлением менеджера. ОбъектServer
поддерживает методserve_forever()
:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
дополнительно имеет атрибутaddress
.
-
connect
()¶ Подключите локальный объект менеджера к удаленному процессу менеджера:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
-
shutdown
()¶ Остановить процесс, используемый менеджером. Это доступно только в том случае, если для запуска процесса сервера использовалась команда
start()
.Это может быть вызвано несколько раз.
-
register
(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶ Метод класса, который может быть использован для регистрации типа или вызываемого объекта в классе менеджера.
typeid - это «идентификатор типа», который используется для идентификации конкретного типа разделяемого объекта. Это должна быть строка.
callable - вызываемый объект, используемый для создания объектов для данного идентификатора типа. Если экземпляр менеджера будет подключен к серверу с помощью метода
connect()
, или если аргумент create_method равенFalse
, то это можно оставить какNone
.proxytype - это подкласс
BaseProxy
, который используется для создания прокси для общих объектов с данным typeid. ЕслиNone
, то класс прокси создается автоматически.exposed используется для указания последовательности имен методов, доступ к которым прокси для данного typeid должен быть разрешен с помощью
BaseProxy._callmethod()
. (Если exposed равенNone
, то вместо него используетсяproxytype._exposed_
, если он существует). В случае, когда список exposed не указан, все «публичные методы» разделяемого объекта будут доступны. (Здесь «публичный метод» означает любой атрибут, который имеет метод__call__()
и имя которого не начинается с'_'
).method_to_typeid - это связка, используемая для указания возвращаемого типа тех открытых методов, которые должны возвращать прокси. Оно сопоставляет имена методов со строками typeid. (Если method_to_typeid имеет значение
None
, то вместо него используетсяproxytype._method_to_typeid_
, если оно существует). Если имя метода не является ключом этого отображения или если отображение имеет значениеNone
, то объект, возвращаемый методом, будет скопирован по значению.create_method определяет, должен ли быть создан метод с именем typeid, который может быть использован для указания серверному процессу создать новый разделяемый объект и вернуть для него прокси. По умолчанию это
True
.
Экземпляры
BaseManager
также имеют одно свойство, доступное только для чтения:-
address
¶ Адрес, используемый менеджером.
Изменено в версии 3.3: Объекты менеджера поддерживают протокол управления контекстом - см. Типы контекстного менеджера.
__enter__()
запускает процесс сервера (если он еще не запущен) и возвращает объект менеджера.__exit__()
вызываетshutdown()
.В предыдущих версиях
__enter__()
не запускал серверный процесс менеджера, если он еще не был запущен.-
-
class
multiprocessing.managers.
SyncManager
¶ Подкласс
BaseManager
, который может быть использован для синхронизации процессов. Объекты этого типа возвращаютсяmultiprocessing.Manager()
.Его методы создают и возвращают Прокси-объекты для ряда часто используемых типов данных, которые должны быть синхронизированы между процессами. К ним, в частности, относятся общие списки и словари.
-
Barrier
(parties[, action[, timeout]])¶ Создайте общий объект
threading.Barrier
и верните для него прокси.Добавлено в версии 3.3.
-
BoundedSemaphore
([value])¶ Создайте общий объект
threading.BoundedSemaphore
и верните для него прокси.
-
Condition
([lock])¶ Создайте общий объект
threading.Condition
и верните для него прокси.Если указан lock, то он должен быть прокси для объекта
threading.Lock
илиthreading.RLock
.Изменено в версии 3.3: Был добавлен метод
wait_for()
.
-
Event
()¶ Создайте общий объект
threading.Event
и верните для него прокси.
-
Lock
()¶ Создайте общий объект
threading.Lock
и верните для него прокси.
-
Queue
([maxsize])¶ Создайте общий объект
queue.Queue
и верните для него прокси.
-
RLock
()¶ Создайте общий объект
threading.RLock
и верните для него прокси.
-
Semaphore
([value])¶ Создайте общий объект
threading.Semaphore
и верните для него прокси.
-
Array
(typecode, sequence)¶ Создайте массив и верните для него прокси.
-
Value
(typecode, value)¶ Создайте объект с атрибутом записи
value
и верните для него прокси.
Изменено в версии 3.6: Общие объекты могут быть вложенными. Например, разделяемый объект-контейнер, такой как разделяемый список, может содержать другие разделяемые объекты, которые будут управляться и синхронизироваться
SyncManager
.-
-
class
multiprocessing.managers.
Namespace
¶ Тип, который может регистрироваться с помощью
SyncManager
.Объект пространства имен не имеет открытых методов, но имеет атрибуты, доступные для записи. Его представление показывает значения его атрибутов.
Однако при использовании прокси для объекта пространства имен атрибут, начинающийся с
'_'
, будет атрибутом прокси, а не атрибутом референта:>>> manager = multiprocessing.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
Индивидуальные менеджеры¶
Для создания собственного менеджера создается подкласс BaseManager
и используется метод класса register()
для регистрации новых типов или callables в классе менеджера. Например:
from multiprocessing.managers import BaseManager
class MathsClass:
def add(self, x, y):
return x + y
def mul(self, x, y):
return x * y
class MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
with MyManager() as manager:
maths = manager.Maths()
print(maths.add(4, 3)) # prints 7
print(maths.mul(7, 8)) # prints 56
Использование удаленного менеджера¶
Можно запустить сервер менеджера на одной машине, а клиенты могут использовать его с других машин (при условии, что соответствующие брандмауэры позволяют это сделать).
Выполнение следующих команд создает сервер для одной общей очереди, к которой могут получить доступ удаленные клиенты:
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Один клиент может получить доступ к серверу следующим образом:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
Другой клиент также может использовать его:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
Локальные процессы также могут получить доступ к этой очереди, используя код выше на клиенте для удаленного доступа к ней:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super().__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Прокси-объекты¶
Прокси - это объект, который ссылается на общий объект, который живет (предположительно) в другом процессе. Считается, что общий объект является референтом прокси. Несколько прокси-объектов могут иметь одного и того же референта.
Прокси-объект имеет методы, которые вызывают соответствующие методы своего референта (хотя не все методы референта обязательно будут доступны через прокси). Таким образом, прокси-объект можно использовать так же, как и его референт:
>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Обратите внимание, что применение str()
к прокси вернет представление референта, тогда как применение repr()
вернет представление прокси.
Важной особенностью прокси-объектов является то, что их можно передавать между процессами. Как таковой, референт может содержать Прокси-объекты. Это позволяет встраивать эти управляемые списки, таблицы и другие Прокси-объекты:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
Аналогично, прокси dict и list могут быть вложены друг в друга:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
Если в референте содержатся стандартные (непрокси) объекты list
или dict
, модификации этих изменяемых значений не будут передаваться через менеджер, поскольку прокси не имеет возможности узнать, когда изменяются содержащиеся в нем значения. Однако, сохранение значения в контейнерном прокси (что вызывает __setitem__
на прокси-объекте) распространяется через менеджер, поэтому для эффективного изменения такого элемента можно переназначить измененное значение контейнерному прокси:
# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d
Этот подход, возможно, менее удобен, чем использование вложенных Прокси-объекты для большинства случаев использования, но также демонстрирует уровень контроля над синхронизацией.
Примечание
Прокси-типы в multiprocessing
ничего не делают для поддержки сравнения по значению. Так, например, мы имеем:
>>> manager.list([1,2,3]) == [1,2,3]
False
При сравнении следует просто использовать копию референта.
-
class
multiprocessing.managers.
BaseProxy
¶ Прокси-объекты являются экземплярами подклассов
BaseProxy
.-
_callmethod
(methodname[, args[, kwds]])¶ Вызвать и вернуть результат метода референта прокси.
Если
proxy
является прокси, референтом которого являетсяobj
, то выражениеproxy._callmethod(methodname, args, kwds)
оценит выражение
getattr(obj, methodname)(*args, **kwds)
в процессе работы менеджера.
Возвращаемое значение будет копией результата вызова или прокси на новый разделяемый объект - см. документацию для аргумента method_to_typeid в
BaseManager.register()
.Если при вызове возникло исключение, то оно повторно вызывается командой
_callmethod()
. Если в процессе менеджера возникает какое-то другое исключение, то оно преобразуется в исключениеRemoteError
и вызывается командой_callmethod()
.Обратите внимание, в частности, что будет вызвано исключение, если имя метода не было раскрыто.
Пример использования
_callmethod()
:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
-
_getvalue
()¶ Возвращает копию референта.
Если референт является неприкасаемым, это вызовет исключение.
-
__repr__
()¶ Возвращает представление объекта прокси.
-
__str__
()¶ Возвращает представление референта.
-
Очистка¶
Прокси-объект использует обратный вызов weakref, чтобы при сборке мусора он отменил регистрацию от менеджера, которому принадлежит его референт.
Общий объект удаляется из процесса менеджера, когда больше нет прокси, ссылающихся на него.
Пулы процессов¶
Можно создать пул процессов, которые будут выполнять задания, переданные им с помощью класса Pool
.
-
class
multiprocessing.pool.
Pool
([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶ Объект пула процессов, который управляет пулом рабочих процессов, на которые могут быть отправлены задания. Он поддерживает асинхронные результаты с таймаутами и обратными вызовами и имеет параллельную реализацию карты.
processes - это количество рабочих процессов, которые необходимо использовать. Если processes равно
None
, то используется число, возвращаемоеos.cpu_count()
.Если initializer не
None
, то каждый рабочий процесс будет вызыватьinitializer(*initargs)
при запуске.maxtasksperchild - это количество задач, которые рабочий процесс может выполнить, прежде чем он завершится и будет заменен новым рабочим процессом, чтобы освободить неиспользуемые ресурсы. По умолчанию maxtasksperchild равно
None
, что означает, что рабочие процессы будут жить столько же, сколько и пул.context может использоваться для указания контекста, используемого для запуска рабочих процессов. Обычно пул создается с помощью функции
multiprocessing.Pool()
или методаPool()
объекта контекста. В обоих случаях context устанавливается соответствующим образом.Обратите внимание, что методы объекта пула должны вызываться только процессом, который создал пул.
Предупреждение
Объекты
multiprocessing.pool
имеют внутренние ресурсы, которыми необходимо правильно управлять (как и любым другим ресурсом), используя пул в качестве менеджера контекста или вызываяclose()
иterminate()
вручную. Если этого не сделать, процесс может зависнуть при завершении.Обратите внимание, что неправильно полагаться на сборщик мусора для уничтожения пула, поскольку CPython не гарантирует, что финализатор пула будет вызван (см.
object.__del__()
для получения дополнительной информации).Добавлено в версии 3.2: maxtasksperchild
Добавлено в версии 3.4: контекст
Примечание
Рабочие процессы в пуле
Pool
обычно живут в течение всего времени очереди работ пула. Часто встречающийся в других системах (таких как Apache, mod_wsgi и т.д.) паттерн освобождения ресурсов, удерживаемых рабочими процессами, состоит в том, чтобы позволить рабочему процессу в пуле завершить только заданный объем работы перед выходом, очисткой и порождением нового процесса взамен старого. Аргумент maxtasksperchild вPool
открывает эту возможность конечному пользователю.-
apply
(func[, args[, kwds]])¶ Вызовите func с аргументами args и аргументами ключевых слов kwds. Она блокируется до тех пор, пока не будет готов результат. Учитывая эту блокировку,
apply_async()
лучше подходит для параллельного выполнения работы. Кроме того, func выполняется только в одном из рабочих пула.
-
apply_async
(func[, args[, kwds[, callback[, error_callback]]]])¶ Вариант метода
apply()
, который возвращает объектAsyncResult
.Если указан callback, то это должен быть вызываемый элемент, принимающий один аргумент. Когда результат становится готовым, к нему применяется callback, то есть если вызов не завершился неудачей, в этом случае вместо него применяется error_callback.
Если указан error_callback, то он должен быть вызываемой функцией, принимающей один аргумент. Если целевая функция не работает, то error_callback вызывается с экземпляром исключения.
Обратные вызовы должны завершаться немедленно, поскольку в противном случае поток, обрабатывающий результаты, будет заблокирован.
-
map
(func, iterable[, chunksize])¶ Параллельный эквивалент встроенной функции
map()
(поддерживает только один аргумент iterable, для нескольких итераций см.starmap()
). Она блокируется до тех пор, пока не будет готов результат.Этот метод разбивает итерабельную таблицу на несколько фрагментов, которые он передает в пул процессов как отдельные задачи. Размер (приблизительный) этих фрагментов можно задать, установив chunksize в целое положительное число.
Обратите внимание, что это может привести к большим затратам памяти для очень длинных итераций. Рассмотрите возможность использования
imap()
илиimap_unordered()
с явным параметром chunksize для большей эффективности.
-
map_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ Вариант метода
map()
, который возвращает объектAsyncResult
.Если указан callback, то это должен быть вызываемый элемент, принимающий один аргумент. Когда результат становится готовым, к нему применяется callback, то есть если вызов не завершился неудачей, в этом случае вместо него применяется error_callback.
Если указан error_callback, то он должен быть вызываемой функцией, принимающей один аргумент. Если целевая функция не работает, то error_callback вызывается с экземпляром исключения.
Обратные вызовы должны завершаться немедленно, поскольку в противном случае поток, обрабатывающий результаты, будет заблокирован.
-
imap
(func, iterable[, chunksize])¶ Более ленивая версия
map()
.Аргумент chunksize такой же, как и аргумент, используемый методом
map()
. Для очень длинных итераций использование большого значения chunksize может ускорить выполнение задания на много, чем использование значения по умолчанию1
.Также если chunksize равен
1
, то методnext()
итератора, возвращаемого методомimap()
, имеет необязательный параметр timeout:next(timeout)
вызовет ошибкуmultiprocessing.TimeoutError
, если результат не может быть возвращен в течение timeout секунд.
-
imap_unordered
(func, iterable[, chunksize])¶ То же самое, что и
imap()
, за исключением того, что порядок следования результатов из возвращаемого итератора следует считать произвольным. (Только при наличии только одного рабочего процесса порядок гарантированно «правильный»).
-
starmap
(func, iterable[, chunksize])¶ Подобно
map()
, за исключением того, что элементы iterable, как ожидается, будут итерациями, которые распаковываются как аргументы.Следовательно, итерабельность из
[(1,2), (3, 4)]
приводит к[func(1,2), func(3,4)]
.Добавлено в версии 3.3.
-
starmap_async
(func, iterable[, chunksize[, callback[, error_callback]]])¶ Комбинация
starmap()
иmap_async()
, которая выполняет итерацию над iterable итераций и вызывает func с распакованными итерациями. Возвращает объект результата.Добавлено в версии 3.3.
-
close
()¶ Предотвращает дальнейшую передачу заданий в пул. После выполнения всех заданий рабочие процессы завершатся.
-
terminate
()¶ Немедленно останавливает рабочие процессы, не завершая оставшуюся работу. Когда объект пула будет собран
terminate()
будет немедленно вызван.
-
join
()¶ Дождитесь завершения рабочих процессов. Перед использованием
close()
необходимо вызватьterminate()
илиjoin()
.
Добавлено в версии 3.3: Объекты пула теперь поддерживают протокол управления контекстом - см. Типы контекстного менеджера.
__enter__()
возвращает объект пула, а__exit__()
вызываетterminate()
.-
-
class
multiprocessing.pool.
AsyncResult
¶ Класс результата, возвращаемого
Pool.apply_async()
иPool.map_async()
.-
get
([timeout])¶ Верните результат, когда он будет получен. Если timeout не равен
None
и результат не приходит в течение timeout секунд, то выдается сообщениеmultiprocessing.TimeoutError
. Если удаленный вызов вызвал исключение, то это исключение будет повторно вызвано командойget()
.
-
wait
([timeout])¶ Подождите, пока результат будет доступен или пока не пройдет timeout секунд.
-
ready
()¶ Возвращает, завершился ли вызов.
-
successful
()¶ Возвращает, завершился ли вызов без возникновения исключения. Вызовет
ValueError
, если результат не готов.Изменено в версии 3.7: Если результат не готов, вместо
ValueError
выдаетсяAssertionError
.
-
Следующий пример демонстрирует использование пула:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
Слушатели и клиенты¶
Обычно передача сообщений между процессами осуществляется с помощью очередей или с помощью объектов Connection
, возвращаемых Pipe()
.
Однако модуль multiprocessing.connection
обеспечивает дополнительную гибкость. В основном он предоставляет высокоуровневый API, ориентированный на сообщения, для работы с сокетами или именованными трубами Windows. Он также поддерживает digest-аутентификацию с помощью модуля hmac
и опрос нескольких соединений одновременно.
-
multiprocessing.connection.
deliver_challenge
(connection, authkey)¶ Отправьте случайно сгенерированное сообщение на другой конец соединения и дождитесь ответа.
Если ответ соответствует дайджесту сообщения с использованием authkey в качестве ключа, то на другой конец соединения отправляется приветственное сообщение. В противном случае выдается сообщение
AuthenticationError
.
-
multiprocessing.connection.
answer_challenge
(connection, authkey)¶ Получение сообщения, вычисление дайджеста сообщения с использованием authkey в качестве ключа, а затем отправка дайджеста обратно.
Если приветственное сообщение не получено, то выдается сообщение
AuthenticationError
.
-
multiprocessing.connection.
Client
(address[, family[, authkey]])¶ Попытка установить соединение со слушателем, который использует адрес address, возвращая
Connection
.Тип соединения определяется аргументом family, но обычно его можно опустить, так как он может быть выведен из формата address. (См. Форматы адресов)
Если указан authkey, а не None, он должен быть байтовой строкой и будет использоваться в качестве секретного ключа для вызова аутентификации на основе HMAC. Аутентификация не выполняется, если authkey равен None. При неудачной аутентификации выдается сообщение
AuthenticationError
. См. Ключи аутентификации.
-
class
multiprocessing.connection.
Listener
([address[, family[, backlog[, authkey]]]])¶ Обертка для связанного сокета или именованной трубы Windows, которая «слушает» соединения.
address - это адрес, который будет использоваться связанным сокетом или именованной трубой объекта слушателя.
Примечание
Если используется адрес „0.0.0.0“, этот адрес не будет подключаемой конечной точкой в Windows. Если вам требуется подключаемая конечная точка, следует использовать адрес „127.0.0.0.1“.
family - это тип используемого сокета (или именованной трубы). Это может быть одна из строк
'AF_INET'
(для сокета TCP),'AF_UNIX'
(для сокета домена Unix) или'AF_PIPE'
(для именованной трубы Windows). Из них только первый гарантированно доступен. Если семейство равноNone
, то семейство определяется по формату адрес. Если address такжеNone
, то выбирается значение по умолчанию. Это семейство по умолчанию считается самым быстрым из доступных. См. Форматы адресов. Обратите внимание, что если семейство равно'AF_UNIX'
и адрес равенNone
, то сокет будет создан в частном временном каталоге, созданном с помощьюtempfile.mkstemp()
.Если объект слушателя использует сокет, то backlog (по умолчанию 1) передается в метод
listen()
сокета после его привязки.Если указан authkey, а не None, он должен быть байтовой строкой и будет использоваться в качестве секретного ключа для вызова аутентификации на основе HMAC. Аутентификация не выполняется, если authkey равен None. При неудачной аутентификации выдается сообщение
AuthenticationError
. См. Ключи аутентификации.-
accept
()¶ Принимает соединение на связанном сокете или именованной трубе объекта слушателя и возвращает объект
Connection
. Если попытка аутентификации была предпринята и не удалась, то выдается сообщениеAuthenticationError
.
-
close
()¶ Закрытие связанного сокета или именованной трубы объекта слушателя. Этот вызов происходит автоматически при сборке мусора слушателя. Однако рекомендуется вызывать его явно.
Объекты слушателей имеют следующие свойства, доступные только для чтения:
-
address
¶ Адрес, который используется объектом Listener.
-
last_accepted
¶ Адрес, с которого пришло последнее принятое соединение. Если он недоступен, то это
None
.
Добавлено в версии 3.3: Объекты слушателей теперь поддерживают протокол управления контекстом - см. Типы контекстного менеджера.
__enter__()
возвращает объект слушателя, а__exit__()
вызываетclose()
.-
-
multiprocessing.connection.
wait
(object_list, timeout=None)¶ Подождите, пока объект из object_list будет готов. Возвращает список тех объектов в object_list, которые готовы. Если timeout имеет значение float, то вызов блокируется не более чем на столько секунд. Если timeout равен
None
, то вызов будет блокироваться неограниченное время. Отрицательный таймаут эквивалентен нулевому таймауту.Как для Unix, так и для Windows, объект может появиться в object_list, если он является
читаемый объект
Connection
;подключенный и читаемый объект
socket.socket
; или
Соединение или объект сокета готов, когда есть данные, доступные для чтения из него, или другой конец закрыт.
Unix:
wait(object_list, timeout)
почти эквивалентенselect.select(object_list, [], [], timeout)
. Разница в том, что еслиselect.select()
прерван сигналом, он может поднятьOSError
с номером ошибкиEINTR
, тогда какwait()
этого не сделает.Windows: Элемент в object_list должен быть либо целочисленным хэндлом, который является ожидаемым (в соответствии с определением, используемым в документации Win32 функции
WaitForMultipleObjects()
), либо объектом с методомfileno()
, который возвращает хэндл сокета или хэндл трубы. (Обратите внимание, что дескрипторы труб и дескрипторы сокетов не являются **** ожидающими дескрипторами).Добавлено в версии 3.3.
Примеры.
Следующий код сервера создает слушателя, который использует 'secret password'
в качестве ключа аутентификации. Затем он ожидает соединения и отправляет некоторые данные клиенту:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
Следующий код подключается к серверу и получает от него некоторые данные:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
Следующий код использует wait()
для ожидания сообщений от нескольких процессов одновременно:
import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait
def foo(w):
for i in range(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
readers = []
for i in range(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(target=foo, args=(w,))
p.start()
# We close the writable end of the pipe now to be sure that
# p is the only process which owns a handle for it. This
# ensures that when p closes its handle for the writable end,
# wait() will promptly report the readable end as being ready.
w.close()
while readers:
for r in wait(readers):
try:
msg = r.recv()
except EOFError:
readers.remove(r)
else:
print(msg)
Форматы адресов¶
Адрес
'AF_INET'
представляет собой кортеж вида(hostname, port)
, где hostname - строка, а port - целое число.Адрес
'AF_UNIX'
- это строка, представляющая имя файла в файловой системе.Адрес
'AF_PIPE'
- это строка видаr'\.\pipe{PipeName}'
. Чтобы использоватьClient()
для подключения к именованной трубе на удаленном компьютере под названием ServerName, необходимо использовать адрес видаr'\ServerName\pipe{PipeName}'
.
Обратите внимание, что любая строка, начинающаяся с двух обратных косых черт, по умолчанию считается адресом 'AF_PIPE'
, а не адресом 'AF_UNIX'
.
Ключи аутентификации¶
Когда вы используете Connection.recv
, полученные данные автоматически распаковываются. К сожалению, распаковка данных из ненадежного источника представляет собой риск для безопасности. Поэтому Listener
и Client()
используют модуль hmac
для обеспечения аутентификации дайджеста.
Ключ аутентификации - это байтовая строка, которую можно рассматривать как пароль: после установления соединения оба конца будут требовать доказательств того, что другой знает ключ аутентификации. (Демонстрация того, что оба конца используют один и тот же ключ, не включает передачу ключа по соединению).
Если аутентификация запрошена, но ключ аутентификации не указан, то используется возвращаемое значение current_process().authkey
(см. Process
). Это значение будет автоматически наследоваться любым объектом Process
, который создаст текущий процесс. Это означает, что (по умолчанию) все процессы многопроцессной программы будут иметь один ключ аутентификации, который можно использовать при установке соединений между собой.
Подходящие ключи аутентификации также могут быть сгенерированы с помощью os.urandom()
.
Ведение журнала¶
Имеется некоторая поддержка протоколирования. Обратите внимание, однако, что пакет logging
не использует общие блокировки процессов, поэтому возможно (в зависимости от типа обработчика) смешивание сообщений от разных процессов.
-
multiprocessing.
get_logger
()¶ Возвращает регистратор, используемый
multiprocessing
. При необходимости будет создан новый.При первом создании регистратор имеет уровень
logging.NOTSET
и не имеет обработчика по умолчанию. Сообщения, отправленные в этот регистратор, по умолчанию не будут передаваться в корневой регистратор.Обратите внимание, что в Windows дочерние процессы наследуют только уровень регистратора родительского процесса - любая другая настройка регистратора не наследуется.
-
multiprocessing.
log_to_stderr
(level=None)¶ Эта функция выполняет вызов
get_logger()
, но в дополнение к возврату логгера, созданного get_logger, она добавляет обработчик, который отправляет вывод наsys.stderr
, используя формат'[%(levelname)s/%(processName)s] %(message)s'
. Вы можете изменитьlevelname
логгера, передав аргументlevel
.
Ниже приведен пример сеанса с включенным протоколированием:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
Полную таблицу уровней протоколирования см. в модуле logging
.
Модуль multiprocessing.dummy
¶
multiprocessing.dummy
повторяет API multiprocessing
, но является не более чем оберткой вокруг модуля threading
.
В частности, функция Pool
, предоставляемая multiprocessing.dummy
, возвращает экземпляр ThreadPool
, который является подклассом Pool
, поддерживающим все те же вызовы методов, но использующим пул рабочих потоков, а не рабочих процессов.
-
class
multiprocessing.pool.
ThreadPool
([processes[, initializer[, initargs]]])¶ Объект пула потоков, который управляет пулом рабочих потоков, на которые могут быть отправлены задания. Экземпляры
ThreadPool
полностью совместимы по интерфейсу с экземплярамиPool
, и их ресурсами также необходимо правильно управлять, либо используя пул в качестве менеджера контекста, либо вызываяclose()
иterminate()
вручную.processes - это количество рабочих потоков, которые необходимо использовать. Если processes равно
None
, то используется число, возвращаемоеos.cpu_count()
.Если initializer не
None
, то каждый рабочий процесс будет вызыватьinitializer(*initargs)
при запуске.В отличие от
Pool
, maxtasksperchild и context не могут быть предоставлены.Примечание
ThreadPool
имеет тот же интерфейс, что иPool
, который разработан вокруг пула процессов и предшествует появлению модуляconcurrent.futures
. Как таковой, он наследует некоторые операции, которые не имеют смысла для пула, поддерживаемого потоками, и имеет свой собственный тип для представления статуса асинхронных заданий,AsyncResult
, который не понимается другими библиотеками.Пользователи обычно предпочитают использовать
concurrent.futures.ThreadPoolExecutor
, который имеет более простой интерфейс, который с самого начала был разработан с учетом потоков, и который возвращает экземплярыconcurrent.futures.Future
, совместимые со многими другими библиотеками, включаяasyncio
.
Руководство по программированию¶
Существуют определенные рекомендации и идиомы, которых следует придерживаться при использовании multiprocessing
.
Все методы запуска¶
Следующее относится ко всем методам запуска.
Избегайте общего состояния
По возможности нужно стараться избегать перемещения больших объемов данных между процессами.
Вероятно, лучше придерживаться использования очередей или труб для связи между процессами, а не использовать примитивы синхронизации нижнего уровня.
Маринуемость
Убедитесь, что аргументы методов прокси являются picklable.
Безопасность прокси-серверов
Не используйте прокси-объект из более чем одного потока, если вы не защитили его блокировкой.
(Никогда не возникает проблем с использованием разными процессами одного и того же прокси).
Объединение зомби-процессов
В Unix, когда процесс завершается, но не был присоединен, он становится зомби. Их никогда не должно быть очень много, потому что каждый раз, когда начинается новый процесс (или вызывается
active_children()
), все завершенные процессы, которые еще не были присоединены, будут присоединены. Также вызовProcess.is_alive
завершенного процесса приведет к его присоединению. Даже в этом случае, вероятно, хорошей практикой является явное присоединение всех процессов, которые вы запускаете.
Лучше наследовать, чем отбирать/не отбирать
При использовании методов запуска spawn или forkserver многие типы из
multiprocessing
должны быть picklable, чтобы дочерние процессы могли их использовать. Однако, как правило, следует избегать передачи разделяемых объектов другим процессам с помощью труб или очередей. Вместо этого следует организовать программу так, чтобы процесс, которому нужен доступ к разделяемому ресурсу, созданному в другом месте, мог унаследовать его от процесса-предка.
Избегайте завершения процессов
Использование метода
Process.terminate
для остановки процесса может привести к тому, что любые общие ресурсы (такие как блокировки, семафоры, каналы и очереди), используемые процессом в данный момент, станут сломанными или недоступными для других процессов.Поэтому, вероятно, лучше использовать
Process.terminate
только для процессов, которые никогда не используют общие ресурсы.
Присоединение к процессам, использующим очереди
Имейте в виду, что процесс, поместивший элементы в очередь, будет ждать перед завершением, пока все буферизованные элементы не будут переданы потоком «фидера» в базовую трубу. (Дочерний процесс может вызвать метод
Queue.cancel_join_thread
очереди, чтобы избежать такого поведения).Это означает, что при использовании очереди необходимо убедиться, что все элементы, которые были поставлены в очередь, в конечном итоге будут удалены до того, как процесс будет завершен. В противном случае вы не можете быть уверены, что процессы, поставившие элементы в очередь, завершатся. Помните также, что недемонические процессы будут присоединяться автоматически.
Пример, который приведет к тупику, следующий:
from multiprocessing import Process, Queue def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(target=f, args=(queue,)) p.start() p.join() # this deadlocks obj = queue.get()Исправлением здесь может быть замена двух последних строк местами (или просто удаление строки
p.join()
).
Явная передача ресурсов дочерним процессам
В Unix при использовании метода запуска fork дочерний процесс может использовать общий ресурс, созданный в родительском процессе с помощью глобального ресурса. Однако лучше передать объект в качестве аргумента в конструктор дочернего процесса.
Помимо того, что код (потенциально) совместим с Windows и другими методами запуска, это также гарантирует, что пока дочерний процесс жив, объект не будет собран в родительском процессе. Это может быть важно, если какой-то ресурс освобождается, когда объект собирается в родительском процессе.
Так, например,
from multiprocessing import Process, Lock def f(): ... do something using "lock" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f).start()следует переписать как
from multiprocessing import Process, Lock def f(l): ... do something using "l" ... if __name__ == '__main__': lock = Lock() for i in range(10): Process(target=f, args=(lock,)).start()
Остерегайтесь замены sys.stdin
на «файлоподобный объект»
multiprocessing
первоначально безоговорочно назывался:os.close(sys.stdin.fileno())в методе
multiprocessing.Process._bootstrap()
— это приводило к проблемам с процессами в процессах. Это было изменено на:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)Это решает фундаментальную проблему столкновения процессов друг с другом, что приводит к ошибке плохого дескриптора файла, но создает потенциальную опасность для приложений, которые заменяют
sys.stdin()
на «файлоподобный объект» с буферизацией вывода. Эта опасность заключается в том, что если несколько процессов вызовутclose()
на этом файлоподобном объекте, это может привести к тому, что одни и те же данные будут выгружены в объект несколько раз, что приведет к повреждению.Если вы напишете файлоподобный объект и реализуете собственное кэширование, вы можете сделать его fork-safe, сохраняя pid при каждом добавлении в кэш и отбрасывая кэш при изменении pid. Например:
@property def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] return self._cacheДля получения дополнительной информации смотрите bpo-5155, bpo-5313 и bpo-5331.
Методы запуска spawn и forkserver¶
Есть несколько дополнительных ограничений, которые не применяются к методу запуска fork.
Большая маринованность
Убедитесь, что все аргументы метода
Process.__init__()
являются picklable. Также, если вы подклассProcess
, то убедитесь, что экземпляры будут picklable при вызове методаProcess.start
.
Глобальные переменные
Следует помнить, что если код, запущенный в дочернем процессе, попытается получить доступ к глобальной переменной, то значение, которое он увидит (если оно есть), может не совпадать со значением в родительском процессе в момент вызова
Process.start
.Однако глобальные переменные, которые являются просто константами уровня модуля, не вызывают проблем.
Безопасный импорт главного модуля
Убедитесь, что главный модуль может быть безопасно импортирован новым интерпретатором Python без возникновения непреднамеренных побочных эффектов (например, запуска нового процесса).
Например, при использовании метода запуска spawn или forkserver запуск следующего модуля завершится ошибкой
RuntimeError
:from multiprocessing import Process def foo(): print('hello') p = Process(target=foo) p.start()Вместо этого следует защитить «точку входа» программы, используя
if __name__ == '__main__':
следующим образом:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(target=foo) p.start()(Строку
freeze_support()
можно опустить, если программа будет выполняться нормально, а не заморожена).Это позволяет вновь порожденному интерпретатору Python безопасно импортировать модуль и затем выполнить функцию модуля
foo()
.Аналогичные ограничения действуют, если пул или менеджер создается в главном модуле.
Примеры¶
Демонстрация того, как создавать и использовать настраиваемые менеджеры и прокси:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
Использование Pool
:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
Пример, показывающий, как использовать очереди для подачи заданий на набор рабочих процессов и сбора результатов:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()