multiprocessing — Параллелизм на основе процессов

Исходный код: Lib/multiprocessing/.


Введение

multiprocessing - это пакет, поддерживающий порождение процессов с помощью API, аналогичного модулю threading. Пакет multiprocessing предлагает локальный и удаленный параллелизм, эффективно обходя Global Interpreter Lock путем использования подпроцессов вместо потоков. Благодаря этому модуль multiprocessing позволяет программисту полностью задействовать несколько процессоров на данной машине. Он работает как на Unix, так и на Windows.

Модуль multiprocessing также вводит API, не имеющие аналогов в модуле threading. Ярким примером этого является объект Pool, который предлагает удобное средство распараллеливания выполнения функции по нескольким входным значениям, распределяя входные данные между процессами (параллелизм данных). Следующий пример демонстрирует обычную практику определения таких функций в модуле, чтобы дочерние процессы могли успешно импортировать этот модуль. Этот базовый пример параллелизма данных с использованием Pool,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

выведет на стандартный вывод

[1, 4, 9]

См.также

concurrent.futures.ProcessPoolExecutor предлагает интерфейс более высокого уровня для передачи задач в фоновый процесс без блокирования выполнения вызывающего процесса. По сравнению с непосредственным использованием интерфейса Pool, API concurrent.futures позволяет отделить передачу работы в пул базовых процессов от ожидания результатов.

Класс Process

В multiprocessing процессы порождаются путем создания объекта Process и последующего вызова его метода start(). Process следует API threading.Thread. Тривиальным примером многопроцессной программы является

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Чтобы показать отдельные идентификаторы процессов, вот расширенный пример:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Для объяснения того, зачем нужна часть if __name__ == '__main__', смотрите Руководство по программированию.

Контексты и методы запуска

В зависимости от платформы, multiprocessing поддерживает три способа запуска процесса. Этими способами запуска являются

spawn

Родительский процесс запускает новый процесс интерпретатора Python. Дочерний процесс наследует только те ресурсы, которые необходимы для выполнения метода run() объекта process. В частности, ненужные дескрипторы файлов и хэндлы от родительского процесса не будут унаследованы. Запуск процесса с помощью этого метода довольно медленный по сравнению с использованием fork или forkserver.

Доступен на Unix и Windows. По умолчанию в Windows и macOS.

fork

Родительский процесс использует команду os.fork() для форка интерпретатора Python. Дочерний процесс, когда он запускается, фактически идентичен родительскому процессу. Все ресурсы родительского процесса наследуются дочерним процессом. Обратите внимание, что безопасное форкирование многопоточного процесса проблематично.

Доступно только на Unix. По умолчанию в Unix.

forkserver

Когда программа запускается и выбирает метод запуска forkserver, запускается серверный процесс. С этого момента всякий раз, когда требуется новый процесс, родительский процесс соединяется с сервером и запрашивает у него форк нового процесса. Процесс fork server является однопоточным, поэтому для него безопасно использовать os.fork(). Никакие ненужные ресурсы не наследуются.

Доступен на платформах Unix, которые поддерживают передачу файловых дескрипторов через Unix pipes.

Изменено в версии 3.8: В macOS метод запуска spawn теперь используется по умолчанию. Метод запуска fork следует считать небезопасным, поскольку он может привести к сбою подпроцесса. См. bpo-33725.

Изменено в версии 3.4: spawn добавлен на всех платформах unix, а forkserver добавлен для некоторых платформ unix. Дочерние процессы больше не наследуют все ручки, наследуемые родителями, в Windows.

На Unix при использовании методов запуска spawn или forkserver также запускается процесс resource tracker, который отслеживает несвязанные именованные системные ресурсы (такие как именованные семафоры или объекты SharedMemory), созданные процессами программы. Когда все процессы завершатся, программа отслеживания ресурсов отсоединяет все оставшиеся отслеживаемые объекты. Обычно их не должно быть, но если процесс был убит сигналом, могут остаться «утечки» ресурсов. (Ни утечка семафоров, ни сегменты общей памяти не будут автоматически отсоединены до следующей перезагрузки. Это проблематично для обоих объектов, поскольку система позволяет использовать только ограниченное количество именованных семафоров, а сегменты разделяемой памяти занимают некоторое пространство в основной памяти).

Для выбора метода запуска используется set_start_method() в пункте if __name__ == '__main__' главного модуля. Например:

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() не должен использоваться более одного раза в программе.

В качестве альтернативы можно использовать get_context() для получения контекстного объекта. Контекстные объекты имеют тот же API, что и модуль мультипроцессинга, и позволяют использовать несколько методов запуска в одной программе.

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Обратите внимание, что объекты, относящиеся к одному контексту, могут быть несовместимы с процессами для другого контекста. В частности, блокировки, созданные с помощью контекста fork, не могут быть переданы процессам, запущенным с помощью методов spawn или forkserver start.

Библиотека, которая хочет использовать определенный метод запуска, вероятно, должна использовать get_context(), чтобы не вмешиваться в выбор пользователя библиотеки.

Предупреждение

Методы запуска 'spawn' и 'forkserver' в настоящее время не могут быть использованы с «замороженными» исполняемыми файлами (т.е. двоичными файлами, созданными пакетами типа PyInstaller и cx_Freeze) на Unix. Метод запуска 'fork' работает.

Обмен объектами между процессами

multiprocessing поддерживает два типа канала связи между процессами:

Квоты

Класс Queue является почти клоном queue.Queue. Например:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Очереди безопасны для потоков и процессов.

Трубы

Функция Pipe() возвращает пару объектов соединения, соединенных трубой, которая по умолчанию является дуплексной (двусторонней). Например:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

Два объекта соединения, возвращаемые методом Pipe(), представляют собой два конца трубы. Каждый объект соединения имеет методы send() и recv() (среди прочих). Обратите внимание, что данные в трубе могут быть повреждены, если два процесса (или потока) пытаются одновременно читать из или записывать в один и тот же конец трубы. Конечно, нет никакого риска повреждения от процессов, использующих разные концы трубы в одно и то же время.

Синхронизация между процессами

multiprocessing содержит эквиваленты всех примитивов синхронизации из threading. Например, можно использовать блокировку, чтобы гарантировать, что только один процесс одновременно печатает на стандартный вывод:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

Без использования блокировки выходные данные различных процессов могут смешаться.

Совместное использование состояния между процессами

Как упоминалось выше, при параллельном программировании обычно лучше избегать использования общего состояния, насколько это возможно. Это особенно верно при использовании нескольких процессов.

Однако если вам действительно необходимо использовать общие данные, то multiprocessing предоставляет несколько способов сделать это.

Общая память

Данные могут храниться в общей карте памяти с помощью Value или Array. Например, следующий код

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

напечатает

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Аргументы 'd' и 'i', используемые при создании num и arr, являются типовыми кодами, используемыми модулем array: 'd' обозначает float двойной точности и 'i' обозначает знаковое целое число. Эти разделяемые объекты будут безопасны для процессов и потоков.

Для большей гибкости в использовании общей памяти можно использовать модуль multiprocessing.sharedctypes, который поддерживает создание произвольных объектов ctypes, выделенных из общей памяти.

Серверный процесс

Объект менеджера, возвращаемый командой Manager(), управляет серверным процессом, который хранит объекты Python и позволяет другим процессам манипулировать ими с помощью прокси.

Менеджер, возвращаемый командой Manager(), будет поддерживать типы list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value и Array. Например,

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

напечатает

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Менеджеры серверных процессов более гибкие, чем использование объектов общей памяти, потому что они могут быть сделаны для поддержки произвольных типов объектов. Кроме того, один менеджер может совместно использоваться процессами на разных компьютерах по сети. Однако они медленнее, чем использование общей памяти.

Использование резерва работников

Класс Pool представляет собой пул рабочих процессов. Он имеет методы, которые позволяют перегружать задачи на рабочие процессы несколькими различными способами.

Например:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 secs
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

Обратите внимание, что методы пула должны использоваться только тем процессом, который его создал.

Примечание

Функциональность внутри этого пакета требует, чтобы модуль __main__ был импортируемым дочерними модулями. Это описано в Руководство по программированию, однако стоит указать на это здесь. Это означает, что некоторые примеры, такие как multiprocessing.pool.Pool, не будут работать в интерактивном интерпретаторе. Например:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...   p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'
AttributeError: 'module' object has no attribute 'f'

(Если вы попробуете это сделать, то на самом деле будет выведено три полных трассировки, чередующихся полуслучайным образом, и тогда вам, возможно, придется как-то остановить родительский процесс).

Ссылка

Пакет multiprocessing в основном повторяет API модуля threading.

Process и исключения

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Объекты процесса представляют деятельность, которая выполняется в отдельном процессе. Класс Process имеет эквиваленты всех методов threading.Thread.

Конструктор всегда должен вызываться с аргументами в виде ключевых слов. group всегда должна быть None; она существует исключительно для совместимости с threading.Thread. target - это вызываемый объект, который будет вызван методом run(). По умолчанию это значение равно None, что означает, что ничего не вызывается. name - это имя процесса (подробнее см. name). args - кортеж аргументов для целевого вызова. kwargs - словарь аргументов с ключевыми словами для целевого вызова. Если указано, то аргумент daemon, содержащий только ключевое слово, устанавливает флаг процесса daemon в значение True или False. Если None (по умолчанию), то этот флаг будет унаследован от создающего процесса.

По умолчанию никакие аргументы не передаются target.

Если подкласс переопределяет конструктор, он должен убедиться, что вызывает конструктор базового класса (Process.__init__()), прежде чем делать что-либо еще с процессом.

Изменено в версии 3.3: Добавлен аргумент daemon.

run()

Метод, представляющий деятельность процесса.

Вы можете переопределить этот метод в подклассе. Стандартный метод run() вызывает вызываемый объект, переданный в конструктор объекта в качестве целевого аргумента, если таковой имеется, с последовательными и ключевыми аргументами, взятыми из аргументов args и kwargs соответственно.

start()

Запустите активность процесса.

Этот вызов должен выполняться не более одного раза для каждого объекта процесса. Он организует вызов метода run() объекта в отдельном процессе.

join([timeout])

Если необязательный аргумент timeout равен None (по умолчанию), метод блокируется до тех пор, пока процесс, чей метод join() вызывается, не завершится. Если timeout - положительное число, метод блокирует не более timeout секунд. Обратите внимание, что метод возвращает None, если процесс завершается или если метод завершает работу. Проверьте exitcode процесса, чтобы определить, завершился ли он.

К процессу можно присоединяться много раз.

Процесс не может присоединиться к самому себе, поскольку это приведет к тупиковой ситуации. Ошибкой является попытка присоединиться к процессу до того, как он был запущен.

name

Имя процесса. Имя - это строка, используемая только для целей идентификации. Оно не имеет семантики. Одно и то же имя может быть присвоено нескольким процессам.

Начальное имя задается конструктором. Если конструктору не предоставлено явное имя, то строится имя вида „Process-N1:N2:…:Nk“, где каждый Nk является N-м дочерним элементом своего родителя.

is_alive()

Возвращает, жив ли процесс.

Грубо говоря, объект процесса жив с момента возврата метода start() до завершения дочернего процесса.

daemon

Флаг демона процесса, булево значение. Он должен быть установлен до вызова start().

Начальное значение наследуется от процесса создания.

Когда процесс завершается, он пытается завершить все свои дочерние демонические процессы.

Обратите внимание, что демоническому процессу не разрешается создавать дочерние процессы. В противном случае демонический процесс оставит своих детей сиротами, если он будет завершен при выходе из родительского процесса. Кроме того, это не демоны или службы Unix, это обычные процессы, которые будут завершены (и не присоединятся), если завершатся не демонические процессы.

В дополнение к API threading.Thread, объекты Process также поддерживают следующие атрибуты и методы:

pid

Возвращает идентификатор процесса. До порождения процесса это будет None.

exitcode

Код завершения дочернего процесса. Это будет None, если процесс еще не завершился.

Если дочерний метод run() вернулся нормально, код выхода будет 0. Если он завершился через sys.exit() с целочисленным аргументом N, код выхода будет N.

Если дочерняя программа завершилась из-за исключения, не пойманного в пределах run(), код выхода будет равен 1. Если он был завершен по сигналу N, код выхода будет отрицательным значением -N.

authkey

Ключ аутентификации процесса (байтовая строка).

При инициализации multiprocessing главному процессу присваивается случайная строка с помощью os.urandom().

При создании объекта Process он наследует ключ аутентификации своего родительского процесса, хотя это можно изменить, установив authkey в другую байтовую строку.

См. Ключи аутентификации.

sentinel

Числовой хэндл системного объекта, который станет «готовым» после завершения процесса.

Вы можете использовать это значение, если хотите ждать сразу несколько событий, используя multiprocessing.connection.wait(). В противном случае вызов join() является более простым.

В Windows это дескриптор ОС, используемый с помощью вызовов API семейства WaitForSingleObject и WaitForMultipleObjects. В Unix это дескриптор файла, используемый с примитивами модуля select.

Добавлено в версии 3.3.

terminate()

Завершить процесс. В Unix это делается с помощью сигнала SIGTERM; в Windows используется сигнал TerminateProcess(). Обратите внимание, что обработчики выхода, предложения finally и т.д. не будут выполнены.

Обратите внимание, что процессы-потомки этого процесса не будут завершены - они просто станут бесхозными.

Предупреждение

Если этот метод используется, когда связанный процесс использует канал или очередь, то канал или очередь могут быть повреждены и стать непригодными для использования другими процессами. Аналогично, если процесс получил блокировку или семафор и т.д., то его завершение может привести к тупику других процессов.

kill()

То же самое, что и terminate(), но с использованием сигнала SIGKILL на Unix.

Добавлено в версии 3.7.

close()

Закрывает объект Process, освобождая все связанные с ним ресурсы. Вызов ValueError происходит, если основной процесс все еще запущен. После успешного возврата close() большинство других методов и атрибутов объекта Process будут вызывать ValueError.

Добавлено в версии 3.7.

Обратите внимание, что методы start(), join(), is_alive(), terminate() и exitcode должны вызываться только процессом, создавшим объект процесса.

Пример использования некоторых методов Process:

 >>> import multiprocessing, time, signal
 >>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
 >>> print(p, p.is_alive())
 <Process ... initial> False
 >>> p.start()
 >>> print(p, p.is_alive())
 <Process ... started> True
 >>> p.terminate()
 >>> time.sleep(0.1)
 >>> print(p, p.is_alive())
 <Process ... stopped exitcode=-SIGTERM> False
 >>> p.exitcode == -signal.SIGTERM
 True
exception multiprocessing.ProcessError

Базовый класс всех исключений multiprocessing.

exception multiprocessing.BufferTooShort

Исключение, вызванное Connection.recv_bytes_into(), когда предоставленный объект буфера слишком мал для прочитанного сообщения.

Если e является экземпляром BufferTooShort, то e.args[0] выдаст сообщение в виде байтовой строки.

exception multiprocessing.AuthenticationError

Возникает при ошибке аутентификации.

exception multiprocessing.TimeoutError

Вызывается методами с таймаутом, когда таймаут истекает.

Трубы и очереди

При использовании нескольких процессов обычно используется передача сообщений для связи между процессами, что позволяет избежать использования примитивов синхронизации, таких как блокировки.

Для передачи сообщений можно использовать Pipe() (для соединения между двумя процессами) или очередь (которая позволяет использовать несколько производителей и потребителей).

Типы Queue, SimpleQueue и JoinableQueue представляют собой многопроизводственные, многопотребительские FIFO очереди, созданные по образцу класса queue.Queue в стандартной библиотеке. Они отличаются тем, что в Queue отсутствуют методы task_done() и join(), введенные в класс queue.Queue в Python 2.5.

Если вы используете JoinableQueue, то вы должны вызывать JoinableQueue.task_done() для каждой задачи, удаленной из очереди, иначе семафор, используемый для подсчета количества незавершенных задач, может в конечном итоге переполниться, вызвав исключение.

Обратите внимание, что можно также создать общую очередь с помощью объекта-менеджера - см. Менеджеры.

Примечание

multiprocessing использует обычные исключения queue.Empty и queue.Full для сигнализации тайм-аута. Они недоступны в пространстве имен multiprocessing, поэтому их необходимо импортировать из queue.

Примечание

Когда объект помещается в очередь, он пикируется, и фоновый поток позже сбрасывает пикированные данные в нижележащую трубу. Это имеет некоторые последствия, которые немного удивляют, но не должны вызывать никаких практических трудностей - если они вас действительно беспокоят, вы можете вместо этого использовать очередь, созданную с помощью manager.

  1. После помещения объекта в пустую очередь может произойти бесконечно малая задержка, прежде чем метод empty() очереди вернется False и get_nowait() сможет вернуться без возникновения queue.Empty.

  2. Если несколько процессов ставят объекты в очередь, возможно, что объекты будут получены на другом конце не по порядку. Однако объекты, выставляемые в очередь одним и тем же процессом, всегда будут располагаться в ожидаемом порядке по отношению друг к другу.

Предупреждение

Если процесс убит с помощью Process.terminate() или os.kill(), когда он пытается использовать Queue, то данные в очереди могут быть повреждены. Это может привести к тому, что любой другой процесс получит исключение при попытке использовать очередь в дальнейшем.

Предупреждение

Как упоминалось выше, если дочерний процесс поместил элементы в очередь (и не использовал JoinableQueue.cancel_join_thread), то этот процесс не завершится, пока все буферизированные элементы не будут смыты в трубу.

Это означает, что если вы попытаетесь присоединиться к этому процессу, вы можете получить тупик, если не будете уверены, что все элементы, которые были поставлены в очередь, были потреблены. Аналогично, если дочерний процесс не является демоническим, то родительский процесс может зависнуть при выходе, когда он попытается присоединиться ко всем своим недемоническим дочерним процессам.

Обратите внимание, что очередь, созданная с помощью менеджера, не имеет этой проблемы. См. раздел Руководство по программированию.

Пример использования очередей для межпроцессного взаимодействия приведен в Примеры.

multiprocessing.Pipe([duplex])

Возвращает пару (conn1, conn2) объектов Connection, представляющих концы трубы.

Если duplex имеет значение True (по умолчанию), то труба является двунаправленной. Если duplex равен False, то труба будет однонаправленной: conn1 может использоваться только для приема сообщений, а conn2 может использоваться только для отправки сообщений.

class multiprocessing.Queue([maxsize])

Возвращает общую очередь процесса, реализованную с помощью трубы и нескольких блокировок/семафоров. Когда процесс впервые помещает элемент в очередь, запускается поток фидера, который передает объекты из буфера в трубу.

Обычные исключения queue.Empty и queue.Full из модуля queue стандартной библиотеки поднимаются для сигнализации тайм-аута.

Queue реализует все методы queue.Queue, кроме task_done() и join().

qsize()

Возвращает приблизительный размер очереди. Из-за семантики многопоточности/многопроцессорности это число не является надежным.

Обратите внимание, что это может вызвать ошибку NotImplementedError на платформах Unix, таких как macOS, где sem_getvalue() не реализована.

empty()

Возвращает True, если очередь пуста, False в противном случае. Из-за семантики многопоточности/многопроцессорности это ненадежно.

full()

Возвращает True, если очередь заполнена, False в противном случае. Из-за семантики многопоточности/многопроцессорности это ненадежно.

put(obj[, block[, timeout]])

Поместить obj в очередь. Если дополнительный аргумент block равен True (по умолчанию) и timeout равен None (по умолчанию), то при необходимости блокируется до тех пор, пока не появится свободный слот. Если timeout - положительное число, то блокировка длится не более timeout секунд и вызывает исключение queue.Full, если в течение этого времени свободный слот не был доступен. В противном случае (block равно False), поместите элемент в очередь, если свободный слот доступен немедленно, иначе вызовите исключение queue.Full (timeout в этом случае игнорируется).

Изменено в версии 3.8: Если очередь закрыта, вместо ValueError выдается AssertionError.

put_nowait(obj)

Эквивалентно put(obj, False).

get([block[, timeout]])

Удалить и вернуть элемент из очереди. Если опциональные args block равно True (по умолчанию) и timeout равно None (по умолчанию), то при необходимости блокируется, пока элемент не будет доступен. Если timeout - положительное число, то блокируется не более timeout секунд и вызывает исключение queue.Empty, если в течение этого времени ни один элемент не был доступен. В противном случае (блок равен False), возвращает элемент, если он доступен немедленно, иначе вызывает исключение queue.Empty (timeout в этом случае игнорируется).

Изменено в версии 3.8: Если очередь закрыта, вместо ValueError выдается OSError.

get_nowait()

Эквивалентно get(False).

multiprocessing.Queue имеет несколько дополнительных методов, которых нет в queue.Queue. Эти методы обычно не нужны для большинства кода:

close()

Указывает, что текущий процесс больше не будет помещать данные в эту очередь. Фоновый поток завершится, как только передаст все буферизованные данные в трубу. Это вызывается автоматически, когда очередь собирается в мусор.

join_thread()

Присоединиться к фоновому потоку. Это можно использовать только после вызова close(). Он блокирует выполнение до выхода из фонового потока, гарантируя, что все данные в буфере были выведены в трубу.

По умолчанию, если процесс не является создателем очереди, то при выходе он попытается присоединиться к фоновому потоку очереди. Процесс может вызвать cancel_join_thread(), чтобы заставить join_thread() ничего не делать.

cancel_join_thread()

Предотвратить блокировку join_thread(). В частности, это предотвращает автоматическое присоединение фонового потока при завершении процесса - см. join_thread().

Более подходящим названием для этого метода может быть allow_exit_without_flush(). Он, скорее всего, приведет к потере данных из очереди, и почти наверняка вам не понадобится его использовать. Он действительно нужен только в том случае, если вам нужно, чтобы текущий процесс завершился немедленно, не дожидаясь, пока данные из очереди будут переданы в базовый канал, и вас не волнует потеря данных.

Примечание

Функциональность этого класса требует наличия функционирующей реализации общего семафора в операционной системе хоста. При отсутствии таковой функциональность этого класса будет отключена, а попытки инстанцировать Queue приведут к появлению ImportError. Дополнительную информацию см. в разделе bpo-3770. То же самое справедливо для любого из специализированных типов очередей, перечисленных ниже.

class multiprocessing.SimpleQueue

Это упрощенный тип Queue, очень близкий к заблокированному Pipe.

close()

Закрыть очередь: освободить внутренние ресурсы.

Очередь не должна больше использоваться после ее закрытия. Например, методы get(), put() и empty() больше не должны вызываться.

Добавлено в версии 3.9.

empty()

Возвращает True, если очередь пуста, False в противном случае.

get()

Удаление и возврат элемента из очереди.

put(item)

Поместите элемент в очередь.

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, подкласс Queue - это очередь, которая дополнительно имеет методы task_done() и join().

task_done()

Указывает, что ранее поставленная в очередь задача завершена. Используется потребителями очереди. Для каждого get(), используемого для получения задачи, последующий вызов task_done() сообщает очереди, что обработка задачи завершена.

Если join() в настоящее время блокируется, он возобновится, когда все элементы будут обработаны (это означает, что вызов task_done() был получен для каждого элемента, который был put() в очереди).

Вызывает ошибку ValueError, если вызывается большее количество раз, чем было помещено элементов в очередь.

join()

Блокировать, пока все элементы в очереди не будут получены и обработаны.

Счетчик незавершенных задач увеличивается всякий раз, когда в очередь добавляется элемент. Счетчик уменьшается всякий раз, когда потребитель вызывает task_done(), чтобы указать, что элемент был получен и вся работа над ним завершена. Когда количество незавершенных задач падает до нуля, join() разблокируется.

Разное

multiprocessing.active_children()

Возвращает список всех живых детей текущего процесса.

Вызов этого вызова имеет побочный эффект «присоединения» к любым процессам, которые уже завершились.

multiprocessing.cpu_count()

Возвращает количество центральных процессоров в системе.

Это число не эквивалентно количеству CPU, которые может использовать текущий процесс. Количество используемых ЦП можно получить с помощью len(os.sched_getaffinity(0)).

Если количество CPU не может быть определено, выдается сообщение NotImplementedError.

См.также

os.cpu_count()

multiprocessing.current_process()

Возвращает объект Process, соответствующий текущему процессу.

Аналог threading.current_thread().

multiprocessing.parent_process()

Возвращает объект Process, соответствующий родительскому процессу current_process(). Для главного процесса parent_process будет None.

Добавлено в версии 3.8.

multiprocessing.freeze_support()

Добавьте поддержку для случая, когда программа, использующая multiprocessing, была заморожена для создания исполняемого файла Windows. (Проверено с помощью py2exe, PyInstaller и cx_Freeze).

Вызывать эту функцию нужно сразу после строки if __name__ == '__main__' основного модуля. Например:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

Если строка freeze_support() опущена, то попытка запустить замороженный исполняемый файл приведет к ошибке RuntimeError.

Вызов freeze_support() не имеет эффекта при вызове на любой операционной системе, отличной от Windows. Кроме того, если модуль нормально выполняется интерпретатором Python на Windows (программа не была заморожена), то freeze_support() не имеет никакого эффекта.

multiprocessing.get_all_start_methods()

Возвращает список поддерживаемых методов запуска, первый из которых используется по умолчанию. Возможными методами запуска являются 'fork', 'spawn' и 'forkserver'. В Windows доступен только 'spawn'. На Unix всегда поддерживаются 'fork' и 'spawn', при этом по умолчанию используется 'fork'.

Добавлено в версии 3.4.

multiprocessing.get_context(method=None)

Возвращает объект контекста, который имеет те же атрибуты, что и модуль multiprocessing.

Если method равен None, то возвращается контекст по умолчанию. В противном случае method должен быть 'fork', 'spawn', 'forkserver'. Если указанный метод запуска недоступен, выдается сообщение ValueError.

Добавлено в версии 3.4.

multiprocessing.get_start_method(allow_none=False)

Возвращает имя метода запуска, используемого для запуска процессов.

Если метод запуска не был зафиксирован и allow_none равно false, то метод запуска фиксируется по умолчанию и возвращается имя. Если метод запуска не был зафиксирован и allow_none равно true, то возвращается None.

Возвращаемое значение может быть 'fork', 'spawn', 'forkserver' или None. 'fork' является значением по умолчанию в Unix, а 'spawn' является значением по умолчанию в Windows и macOS.

Изменено в версии 3.8: В macOS метод запуска spawn теперь используется по умолчанию. Метод запуска fork следует считать небезопасным, поскольку он может привести к сбою подпроцесса. См. bpo-33725.

Добавлено в версии 3.4.

multiprocessing.set_executable(executable)

Установите путь к интерпретатору Python, который будет использоваться при запуске дочернего процесса. (По умолчанию используется sys.executable). Встраиваемым программам, вероятно, потребуется сделать что-то вроде

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

прежде чем они смогут создавать дочерние процессы.

Изменено в версии 3.4: Теперь поддерживается на Unix, когда используется метод запуска 'spawn'.

multiprocessing.set_start_method(method)

Установите метод, который должен использоваться для запуска дочерних процессов. method может быть 'fork', 'spawn' или 'forkserver'.

Обратите внимание, что это должно быть вызвано не более одного раза и должно быть защищено внутри пункта if __name__ == '__main__' главного модуля.

Добавлено в версии 3.4.

Объекты соединения

Объекты соединений позволяют отправлять и получать picklable объекты или строки. Их можно рассматривать как ориентированные на сообщения подключенные сокеты.

Объекты соединений обычно создаются с помощью Pipe - см. также Слушатели и клиенты.

class multiprocessing.connection.Connection
send(obj)

Отправьте на другой конец соединения объект, который должен быть прочитан с помощью recv().

Объект должен быть picklable. Очень большие pickle (примерно 32 MiB+, хотя это зависит от ОС) могут вызвать исключение ValueError.

recv()

Возвращает объект, отправленный с другого конца соединения с помощью send(). Блокируется до тех пор, пока не будет что получать. Вызывает EOFError, если нечего принимать и другой конец был закрыт.

fileno()

Возвращает дескриптор файла или дескриптор, используемый соединением.

close()

Закройте соединение.

Это вызывается автоматически, когда соединение собирается в мусор.

poll([timeout])

Возвращает, есть ли данные, доступные для чтения.

Если timeout не указан, то возврат происходит немедленно. Если timeout - это число, то оно определяет максимальное время блокировки в секундах. Если timeout равен None, то используется бесконечный таймаут.

Обратите внимание, что несколько объектов соединения могут быть опрошены одновременно с помощью multiprocessing.connection.wait().

send_bytes(buffer[, offset[, size]])

Отправка байтовых данных из bytes-like object в виде полного сообщения.

Если задано offset, то данные считываются из этой позиции в буфере. Если указан size, то из буфера будет считано столько байт. Очень большие буферы (примерно 32 MiB+, хотя это зависит от ОС) могут вызвать исключение ValueError

recv_bytes([maxlength])

Возвращает полное сообщение байтовых данных, отправленных с другого конца соединения, в виде строки. Блокируется до тех пор, пока не будет что принимать. Вызывает EOFError, если нечего принимать и другой конец закрыт.

Если указано maxlength и сообщение длиннее, чем maxlength, то будет выдано сообщение OSError и соединение больше не будет доступно для чтения.

Изменено в версии 3.3: Эта функция раньше вызывала IOError, которая теперь является псевдонимом OSError.

recv_bytes_into(buffer[, offset])

Считывает в буфер полное сообщение байтовых данных, отправленное с другого конца соединения, и возвращает количество байтов в сообщении. Блокируется до тех пор, пока не будет что получать. Вызывает EOFError, если нечего принимать и другой конец был закрыт.

buffer должен быть записываемым bytes-like object. Если задано смещение, то сообщение будет записано в буфер с этой позиции. Смещение должно быть неотрицательным целым числом, меньшим, чем длина буфера (в байтах).

Если буфер слишком короткий, то возникает исключение BufferTooShort и полное сообщение доступно как e.args[0], где e - экземпляр исключения.

Изменено в версии 3.3: Сами объекты соединений теперь можно передавать между процессами с помощью Connection.send() и Connection.recv().

Добавлено в версии 3.3: Объекты соединения теперь поддерживают протокол управления контекстом - см. Типы контекстного менеджера. __enter__() возвращает объект соединения, а __exit__() вызывает close().

Например:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

Предупреждение

Метод Connection.recv() автоматически распикировывает полученные данные, что может представлять угрозу безопасности, если вы не доверяете процессу, отправившему сообщение.

Поэтому, если объект соединения не был создан с помощью Pipe(), вы должны использовать методы recv() и send() только после выполнения некоторой аутентификации. См. раздел Ключи аутентификации.

Предупреждение

Если процесс будет убит, когда он пытается читать или писать в трубу, то данные в трубе, скорее всего, будут повреждены, поскольку невозможно будет определить, где проходят границы сообщений.

Примитивы синхронизации

Как правило, примитивы синхронизации не так необходимы в многопроцессной программе, как в многопоточной. См. документацию по модулю threading.

Обратите внимание, что можно также создавать примитивы синхронизации с помощью объекта-менеджера - см. Менеджеры.

class multiprocessing.Barrier(parties[, action[, timeout]])

Объект барьера: клон threading.Barrier.

Добавлено в версии 3.3.

class multiprocessing.BoundedSemaphore([value])

Ограниченный объект семафора: близкий аналог threading.BoundedSemaphore.

Единственное отличие от близкого аналога: первый аргумент метода acquire имеет имя block, как это соответствует Lock.acquire().

Примечание

На macOS это неотличимо от Semaphore, поскольку sem_getvalue() на этой платформе не реализовано.

class multiprocessing.Condition([lock])

Переменная условия: псевдоним для threading.Condition.

Если указан lock, то это должен быть объект Lock или RLock из multiprocessing.

Изменено в версии 3.3: Был добавлен метод wait_for().

class multiprocessing.Event

Клон threading.Event.

class multiprocessing.Lock

Объект нерекурсивной блокировки: близкий аналог threading.Lock. Если процесс или поток получил блокировку, последующие попытки получить ее от любого процесса или потока будут блокироваться до тех пор, пока она не будет освобождена; любой процесс или поток может освободить ее. Концепции и поведение threading.Lock применительно к потокам повторяются здесь в multiprocessing.Lock применительно либо к процессам, либо к потокам, за исключением отмеченного.

Обратите внимание, что Lock на самом деле является фабричной функцией, которая возвращает экземпляр multiprocessing.synchronize.Lock, инициализированный контекстом по умолчанию.

Lock поддерживает протокол context manager и поэтому может использоваться в утверждениях with.

acquire(block=True, timeout=None)

Получение блокировки, блокирующей или неблокирующей.

Если аргумент block установлен в True (по умолчанию), вызов метода будет блокироваться до тех пор, пока замок не будет находиться в разблокированном состоянии, затем установит его в заблокированное состояние и вернет True. Обратите внимание, что имя этого первого аргумента отличается от имени аргумента в threading.Lock.acquire().

Если аргумент block установлен в False, вызов метода не блокируется. Если замок в данный момент находится в заблокированном состоянии, верните False; в противном случае переведите замок в заблокированное состояние и верните True.

При вызове с положительным значением с плавающей точкой для timeout, блокируется не более чем на количество секунд, указанное timeout, до тех пор, пока блокировка не может быть получена. Вызовы с отрицательным значением timeout эквивалентны нулевому timeout. Вызовы со значением timeout None (по умолчанию) устанавливают период тайм-аута бесконечным. Обратите внимание, что обработка отрицательных или None значений для timeout отличается от реализованного поведения в threading.Lock.acquire(). Аргумент timeout не имеет практических последствий, если аргумент block установлен в False и поэтому игнорируется. Возвращает True, если блокировка была получена, или False, если истек период тайм-аута.

release()

Освободить блокировку. Эта функция может быть вызвана из любого процесса или потока, а не только из процесса или потока, который первоначально получил блокировку.

Поведение такое же, как и в threading.Lock.release(), за исключением того, что при вызове на разблокированной блокировке возникает ошибка ValueError.

class multiprocessing.RLock

Объект рекурсивной блокировки: близкий аналог threading.RLock. Рекурсивная блокировка должна быть освобождена процессом или потоком, который ее приобрел. После того как процесс или поток приобрел рекурсивную блокировку, тот же процесс или поток может приобрести ее снова без блокировки; этот процесс или поток должен освободить ее один раз за каждый раз, когда она была приобретена.

Обратите внимание, что RLock на самом деле является фабричной функцией, которая возвращает экземпляр multiprocessing.synchronize.RLock, инициализированный контекстом по умолчанию.

RLock поддерживает протокол context manager и поэтому может использоваться в утверждениях with.

acquire(block=True, timeout=None)

Получение блокировки, блокирующей или неблокирующей.

При вызове с аргументом block, установленным в True, блокируется до тех пор, пока блокировка не перейдет в разблокированное состояние (не будет принадлежать ни одному процессу или потоку), если только блокировка уже не принадлежит текущему процессу или потоку. Затем текущий процесс или поток получает право собственности на блокировку (если он еще не владеет ею), а уровень рекурсии внутри блокировки увеличивается на единицу, в результате чего возвращается значение True. Обратите внимание, что в поведении этого первого аргумента есть несколько отличий от реализации threading.RLock.acquire(), начиная с имени самого аргумента.

При вызове с аргументом block, установленным в False, не блокировать. Если блокировка уже была приобретена (и, следовательно, принадлежит) другим процессом или потоком, текущий процесс или поток не принимает права собственности и уровень рекурсии в блокировке не изменяется, в результате чего возвращается значение False. Если блокировка находится в разблокированном состоянии, текущий процесс или поток получает право собственности, а уровень рекурсии увеличивается, в результате чего возвращается значение True.

Использование и поведение аргумента timeout такие же, как и в Lock.acquire(). Обратите внимание, что некоторые из этих поведений timeout отличаются от реализованных в threading.RLock.acquire().

release()

Освободите блокировку, уменьшив уровень рекурсии. Если после декремента уровень рекурсии равен нулю, сбросьте блокировку на разблокированную (не принадлежащую ни одному процессу или потоку), и если другие процессы или потоки заблокированы в ожидании разблокировки блокировки, разрешите продолжить выполнение только одному из них. Если после декремента уровень рекурсии все еще ненулевой, блокировка остается заблокированной и принадлежит вызывающему процессу или потоку.

Вызывайте этот метод только тогда, когда вызывающий процесс или поток владеет блокировкой. Если этот метод вызывается не владельцем, а процессом или потоком, или если блокировка находится в разблокированном (не принадлежащем владельцу) состоянии, то возникает исключение AssertionError. Обратите внимание, что тип исключения, возникающего в этой ситуации, отличается от реализованного поведения в threading.RLock.release().

class multiprocessing.Semaphore([value])

Объект семафора: близкий аналог threading.Semaphore.

Единственное отличие от близкого аналога: первый аргумент метода acquire имеет имя block, как это соответствует Lock.acquire().

Примечание

В macOS функция sem_timedwait не поддерживается, поэтому вызов acquire() с таймаутом будет эмулировать поведение этой функции с помощью спящего цикла.

Примечание

Если сигнал SIGINT, генерируемый командой Ctrl-C, поступает в то время, когда основной поток заблокирован вызовом BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() или Condition.wait(), то вызов будет немедленно прерван и будет вызван сигнал KeyboardInterrupt.

Это отличается от поведения threading, где SIGINT будет игнорироваться, пока выполняются эквивалентные блокирующие вызовы.

Примечание

Некоторые функции этого пакета требуют наличия в операционной системе хоста работающей реализации общего семафора. При отсутствии таковой модуль multiprocessing.synchronize будет отключен, а попытки импортировать его приведут к ошибке ImportError. Дополнительную информацию см. в разделе bpo-3770.

Общие ctypes объекты

Можно создавать общие объекты, используя общую память, которая может быть унаследована дочерними процессами.

multiprocessing.Value(typecode_or_type, *args, lock=True)

Возвращает объект ctypes, выделенный из общей памяти. По умолчанию возвращаемое значение фактически является синхронизированной оберткой для объекта. Доступ к самому объекту можно получить через атрибут value в Value.

typecode_or_type определяет тип возвращаемого объекта: это либо тип ctypes, либо односимвольный типовой код типа, используемого модулем array. *args передается конструктору для типа.

Если lock равен True (по умолчанию), то для синхронизации доступа к значению создается новый объект рекурсивной блокировки. Если lock - это объект Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если lock является False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасным для процесса».

Операции типа +=, включающие чтение и запись, не являются атомарными. Поэтому, если, например, вы хотите атомарно увеличить общее значение, недостаточно просто сделать

counter.value += 1

Если связанная блокировка является рекурсивной (что происходит по умолчанию), вы можете вместо этого сделать

with counter.get_lock():
    counter.value += 1

Обратите внимание, что lock - это аргумент только для ключевого слова.

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

Возвращает массив ctypes, выделенный из общей памяти. По умолчанию возвращаемое значение фактически является синхронизированной оберткой для массива.

typecode_or_type определяет тип элементов возвращаемого массива: это либо тип ctypes, либо односимвольный типовой код, используемый модулем array. Если size_or_initializer - целое число, то оно определяет длину массива, и массив будет изначально обнулен. В противном случае size_or_initializer является последовательностью, которая используется для инициализации массива и длина которой определяет длину массива.

Если lock равен True (по умолчанию), то для синхронизации доступа к значению создается новый объект блокировки. Если lock является объектом Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если lock имеет значение False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасным для процесса».

Обратите внимание, что lock является аргументом только для ключевого слова.

Обратите внимание, что массив ctypes.c_char имеет атрибуты value и raw, которые позволяют использовать его для хранения и извлечения строк.

Модуль multiprocessing.sharedctypes

Модуль multiprocessing.sharedctypes предоставляет функции для выделения объектов ctypes из общей памяти, которые могут быть унаследованы дочерними процессами.

Примечание

Хотя можно хранить указатель в общей памяти, помните, что он будет ссылаться на местоположение в адресном пространстве конкретного процесса. Однако указатель, скорее всего, будет недействительным в контексте второго процесса, и попытка разыменовать указатель из второго процесса может привести к сбою.

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

Возвращает массив ctypes, выделенный из общей памяти.

typecode_or_type определяет тип элементов возвращаемого массива: это либо тип ctypes, либо односимвольный типовой код, используемый модулем array. Если size_or_initializer - целое число, то оно определяет длину массива, и массив будет изначально обнулен. В противном случае size_or_initializer является последовательностью, которая используется для инициализации массива и длина которой определяет длину массива.

Обратите внимание, что установка и получение элемента потенциально неатомарны – используйте Array() вместо этого, чтобы убедиться, что доступ автоматически синхронизируется с помощью блокировки.

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

Возвращает объект ctypes, выделенный из общей памяти.

typecode_or_type определяет тип возвращаемого объекта: это либо тип ctypes, либо односимвольный типовой код типа, используемого модулем array. *args передается конструктору для типа.

Обратите внимание, что установка и получение значения потенциально неатомарны – используйте Value() вместо этого, чтобы убедиться, что доступ автоматически синхронизируется с помощью блокировки.

Обратите внимание, что массив ctypes.c_char имеет атрибуты value и raw, которые позволяют использовать его для хранения и извлечения строк - см. документацию для ctypes.

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

То же, что и RawArray(), за исключением того, что в зависимости от значения lock вместо необработанного массива ctypes может быть возвращена обёртка синхронизации, безопасная для процесса.

Если lock равен True (по умолчанию), то для синхронизации доступа к значению создается новый объект блокировки. Если lock является объектом Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если lock имеет значение False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасным для процесса».

Обратите внимание, что lock - это аргумент только для ключевого слова.

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

То же, что и RawValue(), за исключением того, что в зависимости от значения lock вместо необработанного объекта ctypes может быть возвращена обёртка синхронизации, безопасная для процесса.

Если lock равен True (по умолчанию), то для синхронизации доступа к значению создается новый объект блокировки. Если lock является объектом Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если lock имеет значение False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасным для процесса».

Обратите внимание, что lock - это аргумент только для ключевого слова.

multiprocessing.sharedctypes.copy(obj)

Возвращает объект ctypes, выделенный из общей памяти, который является копией объекта ctypes obj.

multiprocessing.sharedctypes.synchronized(obj[, lock])

Возвращает безопасный для процесса объект-обертку для объекта ctypes, который использует lock для синхронизации доступа. Если lock равен None (по умолчанию), то автоматически создается объект multiprocessing.RLock.

Синхронизированная обертка будет иметь два метода в дополнение к методам объекта, который она обертывает: get_obj() возвращает обернутый объект и get_lock() возвращает объект блокировки, используемый для синхронизации.

Обратите внимание, что доступ к объекту ctypes через обертку может быть намного медленнее, чем доступ к необработанному объекту ctypes.

Изменено в версии 3.5: Синхронизированные объекты поддерживают протокол context manager.

В таблице ниже приводится сравнение синтаксиса для создания общих объектов ctypes из общей памяти с обычным синтаксисом ctypes. (В таблице MyStruct является некоторым подклассом ctypes.Structure).

ctypes

sharedctypes с использованием типа

разделяемые типы с использованием типового кода

c_double(2.4)

RawValue(c_double, 2.4)

RawValue(„d“, 2.4)

MyStruct(4, 6)

RawValue(MyStruct, 4, 6)

(c_short * 7)()

RawArray(c_short, 7)

RawArray(„h“, 7)

(c_int * 3)(9, 2, 8)

RawArray(c_int, (9, 2, 8))

RawArray(„i“, (9, 2, 8))

Ниже приведен пример, в котором несколько объектов ctypes изменяются дочерним процессом:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

Напечатанные результаты:

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

Менеджеры

Менеджеры обеспечивают способ создания данных, которые могут совместно использоваться различными процессами, включая обмен по сети между процессами, запущенными на разных машинах. Объект менеджера управляет серверным процессом, который управляет общими объектами. Другие процессы могут получить доступ к общим объектам, используя прокси.

multiprocessing.Manager()

Возвращает объект started SyncManager, который может быть использован для совместного использования объектов между процессами. Возвращаемый объект менеджера соответствует порожденному дочернему процессу и имеет методы, которые будут создавать общие объекты и возвращать соответствующие прокси.

Процессы менеджера будут завершены, как только они будут собраны в мусор или их родительский процесс завершится. Классы менеджеров определены в модуле multiprocessing.managers:

class multiprocessing.managers.BaseManager([address[, authkey]])

Создайте объект BaseManager.

После создания необходимо вызвать start() или get_server().serve_forever(), чтобы убедиться, что объект менеджера относится к запущенному процессу менеджера.

адрес - это адрес, по которому процесс менеджера прослушивает новые соединения. Если address равен None, то выбирается произвольный адрес.

authkey - это ключ аутентификации, который будет использоваться для проверки валидности входящих соединений с серверным процессом. Если authkey равен None, то используется current_process().authkey. В противном случае используется authkey, который должен быть байтовой строкой.

start([initializer[, initargs]])

Запуск подпроцесса для запуска менеджера. Если initializer не None, то при запуске подпроцесс вызовет initializer(*initargs).

get_server()

Возвращает объект Server, представляющий реальный сервер под управлением менеджера. Объект Server поддерживает метод serve_forever():

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server дополнительно имеет атрибут address.

connect()

Подключите локальный объект менеджера к удаленному процессу менеджера:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown()

Остановить процесс, используемый менеджером. Это доступно только в том случае, если для запуска процесса сервера использовалась команда start().

Это может быть вызвано несколько раз.

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

Метод класса, который может быть использован для регистрации типа или вызываемого объекта в классе менеджера.

typeid - это «идентификатор типа», который используется для идентификации конкретного типа разделяемого объекта. Это должна быть строка.

callable - вызываемый объект, используемый для создания объектов для данного идентификатора типа. Если экземпляр менеджера будет подключен к серверу с помощью метода connect(), или если аргумент create_method равен False, то это можно оставить как None.

proxytype - это подкласс BaseProxy, который используется для создания прокси для общих объектов с данным typeid. Если None, то класс прокси создается автоматически.

exposed используется для указания последовательности имен методов, доступ к которым прокси для данного typeid должен быть разрешен с помощью BaseProxy._callmethod(). (Если exposed равен None, то вместо него используется proxytype._exposed_, если он существует). В случае, когда список exposed не указан, все «публичные методы» разделяемого объекта будут доступны. (Здесь «публичный метод» означает любой атрибут, который имеет метод __call__() и имя которого не начинается с '_').

method_to_typeid - это связка, используемая для указания возвращаемого типа тех открытых методов, которые должны возвращать прокси. Оно сопоставляет имена методов со строками typeid. (Если method_to_typeid имеет значение None, то вместо него используется proxytype._method_to_typeid_, если оно существует). Если имя метода не является ключом этого отображения или если отображение имеет значение None, то объект, возвращаемый методом, будет скопирован по значению.

create_method определяет, должен ли быть создан метод с именем typeid, который может быть использован для указания серверному процессу создать новый разделяемый объект и вернуть для него прокси. По умолчанию это True.

Экземпляры BaseManager также имеют одно свойство, доступное только для чтения:

address

Адрес, используемый менеджером.

Изменено в версии 3.3: Объекты менеджера поддерживают протокол управления контекстом - см. Типы контекстного менеджера. __enter__() запускает процесс сервера (если он еще не запущен) и возвращает объект менеджера. __exit__() вызывает shutdown().

В предыдущих версиях __enter__() не запускал серверный процесс менеджера, если он еще не был запущен.

class multiprocessing.managers.SyncManager

Подкласс BaseManager, который может быть использован для синхронизации процессов. Объекты этого типа возвращаются multiprocessing.Manager().

Его методы создают и возвращают Прокси-объекты для ряда часто используемых типов данных, которые должны быть синхронизированы между процессами. К ним, в частности, относятся общие списки и словари.

Barrier(parties[, action[, timeout]])

Создайте общий объект threading.Barrier и верните для него прокси.

Добавлено в версии 3.3.

BoundedSemaphore([value])

Создайте общий объект threading.BoundedSemaphore и верните для него прокси.

Condition([lock])

Создайте общий объект threading.Condition и верните для него прокси.

Если указан lock, то он должен быть прокси для объекта threading.Lock или threading.RLock.

Изменено в версии 3.3: Был добавлен метод wait_for().

Event()

Создайте общий объект threading.Event и верните для него прокси.

Lock()

Создайте общий объект threading.Lock и верните для него прокси.

Namespace()

Создайте общий объект Namespace и верните для него прокси.

Queue([maxsize])

Создайте общий объект queue.Queue и верните для него прокси.

RLock()

Создайте общий объект threading.RLock и верните для него прокси.

Semaphore([value])

Создайте общий объект threading.Semaphore и верните для него прокси.

Array(typecode, sequence)

Создайте массив и верните для него прокси.

Value(typecode, value)

Создайте объект с атрибутом записи value и верните для него прокси.

dict()
dict(mapping)
dict(sequence)

Создайте общий объект dict и верните для него прокси.

list()
list(sequence)

Создайте общий объект list и верните для него прокси.

Изменено в версии 3.6: Общие объекты могут быть вложенными. Например, разделяемый объект-контейнер, такой как разделяемый список, может содержать другие разделяемые объекты, которые будут управляться и синхронизироваться SyncManager.

class multiprocessing.managers.Namespace

Тип, который может регистрироваться с помощью SyncManager.

Объект пространства имен не имеет открытых методов, но имеет атрибуты, доступные для записи. Его представление показывает значения его атрибутов.

Однако при использовании прокси для объекта пространства имен атрибут, начинающийся с '_', будет атрибутом прокси, а не атрибутом референта:

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

Индивидуальные менеджеры

Для создания собственного менеджера создается подкласс BaseManager и используется метод класса register() для регистрации новых типов или callables в классе менеджера. Например:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

Использование удаленного менеджера

Можно запустить сервер менеджера на одной машине, а клиенты могут использовать его с других машин (при условии, что соответствующие брандмауэры позволяют это сделать).

Выполнение следующих команд создает сервер для одной общей очереди, к которой могут получить доступ удаленные клиенты:

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Один клиент может получить доступ к серверу следующим образом:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

Другой клиент также может использовать его:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

Локальные процессы также могут получить доступ к этой очереди, используя код выше на клиенте для удаленного доступа к ней:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super().__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Прокси-объекты

Прокси - это объект, который ссылается на общий объект, который живет (предположительно) в другом процессе. Считается, что общий объект является референтом прокси. Несколько прокси-объектов могут иметь одного и того же референта.

Прокси-объект имеет методы, которые вызывают соответствующие методы своего референта (хотя не все методы референта обязательно будут доступны через прокси). Таким образом, прокси-объект можно использовать так же, как и его референт:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

Обратите внимание, что применение str() к прокси вернет представление референта, тогда как применение repr() вернет представление прокси.

Важной особенностью прокси-объектов является то, что их можно передавать между процессами. Как таковой, референт может содержать Прокси-объекты. Это позволяет встраивать эти управляемые списки, таблицы и другие Прокси-объекты:

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

Аналогично, прокси dict и list могут быть вложены друг в друга:

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

Если в референте содержатся стандартные (непрокси) объекты list или dict, модификации этих изменяемых значений не будут передаваться через менеджер, поскольку прокси не имеет возможности узнать, когда изменяются содержащиеся в нем значения. Однако, сохранение значения в контейнерном прокси (что вызывает __setitem__ на прокси-объекте) распространяется через менеджер, поэтому для эффективного изменения такого элемента можно переназначить измененное значение контейнерному прокси:

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

Этот подход, возможно, менее удобен, чем использование вложенных Прокси-объекты для большинства случаев использования, но также демонстрирует уровень контроля над синхронизацией.

Примечание

Прокси-типы в multiprocessing ничего не делают для поддержки сравнения по значению. Так, например, мы имеем:

>>> manager.list([1,2,3]) == [1,2,3]
False

При сравнении следует просто использовать копию референта.

class multiprocessing.managers.BaseProxy

Прокси-объекты являются экземплярами подклассов BaseProxy.

_callmethod(methodname[, args[, kwds]])

Вызвать и вернуть результат метода референта прокси.

Если proxy является прокси, референтом которого является obj, то выражение

proxy._callmethod(methodname, args, kwds)

оценит выражение

getattr(obj, methodname)(*args, **kwds)

в процессе работы менеджера.

Возвращаемое значение будет копией результата вызова или прокси на новый разделяемый объект - см. документацию для аргумента method_to_typeid в BaseManager.register().

Если при вызове возникло исключение, то оно повторно вызывается командой _callmethod(). Если в процессе менеджера возникает какое-то другое исключение, то оно преобразуется в исключение RemoteError и вызывается командой _callmethod().

Обратите внимание, в частности, что будет вызвано исключение, если имя метода не было раскрыто.

Пример использования _callmethod():

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

Возвращает копию референта.

Если референт является неприкасаемым, это вызовет исключение.

__repr__()

Возвращает представление объекта прокси.

__str__()

Возвращает представление референта.

Очистка

Прокси-объект использует обратный вызов weakref, чтобы при сборке мусора он отменил регистрацию от менеджера, которому принадлежит его референт.

Общий объект удаляется из процесса менеджера, когда больше нет прокси, ссылающихся на него.

Пулы процессов

Можно создать пул процессов, которые будут выполнять задания, переданные им с помощью класса Pool.

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

Объект пула процессов, который управляет пулом рабочих процессов, на которые могут быть отправлены задания. Он поддерживает асинхронные результаты с таймаутами и обратными вызовами и имеет параллельную реализацию карты.

processes - это количество рабочих процессов, которые необходимо использовать. Если processes равно None, то используется число, возвращаемое os.cpu_count().

Если initializer не None, то каждый рабочий процесс будет вызывать initializer(*initargs) при запуске.

maxtasksperchild - это количество задач, которые рабочий процесс может выполнить, прежде чем он завершится и будет заменен новым рабочим процессом, чтобы освободить неиспользуемые ресурсы. По умолчанию maxtasksperchild равно None, что означает, что рабочие процессы будут жить столько же, сколько и пул.

context может использоваться для указания контекста, используемого для запуска рабочих процессов. Обычно пул создается с помощью функции multiprocessing.Pool() или метода Pool() объекта контекста. В обоих случаях context устанавливается соответствующим образом.

Обратите внимание, что методы объекта пула должны вызываться только процессом, который создал пул.

Предупреждение

Объекты multiprocessing.pool имеют внутренние ресурсы, которыми необходимо правильно управлять (как и любым другим ресурсом), используя пул в качестве менеджера контекста или вызывая close() и terminate() вручную. Если этого не сделать, процесс может зависнуть при завершении.

Обратите внимание, что неправильно полагаться на сборщик мусора для уничтожения пула, поскольку CPython не гарантирует, что финализатор пула будет вызван (см. object.__del__() для получения дополнительной информации).

Добавлено в версии 3.2: maxtasksperchild

Добавлено в версии 3.4: контекст

Примечание

Рабочие процессы в пуле Pool обычно живут в течение всего времени очереди работ пула. Часто встречающийся в других системах (таких как Apache, mod_wsgi и т.д.) паттерн освобождения ресурсов, удерживаемых рабочими процессами, состоит в том, чтобы позволить рабочему процессу в пуле завершить только заданный объем работы перед выходом, очисткой и порождением нового процесса взамен старого. Аргумент maxtasksperchild в Pool открывает эту возможность конечному пользователю.

apply(func[, args[, kwds]])

Вызовите func с аргументами args и аргументами ключевых слов kwds. Она блокируется до тех пор, пока не будет готов результат. Учитывая эту блокировку, apply_async() лучше подходит для параллельного выполнения работы. Кроме того, func выполняется только в одном из рабочих пула.

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

Вариант метода apply(), который возвращает объект AsyncResult.

Если указан callback, то это должен быть вызываемый элемент, принимающий один аргумент. Когда результат становится готовым, к нему применяется callback, то есть если вызов не завершился неудачей, в этом случае вместо него применяется error_callback.

Если указан error_callback, то он должен быть вызываемой функцией, принимающей один аргумент. Если целевая функция не работает, то error_callback вызывается с экземпляром исключения.

Обратные вызовы должны завершаться немедленно, поскольку в противном случае поток, обрабатывающий результаты, будет заблокирован.

map(func, iterable[, chunksize])

Параллельный эквивалент встроенной функции map() (поддерживает только один аргумент iterable, для нескольких итераций см. starmap()). Она блокируется до тех пор, пока не будет готов результат.

Этот метод разбивает итерабельную таблицу на несколько фрагментов, которые он передает в пул процессов как отдельные задачи. Размер (приблизительный) этих фрагментов можно задать, установив chunksize в целое положительное число.

Обратите внимание, что это может привести к большим затратам памяти для очень длинных итераций. Рассмотрите возможность использования imap() или imap_unordered() с явным параметром chunksize для большей эффективности.

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

Вариант метода map(), который возвращает объект AsyncResult.

Если указан callback, то это должен быть вызываемый элемент, принимающий один аргумент. Когда результат становится готовым, к нему применяется callback, то есть если вызов не завершился неудачей, в этом случае вместо него применяется error_callback.

Если указан error_callback, то он должен быть вызываемой функцией, принимающей один аргумент. Если целевая функция не работает, то error_callback вызывается с экземпляром исключения.

Обратные вызовы должны завершаться немедленно, поскольку в противном случае поток, обрабатывающий результаты, будет заблокирован.

imap(func, iterable[, chunksize])

Более ленивая версия map().

Аргумент chunksize такой же, как и аргумент, используемый методом map(). Для очень длинных итераций использование большого значения chunksize может ускорить выполнение задания на много, чем использование значения по умолчанию 1.

Также если chunksize равен 1, то метод next() итератора, возвращаемого методом imap(), имеет необязательный параметр timeout: next(timeout) вызовет ошибку multiprocessing.TimeoutError, если результат не может быть возвращен в течение timeout секунд.

imap_unordered(func, iterable[, chunksize])

То же самое, что и imap(), за исключением того, что порядок следования результатов из возвращаемого итератора следует считать произвольным. (Только при наличии только одного рабочего процесса порядок гарантированно «правильный»).

starmap(func, iterable[, chunksize])

Подобно map(), за исключением того, что элементы iterable, как ожидается, будут итерациями, которые распаковываются как аргументы.

Следовательно, итерабельность из [(1,2), (3, 4)] приводит к [func(1,2), func(3,4)].

Добавлено в версии 3.3.

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

Комбинация starmap() и map_async(), которая выполняет итерацию над iterable итераций и вызывает func с распакованными итерациями. Возвращает объект результата.

Добавлено в версии 3.3.

close()

Предотвращает дальнейшую передачу заданий в пул. После выполнения всех заданий рабочие процессы завершатся.

terminate()

Немедленно останавливает рабочие процессы, не завершая оставшуюся работу. Когда объект пула будет собран terminate() будет немедленно вызван.

join()

Дождитесь завершения рабочих процессов. Перед использованием close() необходимо вызвать terminate() или join().

Добавлено в версии 3.3: Объекты пула теперь поддерживают протокол управления контекстом - см. Типы контекстного менеджера. __enter__() возвращает объект пула, а __exit__() вызывает terminate().

class multiprocessing.pool.AsyncResult

Класс результата, возвращаемого Pool.apply_async() и Pool.map_async().

get([timeout])

Верните результат, когда он будет получен. Если timeout не равен None и результат не приходит в течение timeout секунд, то выдается сообщение multiprocessing.TimeoutError. Если удаленный вызов вызвал исключение, то это исключение будет повторно вызвано командой get().

wait([timeout])

Подождите, пока результат будет доступен или пока не пройдет timeout секунд.

ready()

Возвращает, завершился ли вызов.

successful()

Возвращает, завершился ли вызов без возникновения исключения. Вызовет ValueError, если результат не готов.

Изменено в версии 3.7: Если результат не готов, вместо ValueError выдается AssertionError.

Следующий пример демонстрирует использование пула:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

Слушатели и клиенты

Обычно передача сообщений между процессами осуществляется с помощью очередей или с помощью объектов Connection, возвращаемых Pipe().

Однако модуль multiprocessing.connection обеспечивает дополнительную гибкость. В основном он предоставляет высокоуровневый API, ориентированный на сообщения, для работы с сокетами или именованными трубами Windows. Он также поддерживает digest-аутентификацию с помощью модуля hmac и опрос нескольких соединений одновременно.

multiprocessing.connection.deliver_challenge(connection, authkey)

Отправьте случайно сгенерированное сообщение на другой конец соединения и дождитесь ответа.

Если ответ соответствует дайджесту сообщения с использованием authkey в качестве ключа, то на другой конец соединения отправляется приветственное сообщение. В противном случае выдается сообщение AuthenticationError.

multiprocessing.connection.answer_challenge(connection, authkey)

Получение сообщения, вычисление дайджеста сообщения с использованием authkey в качестве ключа, а затем отправка дайджеста обратно.

Если приветственное сообщение не получено, то выдается сообщение AuthenticationError.

multiprocessing.connection.Client(address[, family[, authkey]])

Попытка установить соединение со слушателем, который использует адрес address, возвращая Connection.

Тип соединения определяется аргументом family, но обычно его можно опустить, так как он может быть выведен из формата address. (См. Форматы адресов)

Если указан authkey, а не None, он должен быть байтовой строкой и будет использоваться в качестве секретного ключа для вызова аутентификации на основе HMAC. Аутентификация не выполняется, если authkey равен None. При неудачной аутентификации выдается сообщение AuthenticationError. См. Ключи аутентификации.

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

Обертка для связанного сокета или именованной трубы Windows, которая «слушает» соединения.

address - это адрес, который будет использоваться связанным сокетом или именованной трубой объекта слушателя.

Примечание

Если используется адрес „0.0.0.0“, этот адрес не будет подключаемой конечной точкой в Windows. Если вам требуется подключаемая конечная точка, следует использовать адрес „127.0.0.0.1“.

family - это тип используемого сокета (или именованной трубы). Это может быть одна из строк 'AF_INET' (для сокета TCP), 'AF_UNIX' (для сокета домена Unix) или 'AF_PIPE' (для именованной трубы Windows). Из них только первый гарантированно доступен. Если семейство равно None, то семейство определяется по формату адрес. Если address также None, то выбирается значение по умолчанию. Это семейство по умолчанию считается самым быстрым из доступных. См. Форматы адресов. Обратите внимание, что если семейство равно 'AF_UNIX' и адрес равен None, то сокет будет создан в частном временном каталоге, созданном с помощью tempfile.mkstemp().

Если объект слушателя использует сокет, то backlog (по умолчанию 1) передается в метод listen() сокета после его привязки.

Если указан authkey, а не None, он должен быть байтовой строкой и будет использоваться в качестве секретного ключа для вызова аутентификации на основе HMAC. Аутентификация не выполняется, если authkey равен None. При неудачной аутентификации выдается сообщение AuthenticationError. См. Ключи аутентификации.

accept()

Принимает соединение на связанном сокете или именованной трубе объекта слушателя и возвращает объект Connection. Если попытка аутентификации была предпринята и не удалась, то выдается сообщение AuthenticationError.

close()

Закрытие связанного сокета или именованной трубы объекта слушателя. Этот вызов происходит автоматически при сборке мусора слушателя. Однако рекомендуется вызывать его явно.

Объекты слушателей имеют следующие свойства, доступные только для чтения:

address

Адрес, который используется объектом Listener.

last_accepted

Адрес, с которого пришло последнее принятое соединение. Если он недоступен, то это None.

Добавлено в версии 3.3: Объекты слушателей теперь поддерживают протокол управления контекстом - см. Типы контекстного менеджера. __enter__() возвращает объект слушателя, а __exit__() вызывает close().

multiprocessing.connection.wait(object_list, timeout=None)

Подождите, пока объект из object_list будет готов. Возвращает список тех объектов в object_list, которые готовы. Если timeout имеет значение float, то вызов блокируется не более чем на столько секунд. Если timeout равен None, то вызов будет блокироваться неограниченное время. Отрицательный таймаут эквивалентен нулевому таймауту.

Как для Unix, так и для Windows, объект может появиться в object_list, если он является

Соединение или объект сокета готов, когда есть данные, доступные для чтения из него, или другой конец закрыт.

Unix: wait(object_list, timeout) почти эквивалентен select.select(object_list, [], [], timeout). Разница в том, что если select.select() прерван сигналом, он может поднять OSError с номером ошибки EINTR, тогда как wait() этого не сделает.

Windows: Элемент в object_list должен быть либо целочисленным хэндлом, который является ожидаемым (в соответствии с определением, используемым в документации Win32 функции WaitForMultipleObjects()), либо объектом с методом fileno(), который возвращает хэндл сокета или хэндл трубы. (Обратите внимание, что дескрипторы труб и дескрипторы сокетов не являются **** ожидающими дескрипторами).

Добавлено в версии 3.3.

Примеры.

Следующий код сервера создает слушателя, который использует 'secret password' в качестве ключа аутентификации. Затем он ожидает соединения и отправляет некоторые данные клиенту:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

Следующий код подключается к серверу и получает от него некоторые данные:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

Следующий код использует wait() для ожидания сообщений от нескольких процессов одновременно:

import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

Форматы адресов

  • Адрес 'AF_INET' представляет собой кортеж вида (hostname, port), где hostname - строка, а port - целое число.

  • Адрес 'AF_UNIX' - это строка, представляющая имя файла в файловой системе.

  • Адрес 'AF_PIPE' - это строка вида r'\.\pipe{PipeName}'. Чтобы использовать Client() для подключения к именованной трубе на удаленном компьютере под названием ServerName, необходимо использовать адрес вида r'\ServerName\pipe{PipeName}'.

Обратите внимание, что любая строка, начинающаяся с двух обратных косых черт, по умолчанию считается адресом 'AF_PIPE', а не адресом 'AF_UNIX'.

Ключи аутентификации

Когда вы используете Connection.recv, полученные данные автоматически распаковываются. К сожалению, распаковка данных из ненадежного источника представляет собой риск для безопасности. Поэтому Listener и Client() используют модуль hmac для обеспечения аутентификации дайджеста.

Ключ аутентификации - это байтовая строка, которую можно рассматривать как пароль: после установления соединения оба конца будут требовать доказательств того, что другой знает ключ аутентификации. (Демонстрация того, что оба конца используют один и тот же ключ, не включает передачу ключа по соединению).

Если аутентификация запрошена, но ключ аутентификации не указан, то используется возвращаемое значение current_process().authkey (см. Process). Это значение будет автоматически наследоваться любым объектом Process, который создаст текущий процесс. Это означает, что (по умолчанию) все процессы многопроцессной программы будут иметь один ключ аутентификации, который можно использовать при установке соединений между собой.

Подходящие ключи аутентификации также могут быть сгенерированы с помощью os.urandom().

Ведение журнала

Имеется некоторая поддержка протоколирования. Обратите внимание, однако, что пакет logging не использует общие блокировки процессов, поэтому возможно (в зависимости от типа обработчика) смешивание сообщений от разных процессов.

multiprocessing.get_logger()

Возвращает регистратор, используемый multiprocessing. При необходимости будет создан новый.

При первом создании регистратор имеет уровень logging.NOTSET и не имеет обработчика по умолчанию. Сообщения, отправленные в этот регистратор, по умолчанию не будут передаваться в корневой регистратор.

Обратите внимание, что в Windows дочерние процессы наследуют только уровень регистратора родительского процесса - любая другая настройка регистратора не наследуется.

multiprocessing.log_to_stderr(level=None)

Эта функция выполняет вызов get_logger(), но в дополнение к возврату логгера, созданного get_logger, она добавляет обработчик, который отправляет вывод на sys.stderr, используя формат '[%(levelname)s/%(processName)s] %(message)s'. Вы можете изменить levelname логгера, передав аргумент level.

Ниже приведен пример сеанса с включенным протоколированием:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

Полную таблицу уровней протоколирования см. в модуле logging.

Модуль multiprocessing.dummy

multiprocessing.dummy повторяет API multiprocessing, но является не более чем оберткой вокруг модуля threading.

В частности, функция Pool, предоставляемая multiprocessing.dummy, возвращает экземпляр ThreadPool, который является подклассом Pool, поддерживающим все те же вызовы методов, но использующим пул рабочих потоков, а не рабочих процессов.

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

Объект пула потоков, который управляет пулом рабочих потоков, на которые могут быть отправлены задания. Экземпляры ThreadPool полностью совместимы по интерфейсу с экземплярами Pool, и их ресурсами также необходимо правильно управлять, либо используя пул в качестве менеджера контекста, либо вызывая close() и terminate() вручную.

processes - это количество рабочих потоков, которые необходимо использовать. Если processes равно None, то используется число, возвращаемое os.cpu_count().

Если initializer не None, то каждый рабочий процесс будет вызывать initializer(*initargs) при запуске.

В отличие от Pool, maxtasksperchild и context не могут быть предоставлены.

Примечание

ThreadPool имеет тот же интерфейс, что и Pool, который разработан вокруг пула процессов и предшествует появлению модуля concurrent.futures. Как таковой, он наследует некоторые операции, которые не имеют смысла для пула, поддерживаемого потоками, и имеет свой собственный тип для представления статуса асинхронных заданий, AsyncResult, который не понимается другими библиотеками.

Пользователи обычно предпочитают использовать concurrent.futures.ThreadPoolExecutor, который имеет более простой интерфейс, который с самого начала был разработан с учетом потоков, и который возвращает экземпляры concurrent.futures.Future, совместимые со многими другими библиотеками, включая asyncio.

Руководство по программированию

Существуют определенные рекомендации и идиомы, которых следует придерживаться при использовании multiprocessing.

Все методы запуска

Следующее относится ко всем методам запуска.

Избегайте общего состояния

По возможности нужно стараться избегать перемещения больших объемов данных между процессами.

Вероятно, лучше придерживаться использования очередей или труб для связи между процессами, а не использовать примитивы синхронизации нижнего уровня.

Маринуемость

Убедитесь, что аргументы методов прокси являются picklable.

Безопасность прокси-серверов

Не используйте прокси-объект из более чем одного потока, если вы не защитили его блокировкой.

(Никогда не возникает проблем с использованием разными процессами одного и того же прокси).

Объединение зомби-процессов

В Unix, когда процесс завершается, но не был присоединен, он становится зомби. Их никогда не должно быть очень много, потому что каждый раз, когда начинается новый процесс (или вызывается active_children()), все завершенные процессы, которые еще не были присоединены, будут присоединены. Также вызов Process.is_alive завершенного процесса приведет к его присоединению. Даже в этом случае, вероятно, хорошей практикой является явное присоединение всех процессов, которые вы запускаете.

Лучше наследовать, чем отбирать/не отбирать

При использовании методов запуска spawn или forkserver многие типы из multiprocessing должны быть picklable, чтобы дочерние процессы могли их использовать. Однако, как правило, следует избегать передачи разделяемых объектов другим процессам с помощью труб или очередей. Вместо этого следует организовать программу так, чтобы процесс, которому нужен доступ к разделяемому ресурсу, созданному в другом месте, мог унаследовать его от процесса-предка.

Избегайте завершения процессов

Использование метода Process.terminate для остановки процесса может привести к тому, что любые общие ресурсы (такие как блокировки, семафоры, каналы и очереди), используемые процессом в данный момент, станут сломанными или недоступными для других процессов.

Поэтому, вероятно, лучше использовать Process.terminate только для процессов, которые никогда не используют общие ресурсы.

Присоединение к процессам, использующим очереди

Имейте в виду, что процесс, поместивший элементы в очередь, будет ждать перед завершением, пока все буферизованные элементы не будут переданы потоком «фидера» в базовую трубу. (Дочерний процесс может вызвать метод Queue.cancel_join_thread очереди, чтобы избежать такого поведения).

Это означает, что при использовании очереди необходимо убедиться, что все элементы, которые были поставлены в очередь, в конечном итоге будут удалены до того, как процесс будет завершен. В противном случае вы не можете быть уверены, что процессы, поставившие элементы в очередь, завершатся. Помните также, что недемонические процессы будут присоединяться автоматически.

Пример, который приведет к тупику, следующий:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

Исправлением здесь может быть замена двух последних строк местами (или просто удаление строки p.join()).

Явная передача ресурсов дочерним процессам

В Unix при использовании метода запуска fork дочерний процесс может использовать общий ресурс, созданный в родительском процессе с помощью глобального ресурса. Однако лучше передать объект в качестве аргумента в конструктор дочернего процесса.

Помимо того, что код (потенциально) совместим с Windows и другими методами запуска, это также гарантирует, что пока дочерний процесс жив, объект не будет собран в родительском процессе. Это может быть важно, если какой-то ресурс освобождается, когда объект собирается в родительском процессе.

Так, например,

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

следует переписать как

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

Остерегайтесь замены sys.stdin на «файлоподобный объект»

multiprocessing первоначально безоговорочно назывался:

os.close(sys.stdin.fileno())

в методе multiprocessing.Process._bootstrap() — это приводило к проблемам с процессами в процессах. Это было изменено на:

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

Это решает фундаментальную проблему столкновения процессов друг с другом, что приводит к ошибке плохого дескриптора файла, но создает потенциальную опасность для приложений, которые заменяют sys.stdin() на «файлоподобный объект» с буферизацией вывода. Эта опасность заключается в том, что если несколько процессов вызовут close() на этом файлоподобном объекте, это может привести к тому, что одни и те же данные будут выгружены в объект несколько раз, что приведет к повреждению.

Если вы напишете файлоподобный объект и реализуете собственное кэширование, вы можете сделать его fork-safe, сохраняя pid при каждом добавлении в кэш и отбрасывая кэш при изменении pid. Например:

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

Для получения дополнительной информации смотрите bpo-5155, bpo-5313 и bpo-5331.

Методы запуска spawn и forkserver

Есть несколько дополнительных ограничений, которые не применяются к методу запуска fork.

Большая маринованность

Убедитесь, что все аргументы метода Process.__init__() являются picklable. Также, если вы подкласс Process, то убедитесь, что экземпляры будут picklable при вызове метода Process.start.

Глобальные переменные

Следует помнить, что если код, запущенный в дочернем процессе, попытается получить доступ к глобальной переменной, то значение, которое он увидит (если оно есть), может не совпадать со значением в родительском процессе в момент вызова Process.start.

Однако глобальные переменные, которые являются просто константами уровня модуля, не вызывают проблем.

Безопасный импорт главного модуля

Убедитесь, что главный модуль может быть безопасно импортирован новым интерпретатором Python без возникновения непреднамеренных побочных эффектов (например, запуска нового процесса).

Например, при использовании метода запуска spawn или forkserver запуск следующего модуля завершится ошибкой RuntimeError:

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

Вместо этого следует защитить «точку входа» программы, используя if __name__ == '__main__': следующим образом:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(Строку freeze_support() можно опустить, если программа будет выполняться нормально, а не заморожена).

Это позволяет вновь порожденному интерпретатору Python безопасно импортировать модуль и затем выполнить функцию модуля foo().

Аналогичные ограничения действуют, если пул или менеджер создается в главном модуле.

Примеры

Демонстрация того, как создавать и использовать настраиваемые менеджеры и прокси:

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

Использование Pool:

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

Пример, показывающий, как использовать очереди для подачи заданий на набор рабочих процессов и сбора результатов:

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()
Вернуться на верх