multiprocessing — Параллелизм на основе процессов

Исходный код: Lib/multiprocessing/


Availability: это не Emscripten, это был не я.

Этот модуль не работает или недоступен на платформах WebAssembly wasm32-emscripten и wasm32-wasi. Дополнительную информацию смотрите в разделе Платформы веб-сборки.

Вступление

multiprocessing - это пакет, который поддерживает процессы запуска с использованием API, аналогичного модулю threading. Пакет multiprocessing обеспечивает как локальный, так и удаленный параллелизм, эффективно обходя Global Interpreter Lock за счет использования подпроцессов вместо потоков. Благодаря этому модуль multiprocessing позволяет программисту в полной мере использовать несколько процессоров на данной машине. Он работает как в Unix, так и в Windows.

В модуле multiprocessing также представлены API, которые не имеют аналогов в модуле threading. Ярким примером этого является объект Pool, который предлагает удобное средство распараллеливания выполнения функции с несколькими входными значениями, распределяя входные данные по процессам (параллелизм данных). Следующий пример демонстрирует общепринятую практику определения таких функций в модуле, чтобы дочерние процессы могли успешно импортировать этот модуль. Это базовый пример параллелизма данных с использованием Pool,

from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
        print(p.map(f, [1, 2, 3]))

будет напечатан в стандартном формате вывода

[1, 4, 9]

См.также

concurrent.futures.ProcessPoolExecutor предлагает интерфейс более высокого уровня для передачи задач фоновому процессу, не блокируя выполнение вызывающего процесса. По сравнению с непосредственным использованием интерфейса Pool, API concurrent.futures позволяет отделить отправку работы в базовый пул процессов от ожидания результатов.

Класс Process

В multiprocessing процессы запускаются путем создания объекта Process и последующего вызова его метода start(). Process следует API threading.Thread. Тривиальным примером многопроцессорной программы является

from multiprocessing import Process

def f(name):
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Чтобы показать используемые идентификаторы отдельных процессов, приведем расширенный пример:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

Объяснение того, почему необходима часть if __name__ == '__main__', смотрите в разделе Руководящие принципы программирования.

Контексты и методы запуска

В зависимости от платформы, multiprocessing поддерживает три способа запуска процесса. Этими методами запуска являются

порождать

Родительский процесс запускает новый процесс интерпретатора Python. Дочерний процесс унаследует только те ресурсы, которые необходимы для запуска метода объекта process run(). В частности, ненужные файловые дескрипторы и дескрипторы дескрипторов из родительского процесса унаследованы не будут. Запуск процесса с использованием этого метода происходит довольно медленно по сравнению с использованием fork или forkserver.

Доступно в Unix и Windows. Используется по умолчанию в Windows и Mac OS.

вилка

Родительский процесс использует os.fork() для разветвления интерпретатора Python. Дочерний процесс, когда он запускается, фактически идентичен родительскому процессу. Все ресурсы родительского процесса наследуются дочерним процессом. Обратите внимание, что безопасное разветвление многопоточного процесса проблематично.

Доступно только в Unix. Используется в Unix по умолчанию.

сервер форков

Когда программа запускается и выбирает метод forkserver start, запускается серверный процесс. С этого момента всякий раз, когда требуется новый процесс, родительский процесс подключается к серверу и запрашивает, чтобы он запустил новый процесс. Серверный процесс fork является однопоточным, поэтому для него безопасно использовать os.fork(). Никакие ненужные ресурсы не наследуются.

Доступен на платформах Unix, которые поддерживают передачу файловых дескрипторов по каналам Unix.

Изменено в версии 3.4: Функция spawn добавлена на все платформы Unix, а функция forkserver - на некоторые платформы Unix. Дочерние процессы больше не наследуют все родительские дескрипторы, наследуемые в Windows.

Изменено в версии 3.8: В macOS метод spawn start теперь используется по умолчанию. Метод fork start следует считать небезопасным, поскольку он может привести к сбоям в работе подпроцесса, поскольку системные библиотеки mac OS могут запускать потоки. Смотрите bpo-33725.

В POSIX использование методов запуска spawn или forkserver также запустит процесс отслеживания ресурсов, который отслеживает несвязанные именованные системные ресурсы (такие как именованные семафоры или объекты SharedMemory), созданные процессами программы. Когда все процессы завершат работу, система отслеживания ресурсов отключит все оставшиеся отслеживаемые объекты. Обычно их не должно быть, но если процесс был остановлен из-за сигнала, может произойти «утечка» ресурсов. (Ни просочившиеся семафоры, ни сегменты общей памяти не будут автоматически отсоединены до следующей перезагрузки. Это проблематично для обоих объектов, поскольку система допускает только ограниченное количество именованных семафоров, а сегменты общей памяти занимают некоторое пространство в основной памяти.)

Чтобы выбрать метод запуска, вы используете set_start_method() в предложении if __name__ == '__main__' основного модуля. Например:

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() не следует использовать в программе более одного раза.

В качестве альтернативы вы можете использовать get_context() для получения контекстного объекта. Контекстные объекты имеют тот же API, что и модуль многопроцессорной обработки, и позволяют использовать несколько методов запуска в одной программе.

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Обратите внимание, что объекты, относящиеся к одному контексту, могут быть несовместимы с процессами из другого контекста. В частности, блокировки, созданные с использованием контекста fork, не могут быть переданы процессам, запущенным с использованием методов spawn или forkserver start.

Библиотеке, которая хочет использовать определенный метод запуска, вероятно, следует использовать get_context(), чтобы не вмешиваться в выбор пользователя библиотеки.

Предупреждение

Методы 'spawn' и 'forkserver' start в настоящее время нельзя использовать с «замороженными» исполняемыми файлами (т.е. двоичными файлами, созданными такими пакетами, как PyInstaller и cx_Freeze) в Unix. Метод 'fork' start действительно работает.

Обмен объектами между процессами

multiprocessing поддерживает два типа каналов связи между процессами:

Очереди

Класс Queue является близким клоном класса queue.Queue. Например:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Очереди безопасны для потоков и процессов.

Трубы

Функция Pipe() возвращает пару объектов connection, соединенных каналом, который по умолчанию является дуплексным (двусторонним). Например:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

Два объекта connection, возвращаемые с помощью Pipe(), представляют собой два конца канала. Каждый объект connection имеет методы send() и recv() (среди прочих). Обратите внимание, что данные в канале могут быть повреждены, если два процесса (или потока) одновременно попытаются выполнить чтение с одного и того же конца канала или запись в него. Конечно, процессы, использующие разные концы канала одновременно, не подвержены риску повреждения.

Синхронизация между процессами

multiprocessing содержит эквиваленты всех примитивов синхронизации из threading. Например, можно использовать блокировку, чтобы гарантировать, что только один процесс одновременно выводит стандартный вывод:

from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        Process(target=f, args=(lock, num)).start()

Без использования блокировки выходные данные из разных процессов могут перепутаться.

Совместное использование состояния между процессами

Как упоминалось выше, при параллельном программировании обычно лучше по возможности избегать использования общего состояния. Это особенно актуально при использовании нескольких процессов.

Однако, если вам действительно нужно использовать некоторые общие данные, то multiprocessing предоставляет несколько способов сделать это.

Общая память

Данные могут быть сохранены в общей карте памяти с помощью Value или Array. Например, следующий код

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

будет печатать

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Аргументы 'd' и 'i', используемые при создании num и arr, являются кодами типов, которые используются модулем array: 'd' указывает число с плавающей точкой двойной точности и 'i' обозначают целое число со знаком. Эти общие объекты будут обработаны и потокобезопасны.

Для большей гибкости в использовании общей памяти можно использовать модуль multiprocessing.sharedctypes, который поддерживает создание объектов произвольных типов, выделенных из общей памяти.

Серверный процесс

Объект manager, возвращаемый Manager(), управляет серверным процессом, который содержит объекты Python и позволяет другим процессам манипулировать ими с помощью прокси-серверов.

Менеджер, возвращаемый с помощью Manager(), будет поддерживать типы list, dict, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value и Array. Например,

from multiprocessing import Process, Manager

def f(d, l):
    d[1] = '1'
    d['2'] = 2
    d[0.25] = None
    l.reverse()

if __name__ == '__main__':
    with Manager() as manager:
        d = manager.dict()
        l = manager.list(range(10))

        p = Process(target=f, args=(d, l))
        p.start()
        p.join()

        print(d)
        print(l)

будет печатать

{0.25: None, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

Серверные менеджеры процессов более гибки, чем при использовании объектов с общей памятью, поскольку они могут поддерживать произвольные типы объектов. Кроме того, процессы на разных компьютерах могут совместно использовать один и тот же менеджер по сети. Однако они работают медленнее, чем при использовании общей памяти.

Использование пула работников

Класс Pool представляет собой пул рабочих процессов. В нем есть методы, которые позволяют передавать задачи рабочим процессам несколькими различными способами.

Например:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 seconds
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

Обратите внимание, что методы пула должны использоваться только тем процессом, который его создал.

Примечание

Функциональность этого пакета требует, чтобы дочерние элементы могли импортировать модуль __main__. Это описано в Руководящие принципы программирования, однако здесь стоит обратить на это внимание. Это означает, что некоторые примеры, такие как multiprocessing.pool.Pool, не будут работать в интерактивном интерпретаторе. Например:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...     p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>
AttributeError: Can't get attribute 'f' on <module '__main__' (built-in)>

(Если вы попробуете это, это фактически выведет три полных трассировки, чередующихся полуслучайным образом, и тогда вам, возможно, придется каким-то образом остановить родительский процесс.)

Ссылка

Пакет multiprocessing в основном копирует API модуля threading.

Process и исключения

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Объекты Process представляют собой действие, выполняемое в отдельном процессе. Класс Process содержит эквиваленты всех методов threading.Thread.

Конструктор всегда должен вызываться с аргументами ключевого слова. group всегда должен быть None; он существует исключительно для совместимости с threading.Thread. target - это вызываемый объект, который должен быть вызван методом run(). По умолчанию используется значение None, что означает, что ничего не вызывается. name - это имя процесса (более подробную информацию смотрите в name). args - это кортеж аргументов для целевого вызова. kwargs - это словарь аргументов по ключевым словам для целевого вызова. Если он указан, аргумент daemon, содержащий только ключевое слово, устанавливает для параметра process daemon значение True или False. Если None (значение по умолчанию), то этот флаг будет унаследован от процесса создания.

По умолчанию в target аргументы не передаются. Аргумент args, значение которого по умолчанию равно (), можно использовать для указания списка или набора аргументов, которые будут переданы в target.

Если подкласс переопределяет конструктор, он должен убедиться, что вызывает конструктор базового класса (Process.__init__()), прежде чем выполнять какие-либо другие действия с процессом.

Изменено в версии 3.3: Добавлен параметр daemon.

run()

Метод, представляющий активность процесса.

Вы можете переопределить этот метод в подклассе. Стандартный метод run() вызывает вызываемый объект, передаваемый конструктору объекта в качестве целевого аргумента, если таковой имеется, с аргументами sequential и keyword, взятыми из аргументов args и kwargs соответственно.

Используя список или кортеж в качестве аргумента args, передаваемого в Process, достигается тот же эффект.

Пример:

>>> from multiprocessing import Process
>>> p = Process(target=print, args=[1])
>>> p.run()
1
>>> p = Process(target=print, args=(1,))
>>> p.run()
1
start()

Запустите действие процесса.

Это должно вызываться не более одного раза для каждого объекта процесса. Это позволяет вызывать метод объекта run() в отдельном процессе.

join([timeout])

Если необязательный аргумент timeout равен None (значение по умолчанию), метод блокируется до тех пор, пока не завершится процесс, для которого вызывается метод join(). Если timeout является положительным числом, он блокируется не более чем на время ожидания секунд. Обратите внимание, что метод возвращает None, если его процесс завершается или если время ожидания метода истекло. Проверьте exitcode процесса, чтобы определить, завершился ли он.

К одному процессу можно присоединяться много раз.

Процесс не может присоединиться сам к себе, поскольку это приведет к взаимоблокировке. Попытка присоединиться к процессу до его запуска является ошибкой.

name

Имя процесса. Имя представляет собой строку, используемую только для идентификации. Оно не имеет семантики. Нескольким процессам может быть присвоено одно и то же имя.

Начальное имя задается конструктором. Если конструктору не указано явное имя, то создается имя вида „Process-N1:N2:…:Nk“, где каждое N:sub:k является N-м дочерним элементом своего родителя.

is_alive()

Возвращает, запущен ли процесс.

Грубо говоря, объект process активен с момента возврата метода start() до завершения дочернего процесса.

daemon

Флаг демона процесса, логическое значение. Это значение должно быть установлено перед вызовом start().

Начальное значение наследуется в процессе создания.

Когда процесс завершает работу, он пытается завершить все свои демонические дочерние процессы.

Обратите внимание, что демоническому процессу запрещено создавать дочерние процессы. В противном случае демонический процесс оставил бы своих дочерних процессов сиротами, если бы он был завершен при завершении родительского процесса. Кроме того, это не демоны или службы Unix, это обычные процессы, которые будут завершены (а не присоединены), если завершатся недемонические процессы.

В дополнение к threading.Thread API, объекты Process также поддерживают следующие атрибуты и методы:

pid

Верните идентификатор процесса. Перед запуском процесса это значение будет None.

exitcode

Код завершения дочернего процесса. Это будет None, если процесс еще не завершен.

Если дочерний метод run() возвращает значение в обычном режиме, код завершения будет равен 0. Если он завершается с помощью sys.exit() с целочисленным аргументом N, кодом завершения будет N.

Если дочерний процесс завершился из-за исключения, не перехваченного в пределах run(), код завершения будет равен 1. Если он был завершен по сигналу N, кодом завершения будет отрицательное значение -N.

authkey

Ключ аутентификации процесса (строка байтов).

При инициализации multiprocessing основному процессу присваивается случайная строка с использованием os.urandom().

Когда создается объект Process, он наследует ключ аутентификации своего родительского процесса, хотя это можно изменить, установив значение authkey в другой байтовой строке.

Смотрите Ключи аутентификации.

sentinel

Цифровой дескриптор системного объекта, который станет «готовым» после завершения процесса.

Вы можете использовать это значение, если хотите дождаться нескольких событий одновременно, используя multiprocessing.connection.wait(). В противном случае проще вызвать join().

В Windows это дескриптор операционной системы, который можно использовать с вызовами семейства API WaitForSingleObject и WaitForMultipleObjects. В Unix это файловый дескриптор, который можно использовать с примитивами из модуля select.

Добавлено в версии 3.3.

terminate()

Завершите процесс. В POSIX это делается с помощью сигнала SIGTERM; в Windows используется TerminateProcess(). Обратите внимание, что обработчики exit и предложения finally и т.д. выполняться не будут.

Обратите внимание, что процессы-потомки этого процесса не будут завершены - они просто станут потерянными.

Предупреждение

Если этот метод используется, когда связанный процесс использует канал или очередь, то канал или очередь могут быть повреждены и могут стать непригодными для использования другим процессом. Аналогично, если процесс получил блокировку или семафор и т.д., то его завершение может привести к взаимоблокировке других процессов.

kill()

То же, что и terminate(), но с использованием сигнала SIGKILL в Unix.

Добавлено в версии 3.7.

close()

Закройте объект Process, освободив все связанные с ним ресурсы. ValueError вызывается, если базовый процесс все еще выполняется. Как только close() будет успешно возвращен, большинство других методов и атрибутов объекта Process вызовут ValueError.

Добавлено в версии 3.7.

Обратите внимание, что методы start(), join(), is_alive(), terminate() и exitcode должны вызываться только процессом, создавшим объект process.

Пример использования некоторых методов Process:

>>> import multiprocessing, time, signal
>>> p = multiprocessing.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<Process ... initial> False
>>> p.start()
>>> print(p, p.is_alive())
<Process ... started> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<Process ... stopped exitcode=-SIGTERM> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError

Базовый класс всех исключений multiprocessing.

exception multiprocessing.BufferTooShort

Исключение, вызванное Connection.recv_bytes_into(), когда предоставленный объект buffer слишком мал для чтения сообщения.

Если e является экземпляром BufferTooShort, то e.args[0] передаст сообщение в виде байтовой строки.

exception multiprocessing.AuthenticationError

Вызывается при возникновении ошибки аутентификации.

exception multiprocessing.TimeoutError

Вызывается методами с тайм-аутом по истечении этого времени.

Каналы и очереди

При использовании нескольких процессов обычно используется передача сообщений для взаимодействия между процессами и избегается необходимость использования каких-либо примитивов синхронизации, таких как блокировки.

Для передачи сообщений можно использовать Pipe() (для соединения между двумя процессами) или очередь (что позволяет использовать несколько производителей и потребителей).

Типы Queue, SimpleQueue и JoinableQueue представляют собой очереди с несколькими производителями и несколькими потребителями FIFO, созданные по образцу класса queue.Queue в стандартной библиотеке. Они отличаются тем, что в Queue отсутствуют методы task_done() и join(), представленные в классе queue.Queue Python 2.5.

Если вы используете JoinableQueue, то вы должны вызывать JoinableQueue.task_done() для каждой задачи, удаленной из очереди, иначе семафор, используемый для подсчета количества незавершенных задач, может в конечном итоге переполниться, вызывая исключение.

Обратите внимание, что можно также создать общую очередь с помощью объекта manager - см. Менеджеры.

Примечание

multiprocessing использует обычные исключения queue.Empty и queue.Full, чтобы сигнализировать о тайм-ауте. Они недоступны в пространстве имен multiprocessing, поэтому вам необходимо импортировать их из queue.

Примечание

Когда объект помещается в очередь, он обрабатывается, и фоновый поток позже сбрасывает обработанные данные в базовый канал. Это имеет некоторые последствия, которые немного удивляют, но не должны вызывать каких-либо практических трудностей - если они вас действительно беспокоят, вы можете вместо этого использовать очередь, созданную с помощью manager.

  1. После помещения объекта в пустую очередь может возникнуть бесконечно малая задержка, прежде чем метод очереди empty() вернет False и get_nowait(), не вызывая queue.Empty.

  2. Если несколько процессов помещают объекты в очередь, возможно, что объекты будут получены на другом конце не по порядку. Однако объекты, помещенные в очередь одним и тем же процессом, всегда будут находиться в ожидаемом порядке по отношению друг к другу.

Предупреждение

Если процесс прерывается с помощью Process.terminate() или os.kill(), в то время как он пытается использовать Queue, то данные в очереди, скорее всего, будут повреждены. Это может привести к тому, что любой другой процесс получит исключение при попытке использовать очередь позже.

Предупреждение

Как упоминалось выше, если дочерний процесс поместил элементы в очередь (и он не использовал JoinableQueue.cancel_join_thread), то этот процесс не завершится до тех пор, пока все буферизованные элементы не будут сброшены в канал.

Это означает, что если вы попытаетесь присоединиться к этому процессу, вы можете столкнуться с тупиковой ситуацией, если не будете уверены, что все элементы, которые были помещены в очередь, были использованы. Аналогично, если дочерний процесс не является демоническим, то родительский процесс может зависнуть при выходе, когда он попытается присоединиться ко всем своим недемоническим дочерним процессам.

Обратите внимание, что в очереди, созданной с помощью менеджера, такой проблемы нет. Смотрите Руководящие принципы программирования.

Пример использования очередей для межпроцессного взаимодействия приведен в разделе Примеры.

multiprocessing.Pipe([duplex])

Возвращает пару (conn1, conn2) объектов Connection, представляющих концы трубы.

Если значение duplex равно True (по умолчанию), то канал является двунаправленным. Если значение duplex равно False, то канал является однонаправленным: conn1 может использоваться только для приема сообщений, а conn2 может использоваться только для отправки сообщений.

class multiprocessing.Queue([maxsize])

Возвращает общую очередь процесса, реализованную с использованием канала и нескольких блокировок/семафоров. Когда процесс впервые помещает элемент в очередь, запускается поток-фидер, который передает объекты из буфера в канал.

Обычные исключения queue.Empty и queue.Full из модуля queue стандартной библиотеки генерируются для оповещения о тайм-аутах.

Queue реализует все методы queue.Queue, за исключением task_done() и join().

qsize()

Возвращает приблизительный размер очереди. Из-за семантики многопоточности/многопроцессорности это число является ненадежным.

Обратите внимание, что это может вызвать NotImplementedError на платформах Unix, таких как macOS, где sem_getvalue() не реализован.

empty()

Возвращает True, если очередь пуста, False в противном случае. Из-за семантики многопоточности/многопроцессорности это ненадежно.

full()

Возвращает True, если очередь заполнена, False в противном случае. Из-за семантики многопоточности/многопроцессорности это ненадежно.

put(obj[, block[, timeout]])

Поместите объект obj в очередь. Если необязательный аргумент block равен True (по умолчанию), а timeout равен None (по умолчанию), при необходимости заблокируйте, пока не освободится место. Если timeout является положительным числом, он блокирует не более timeout секунд и вызывает исключение queue.Full, если в течение этого времени не было доступно ни одного свободного места. В противном случае (block - это False), поместите элемент в очередь, если свободное место доступно немедленно, иначе вызовите исключение queue.Full (timeout в этом случае игнорируется).

Изменено в версии 3.8: Если очередь закрыта, то вместо AssertionError выводится ValueError.

put_nowait(obj)

Эквивалентно put(obj, False).

get([block[, timeout]])

Удалите и верните элемент из очереди. Если необязательные аргументы block имеют значение True (по умолчанию), а timeout - None (по умолчанию), при необходимости заблокируйте, пока элемент не станет доступен. Если timeout является положительным числом, он блокирует не более timeout секунд и вызывает исключение queue.Empty, если в течение этого времени не было доступно ни одного элемента. В противном случае (блок равен False), верните элемент, если он доступен немедленно, иначе вызовите исключение queue.Empty (тайм-аут в этом случае игнорируется).

Изменено в версии 3.8: Если очередь закрыта, то вместо OSError выводится ValueError.

get_nowait()

Эквивалентно get(False).

multiprocessing.Queue содержит несколько дополнительных методов, которых нет в queue.Queue. В большинстве случаев эти методы не требуются:

close()

Укажите, что текущий процесс больше не будет помещать данные в эту очередь. Фоновый поток завершит работу, как только он сбросит все буферизованные данные в канал. Это вызывается автоматически, когда очередь заполняется мусором.

join_thread()

Присоединиться к фоновому потоку. Это можно использовать только после вызова close(). Он блокируется до завершения фонового потока, гарантируя, что все данные из буфера будут сброшены в канал.

По умолчанию, если процесс не является создателем очереди, то при выходе он попытается присоединиться к фоновому потоку очереди. Процесс может вызвать cancel_join_thread(), чтобы заставить join_thread() ничего не делать.

cancel_join_thread()

Предотвращать блокировку join_thread(). В частности, это предотвращает автоматическое присоединение фонового потока при завершении процесса - см. join_thread().

Более подходящим названием для этого метода могло бы быть allow_exit_without_flush(). Это может привести к потере данных, помещенных в очередь, и вам почти наверняка не понадобится его использовать. На самом деле это возможно только в том случае, если вам нужно немедленно завершить текущий процесс, не дожидаясь сброса данных, помещенных в очередь, в базовый канал, и вы не заботитесь о потере данных.

Примечание

Функциональность этого класса требует наличия функционирующей реализации общего семафора в операционной системе хоста. Без нее функциональность этого класса будет отключена, а попытки создать экземпляр Queue приведут к появлению ImportError. Смотрите bpo-3770 для получения дополнительной информации. То же самое относится к любому из специализированных типов очередей, перечисленных ниже.

class multiprocessing.SimpleQueue

Это упрощенный тип Queue, очень близкий к закрытому типу Pipe.

close()

Закройте очередь: освободите внутренние ресурсы.

Очередь больше не должна использоваться после ее закрытия. Например, методы, get(), put() и empty() больше не должны вызываться.

Добавлено в версии 3.9.

empty()

Верните True, если очередь пуста, False в противном случае.

get()

Удаление и возврат элемента из очереди.

put(item)

Поместите элемент в очередь.

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, подкласс Queue, представляет собой очередь, которая дополнительно содержит методы task_done() и join().

task_done()

Указывает, что ранее поставленная в очередь задача завершена. Используется пользователями очереди. Для каждого get(), используемого для получения задачи, последующий вызов task_done() сообщает очереди, что обработка задачи завершена.

Если join() в данный момент заблокирован, он возобновится, когда все элементы будут обработаны (это означает, что для каждого элемента, который был помещен put() в очередь, был получен вызов task_done()).

Вызывает ValueError, если вызывается больше раз, чем было элементов, помещенных в очередь.

join()

Блокируйте до тех пор, пока все элементы в очереди не будут получены и обработаны.

Количество незавершенных задач увеличивается всякий раз, когда элемент добавляется в очередь. Количество незавершенных задач уменьшается всякий раз, когда пользователь набирает task_done(), чтобы сообщить, что элемент был получен и вся работа с ним завершена. Когда количество незавершенных задач упадет до нуля, join() разблокируется.

Разнообразный

multiprocessing.active_children()

Возвращает список всех живых дочерних элементов текущего процесса.

Вызов этого метода имеет побочный эффект «присоединения» ко всем процессам, которые уже завершились.

multiprocessing.cpu_count()

Возвращает количество процессоров в системе.

Это число не соответствует количеству процессоров, которые может использовать текущий процесс. Количество используемых процессоров можно получить с помощью len(os.sched_getaffinity(0))

Когда количество процессоров не может быть определено, выводится значение NotImplementedError.

См.также

os.cpu_count()

multiprocessing.current_process()

Возвращает объект Process, соответствующий текущему процессу.

Аналог threading.current_thread().

multiprocessing.parent_process()

Возвращает Process объект, соответствующий родительскому процессу current_process(). Для основного процесса parent_process будет None.

Добавлено в версии 3.8.

multiprocessing.freeze_support()

Добавлена поддержка для случаев, когда программа, использующая multiprocessing, была заморожена для создания исполняемого файла Windows. (Было протестировано с помощью py2exe, PyInstaller и cx_Freeze.)

Эту функцию нужно вызывать сразу после строки if __name__ == '__main__' основного модуля. Например:

from multiprocessing import Process, freeze_support

def f():
    print('hello world!')

if __name__ == '__main__':
    freeze_support()
    Process(target=f).start()

Если строка freeze_support() опущена, то при попытке запустить замороженный исполняемый файл будет выведено значение RuntimeError.

Вызов freeze_support() не имеет эффекта при вызове в любой операционной системе, отличной от Windows. Кроме того, если модуль нормально запускается интерпретатором Python в Windows (программа не была заморожена), то freeze_support() не имеет эффекта.

multiprocessing.get_all_start_methods()

Возвращает список поддерживаемых методов запуска, первый из которых используется по умолчанию. Возможными методами запуска являются 'fork', 'spawn' и 'forkserver'. В Windows доступен только 'spawn'. В Unix всегда поддерживаются 'fork' и 'spawn', по умолчанию используется 'fork'.

Добавлено в версии 3.4.

multiprocessing.get_context(method=None)

Возвращает объект контекста, который имеет те же атрибуты, что и модуль multiprocessing.

Если значение method равно None, то возвращается контекст по умолчанию. В противном случае метод должен быть 'fork', 'spawn', 'forkserver'. ValueError вызывается, если указанный метод запуска недоступен.

Добавлено в версии 3.4.

multiprocessing.get_start_method(allow_none=False)

Возвращает имя метода запуска, используемого для запуска процессов.

Если метод start не был исправлен и значение allow_none равно false, то для метода start устанавливается значение по умолчанию и возвращается имя. Если метод start не был исправлен и значение allow_none равно true, то возвращается значение None.

Возвращаемое значение может быть 'fork', 'spawn', 'forkserver' или None. 'fork' является значением по умолчанию в Unix, в то время как 'spawn' является по умолчанию в Windows и Mac OS.

Добавлено в версии 3.4.

Изменено в версии 3.8: В macOS метод spawn start теперь используется по умолчанию. Метод fork start следует считать небезопасным, поскольку он может привести к сбоям в работе подпроцесса. Смотрите bpo-33725.

multiprocessing.set_executable(executable)

Укажите путь к интерпретатору Python, который будет использоваться при запуске дочернего процесса. (По умолчанию используется sys.executable). Вероятно, разработчикам встраивания потребуется сделать что-то вроде

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

прежде чем они смогут создавать дочерние процессы.

Изменено в версии 3.4: Теперь поддерживается в Unix при использовании метода 'spawn' start.

Изменено в версии 3.11: Принимает значение path-like object.

multiprocessing.set_start_method(method, force=False)

Задайте метод, который должен использоваться для запуска дочерних процессов. Аргументом method может быть 'fork', 'spawn' или 'forkserver'. Вызывает RuntimeError, если метод запуска уже был установлен, а значение force не равно True. Если значение method равно None, а значение force равно True, то для метода start устанавливается значение None. Если значение method равно None, а значение force равно False, то для контекста устанавливается значение по умолчанию.

Обратите внимание, что это должно вызываться не более одного раза, и оно должно быть защищено внутри предложения if __name__ == '__main__' основного модуля.

Добавлено в версии 3.4.

Объекты подключения

Объекты Connection позволяют отправлять и получать объекты или строки с возможностью выбора. Их можно рассматривать как подключенные сокеты, ориентированные на сообщения.

Объекты подключения обычно создаются с помощью Pipe - смотрите также Слушатели и клиенты.

class multiprocessing.connection.Connection
send(obj)

Отправьте объект на другой конец соединения, который должен быть прочитан с помощью recv().

Объект должен быть пригоден для маринования. Очень большие размеры (примерно 32 МБ и более, хотя это зависит от операционной системы) могут вызвать исключение ValueError.

recv()

Возвращает объект, отправленный с другого конца соединения, используя send(). Блокирует до тех пор, пока не появится что-либо для получения. Возвращает EOFError, если больше нечего получать, а другой конец был закрыт.

fileno()

Возвращает файловый дескриптор или дескриптор, используемый соединением.

close()

Закройте соединение.

Это вызывается автоматически, когда соединение подвергается сборке мусора.

poll([timeout])

Возвращает, доступны ли какие-либо данные для чтения.

Если значение timeout не указано, то оно будет возвращено немедленно. Если значение timeout является числом, то оно указывает максимальное время блокировки в секундах. Если значение timeout равно None, то используется бесконечный тайм-аут.

Обратите внимание, что с помощью multiprocessing.connection.wait() можно опросить сразу несколько объектов подключения.

send_bytes(buffer[, offset[, size]])

Отправьте байтовые данные из bytes-like object в виде полного сообщения.

Если задано значение offset, то данные считываются из этой позиции в buffer. Если задан size, то из буфера будет считано именно это количество байт. Очень большие буферы (приблизительно 32 Мб+, хотя это зависит от операционной системы) могут вызвать исключение ValueError

recv_bytes([maxlength])

Возвращает полное сообщение, состоящее из байтовых данных, отправленных с другого конца соединения в виде строки. Блокируется до тех пор, пока не появится что-либо для приема. Возвращает значение EOFError, если принимать больше нечего и другой конец закрыт.

Если указано значение maxlength и сообщение длиннее, чем maxlength, то выводится значение OSError и соединение больше не будет доступно для чтения.

Изменено в версии 3.3: Эта функция раньше вызывала IOError, который теперь является псевдонимом OSError.

recv_bytes_into(buffer[, offset])

Считывает в буфер полное сообщение, состоящее из байтовых данных, отправленных с другого конца соединения, и возвращает количество байт в сообщении. Блокирует до тех пор, пока не появится что-либо для получения. Увеличивает значение EOFError, если больше нечего получать, а другой конец был закрыт.

буфер должен быть доступен для записи bytes-like object. Если задано значение offset, то сообщение будет записано в буфер с этой позиции. Значение Offset должно быть неотрицательным целым числом, меньшим длины buffer (в байтах).

Если буфер слишком короткий, то возникает исключение BufferTooShort и полное сообщение доступно в виде e.args[0], где e - это экземпляр исключения.

Изменено в версии 3.3: Сами объекты подключения теперь могут передаваться между процессами с помощью Connection.send() и Connection.recv().

Объекты Connection также теперь поддерживают протокол управления контекстом - смотрите Типы контекстных менеджеров. __enter__() возвращает объект connection, а __exit__() вызывает close().

Например:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

Предупреждение

Метод Connection.recv() автоматически удаляет данные, которые он получает, что может представлять угрозу безопасности, если вы не доверяете процессу, отправившему сообщение.

Следовательно, если объект connection не был создан с использованием Pipe(), вам следует использовать методы recv() и send() только после выполнения какой-либо проверки подлинности. Смотрите Ключи аутентификации.

Предупреждение

Если процесс прерывается при попытке чтения или записи в канал, то данные в канале, скорее всего, будут повреждены, поскольку может оказаться невозможным определить, где находятся границы сообщений.

Примитивы синхронизации

Как правило, примитивы синхронизации не так необходимы в многопроцессорной программе, как в многопоточной. Смотрите документацию по модулю threading.

Обратите внимание, что можно также создавать примитивы синхронизации с помощью объекта manager - см. Менеджеры.

class multiprocessing.Barrier(parties[, action[, timeout]])

Объект барьера: клон threading.Barrier.

Добавлено в версии 3.3.

class multiprocessing.BoundedSemaphore([value])

Ограниченный объект-семафор: близкий аналог threading.BoundedSemaphore.

Существует единственное отличие от его близкой аналогии: первый аргумент его метода acquire называется block, что согласуется с Lock.acquire().

Примечание

В macOS это неотличимо от Semaphore, потому что sem_getvalue() не реализован на этой платформе.

class multiprocessing.Condition([lock])

Условная переменная: псевдоним для threading.Condition.

Если указан lock, то это должен быть Lock или RLock объект из multiprocessing.

Изменено в версии 3.3: Был добавлен метод wait_for().

class multiprocessing.Event

Клон threading.Event.

class multiprocessing.Lock

Объект нерекурсивной блокировки: близкий аналог threading.Lock. Как только процесс или поток получает блокировку, последующие попытки получить ее от любого процесса или потока будут блокироваться до тех пор, пока она не будет снята; любой процесс или поток может снять блокировку. Концепции и поведение threading.Lock в том виде, в каком они применимы к потокам, воспроизведены здесь в multiprocessing.Lock в том виде, в каком они применимы как к процессам, так и к потокам, за исключением отмеченных случаев.

Обратите внимание, что Lock на самом деле является фабричной функцией, которая возвращает экземпляр multiprocessing.synchronize.Lock, инициализированный контекстом по умолчанию.

Lock поддерживает протокол context manager и, таким образом, может использоваться в инструкциях with.

acquire(block=True, timeout=None)

Получите блокировку, блокирующую или неблокирующую.

Если аргументу block присвоено значение True (по умолчанию), вызов метода будет блокироваться до тех пор, пока блокировка не перейдет в разблокированное состояние, затем установите для него значение locked и верните True. Обратите внимание, что имя этого первого аргумента отличается от имени в threading.Lock.acquire().

Если аргументу block присвоено значение False, вызов метода не блокируется. Если блокировка в данный момент находится в заблокированном состоянии, верните False; в противном случае установите блокировку в заблокированное состояние и верните True.

При вызове с положительным значением с плавающей запятой для параметра timeout, блокируйте не более чем на количество секунд, указанное в параметре timeout, до тех пор, пока блокировка не будет получена. Вызовы с отрицательным значением параметра timeout эквивалентны нулевому времени ожидания. Вызовы со значением timeout, равным None (по умолчанию), устанавливают бесконечный период ожидания. Обратите внимание, что обработка отрицательных значений или None для timeout отличается от реализованного поведения в threading.Lock.acquire(). Аргумент timeout не имеет практического значения, если аргументу block присвоено значение False и, таким образом, игнорируется. Возвращает True, если блокировка была получена, или False, если истек тайм-аут.

release()

Снимите блокировку. Это может быть вызвано из любого процесса или потока, а не только из процесса или потока, которые изначально получили блокировку.

Поведение такое же, как в threading.Lock.release(), за исключением того, что при вызове разблокированной блокировки возникает ValueError.

class multiprocessing.RLock

Объект рекурсивной блокировки: близкий аналог threading.RLock. Рекурсивная блокировка должна быть снята процессом или потоком, который ее получил. Как только процесс или поток получает рекурсивную блокировку, тот же самый процесс или поток может получить ее снова без блокировки; этот процесс или поток должен освобождать ее один раз за каждый раз, когда она была получена.

Обратите внимание, что RLock на самом деле является фабричной функцией, которая возвращает экземпляр multiprocessing.synchronize.RLock, инициализированный контекстом по умолчанию.

RLock поддерживает протокол context manager и, таким образом, может использоваться в инструкциях with.

acquire(block=True, timeout=None)

Получите блокировку, блокирующую или неблокирующую.

При вызове с аргументом block, равным True, блокируйте до тех пор, пока блокировка не перейдет в разблокированное состояние (не будет принадлежать какому-либо процессу или потоку), если только блокировка уже не принадлежит текущему процессу или потоку. Затем текущий процесс или поток становится владельцем блокировки (если у него еще нет владельца), и уровень рекурсии внутри блокировки увеличивается на единицу, в результате чего возвращается значение True. Обратите внимание, что в поведении этого первого аргумента есть несколько отличий по сравнению с реализацией threading.RLock.acquire(), начиная с имени самого аргумента.

При вызове с аргументом block, равным False, не блокируйте. Если блокировка уже была получена (и, следовательно, принадлежит) другому процессу или потоку, текущий процесс или поток не становится владельцем, и уровень рекурсии в пределах блокировки не изменяется, что приводит к возвращаемому значению False. Если блокировка находится в разблокированном состоянии, текущий процесс или поток становится владельцем, и уровень рекурсии увеличивается, в результате чего возвращается значение True.

Использование и поведение аргумента timeout такие же, как в Lock.acquire(). Обратите внимание, что некоторые из этих действий timeout отличаются от реализованных в threading.RLock.acquire().

release()

Снимите блокировку, уменьшив уровень рекурсии. Если после уменьшения уровень рекурсии равен нулю, сбросьте блокировку на разблокированную (не принадлежащую ни одному процессу или потоку), и если какие-либо другие процессы или потоки заблокированы в ожидании разблокировки блокировки, разрешите продолжить работу только одному из них. Если после уменьшения уровень рекурсии по-прежнему отличен от нуля, блокировка остается заблокированной и принадлежит вызывающему процессу или потоку.

Вызывайте этот метод только в том случае, если блокировка принадлежит вызывающему процессу или потоку. Значение AssertionError возникает, если этот метод вызывается процессом или потоком, отличным от владельца, или если блокировка находится в незаблокированном (не принадлежащем) состоянии. Обратите внимание, что тип исключения, возникающего в этой ситуации, отличается от реализованного поведения в threading.RLock.release().

class multiprocessing.Semaphore([value])

Объект-семафор: близкий аналог threading.Semaphore.

Существует единственное отличие от его близкой аналогии: первый аргумент его метода acquire называется block, что согласуется с Lock.acquire().

Примечание

В Mac OS sem_timedwait не поддерживается, поэтому вызов acquire() с тайм-аутом будет имитировать поведение этой функции, используя спящий цикл.

Примечание

Если сигнал SIGINT, сгенерированный Ctrl-C, поступает в то время, когда основной поток заблокирован вызовом BoundedSemaphore.acquire(), Lock.acquire(), RLock.acquire(), Semaphore.acquire(), Condition.acquire() или Condition.wait(), то вызов будет немедленно прерван и будет запущен KeyboardInterrupt.

Это отличается от поведения threading, где SIGINT будет игнорироваться во время выполнения эквивалентных вызовов блокировки.

Примечание

Для некоторых функций этого пакета требуется функционирующая реализация общего семафора в операционной системе хоста. Без этого модуль multiprocessing.synchronize будет отключен, а попытки его импорта приведут к ошибке ImportError. Дополнительную информацию смотрите в разделе bpo-3770.

Общие ctypes Объекты

Можно создавать общие объекты, используя общую память, которая может быть унаследована дочерними процессами.

multiprocessing.Value(typecode_or_type, *args, lock=True)

Возвращает объект ctypes, выделенный из общей памяти. По умолчанию возвращаемое значение на самом деле является синхронизированной оболочкой для объекта. Доступ к самому объекту можно получить с помощью атрибута value для Value.

typecode_or_type определяет тип возвращаемого объекта: это либо тип ctypes, либо односимвольный код типа, используемый модулем array. *args передается в конструктор для данного типа.

Если значение lock равно True (по умолчанию), то для синхронизации доступа к значению создается новый объект рекурсивной блокировки. Если lock является объектом Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если значение lock равно False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасен для процесса».

Операции, подобные +=, которые включают в себя чтение и запись, не являются атомарными. Поэтому, если, например, вы хотите атомарно увеличить общее значение, недостаточно просто выполнить

counter.value += 1

Предполагая, что связанная блокировка является рекурсивной (каковой она является по умолчанию), вы можете вместо этого сделать

with counter.get_lock():
    counter.value += 1

Обратите внимание, что lock - это аргумент только для ключевого слова.

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

Возвращает массив ctypes, выделенный из общей памяти. По умолчанию возвращаемое значение фактически является синхронизированной оболочкой для массива.

typecode_or_type определяет тип элементов возвращаемого массива: это либо тип ctypes, либо односимвольный код типа, используемый модулем array. Если size_or_initializer является целым числом, то оно определяет длину массива, и изначально массив будет обнулен. В противном случае, size_or_initializer - это последовательность, которая используется для инициализации массива и длина которой определяет длину массива.

Если значение lock равно True (по умолчанию), то для синхронизации доступа к значению создается новый объект блокировки. Если lock является объектом Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если значение lock равно False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасен для процесса».

Обратите внимание, что lock - это аргумент только для ключевого слова.

Обратите внимание, что массив ctypes.c_char имеет атрибуты value и raw, которые позволяют использовать его для хранения и извлечения строк.

Модуль multiprocessing.sharedctypes

Модуль multiprocessing.sharedctypes предоставляет функции для выделения ctypes объектов из общей памяти, которые могут быть унаследованы дочерними процессами.

Примечание

Хотя указатель можно сохранить в общей памяти, помните, что он будет ссылаться на местоположение в адресном пространстве определенного процесса. Однако указатель, скорее всего, будет недействительным в контексте второго процесса, и попытка разыменования указателя из второго процесса может привести к сбою.

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

Возвращает массив ctypes, выделенный из общей памяти.

typecode_or_type определяет тип элементов возвращаемого массива: это либо тип ctypes, либо односимвольный код типа, используемый модулем array. Если size_or_initializer является целым числом, то оно определяет длину массива, и изначально массив будет обнулен. В противном случае size_or_initializer - это последовательность, которая используется для инициализации массива и длина которой определяет длину массива.

Обратите внимание, что установка и получение элемента потенциально неатомарны - вместо этого используйте Array(), чтобы убедиться, что доступ автоматически синхронизируется с использованием блокировки.

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

Возвращает объект ctypes, выделенный из общей памяти.

typecode_or_type определяет тип возвращаемого объекта: это либо тип ctypes, либо односимвольный код типа, используемый модулем array. *args передается в конструктор для данного типа.

Обратите внимание, что установка и получение значения потенциально неатомарны - вместо этого используйте Value(), чтобы убедиться, что доступ автоматически синхронизируется с использованием блокировки.

Обратите внимание, что массив ctypes.c_char имеет атрибуты value и raw, которые позволяют использовать его для хранения и извлечения строк - смотрите документацию для ctypes.

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

То же, что и RawArray(), за исключением того, что в зависимости от значения lock вместо необработанного массива ctypes может быть возвращена оболочка синхронизации, безопасная для процесса.

Если значение lock равно True (по умолчанию), то для синхронизации доступа к значению создается новый объект блокировки. Если lock является объектом Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если значение lock равно False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасен для процесса».

Обратите внимание, что lock - это аргумент только для ключевого слова.

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

То же, что и RawValue(), за исключением того, что в зависимости от значения lock вместо необработанного объекта ctypes может быть возвращена оболочка синхронизации, безопасная для процесса.

Если значение lock равно True (по умолчанию), то для синхронизации доступа к значению создается новый объект блокировки. Если lock является объектом Lock или RLock, то он будет использоваться для синхронизации доступа к значению. Если значение lock равно False, то доступ к возвращаемому объекту не будет автоматически защищен блокировкой, поэтому он не обязательно будет «безопасен для процесса».

Обратите внимание, что lock - это аргумент только для ключевого слова.

multiprocessing.sharedctypes.copy(obj)

Возвращает объект ctypes, выделенный из общей памяти, который является копией объекта ctypes obj.

multiprocessing.sharedctypes.synchronized(obj[, lock])

Возвращает защищенный от обработки объект-оболочку для объекта ctypes, который использует lock для синхронизации доступа. Если значение lock равно None (по умолчанию), то объект multiprocessing.RLock создается автоматически.

Синхронизированная оболочка будет иметь два метода в дополнение к методам объекта, который она обертывает: get_obj() возвращает обернутый объект и get_lock() возвращает объект блокировки, используемый для синхронизации.

Обратите внимание, что доступ к объекту ctypes через оболочку может быть намного медленнее, чем доступ к необработанному объекту ctypes.

Изменено в версии 3.5: Синхронизированные объекты поддерживают протокол context manager.

В таблице ниже сравнивается синтаксис для создания объектов shared ctypes из общей памяти с синтаксисом обычных типов. (В таблице MyStruct является некоторым подклассом ctypes.Structure.)

типы данных

sharedctypes использует тип

sharedctypes использует typecode

c_double(2.4)

Исходное значение(c_double, 2.4)

Исходное значение(„d“, 2.4)

Моя структура(4, 6)

Исходное значение(MyStruct, 4, 6)

(c_short * 7)()

Необработанный массив(c_short, 7)

Необработанный массив(„h“, 7)

(c_int * 3)(9, 2, 8)

Необработанный массив(c_int, (9, 2, 8))

Необработанный массив(„i“, (9, 2, 8))

Ниже приведен пример, в котором дочерний процесс изменяет объекты нескольких типов:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

Напечатанные результаты таковы

49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

Менеджеры

Менеджеры предоставляют способ создания данных, которые могут совместно использоваться различными процессами, включая совместное использование по сети между процессами, запущенными на разных компьютерах. Объект manager управляет серверным процессом, который управляет общими объектами. Другие процессы могут получать доступ к общим объектам с помощью прокси-серверов.

multiprocessing.Manager()

Возвращает запущенный SyncManager объект, который может использоваться для совместного использования объектов между процессами. Возвращаемый объект manager соответствует созданному дочернему процессу и имеет методы, которые будут создавать общие объекты и возвращать соответствующие прокси.

Управляющие процессы будут завершены, как только они завершат сборку мусора или завершат работу их родительского процесса. Классы управляющих определены в модуле multiprocessing.managers:

class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)

Создайте объект BaseManager.

После создания следует вызвать start() или get_server().serve_forever(), чтобы убедиться, что объект manager ссылается на запущенный процесс manager.

address - это адрес, по которому процесс manager прослушивает новые соединения. Если address равен None, то выбирается произвольный адрес.

authkey - это ключ аутентификации, который будет использоваться для проверки правильности входящих подключений к серверному процессу. Если значение authkey равно None, то используется current_process().authkey. В противном случае используется authkey, и это должна быть строка в байтах.

сериализатор должен быть 'pickle' (используйте pickle сериализацию) или 'xmlrpclib' (используйте xmlrpc.client сериализацию).

ctx - это объект контекста, или None (используйте текущий контекст). Смотрите функцию get_context().

shutdown_timeout - это время ожидания в секундах, используемое для завершения процесса, используемого менеджером в методе shutdown(). Если время ожидания истекло, процесс завершается. Если время завершения процесса также истекает, процесс завершается.

Изменено в версии 3.11: Добавлен параметр shutdown_timeout.

start([initializer[, initargs]])

Запустите подпроцесс, чтобы запустить менеджер. Если инициализатор не равен None, то при запуске подпроцесса будет вызван initializer(*initargs).

get_server()

Возвращает объект Server, который представляет фактический сервер, находящийся под управлением администратора. Объект Server поддерживает метод serve_forever():

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server дополнительно имеет атрибут address.

connect()

Подключите объект локального менеджера к процессу удаленного менеджера:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown()

Остановите процесс, используемый менеджером. Это доступно только в том случае, если для запуска серверного процесса было использовано значение start().

Это может быть вызвано несколько раз.

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

Метод класса, который может быть использован для регистрации типа или вызываемого объекта в классе manager.

typeid - это «идентификатор типа», который используется для идентификации определенного типа общего объекта. Это должна быть строка.

callable - это вызываемый объект, используемый для создания объектов с этим идентификатором типа. Если экземпляр manager будет подключен к серверу с помощью метода connect() или если аргумент create_method равен False, то это можно оставить как None.

proxytype - это подкласс BaseProxy, который используется для создания прокси-серверов для совместно используемых объектов с этим typeid. Если None, то класс прокси-серверов создается автоматически.

exposed используется для указания последовательности имен методов, доступ к которым должен быть разрешен прокси-серверам для этого typeid с помощью BaseProxy._callmethod(). (Если значение exposed равно None, то вместо него используется proxytype._exposed_, если оно существует.) В случае, когда открытый список не указан, все «общедоступные методы» общего объекта будут доступны. (Здесь «открытый метод» означает любой атрибут, который имеет метод __call__() и имя которого не начинается с '_'.)

method_to_typeid - это сопоставление, используемое для указания типа возвращаемого значения тех открытых методов, которые должны возвращать прокси-сервер. Оно сопоставляет имена методов со строками typeid. (Если method_to_typeid равен None, то вместо него используется proxytype._method_to_typeid_, если он существует.) Если имя метода не является ключом к этому отображению или если отображение равно None, то объект, возвращаемый методом, будет скопирован по значению.

create_method определяет, следует ли создавать метод с именем typeid, который может использоваться для указания серверному процессу создать новый общий объект и вернуть прокси-сервер для него. По умолчанию это True.

BaseManager экземпляры также имеют одно свойство, доступное только для чтения:

address

Адрес, используемый менеджером.

Изменено в версии 3.3: Объекты Manager поддерживают протокол управления контекстом - смотрите Типы контекстных менеджеров. __enter__() запускает серверный процесс (если он еще не запущен), а затем возвращает объект manager. __exit__() вызывает shutdown().

В предыдущих версиях __enter__() не запускался серверный процесс менеджера, если он еще не был запущен.

class multiprocessing.managers.SyncManager

Подкласс BaseManager, который может использоваться для синхронизации процессов. Объекты этого типа возвращаются с помощью multiprocessing.Manager().

Его методы создают и возвращают Прокси-объекты для ряда часто используемых типов данных, которые должны быть синхронизированы между процессами. Это, в частности, включает в себя общие списки и словари.

Barrier(parties[, action[, timeout]])

Создайте общий объект threading.Barrier и верните для него прокси-сервер.

Добавлено в версии 3.3.

BoundedSemaphore([value])

Создайте общий объект threading.BoundedSemaphore и верните для него прокси-сервер.

Condition([lock])

Создайте общий объект threading.Condition и верните для него прокси-сервер.

Если указан параметр lock, то это должен быть прокси-сервер для объекта threading.Lock или threading.RLock.

Изменено в версии 3.3: Был добавлен метод wait_for().

Event()

Создайте общий объект threading.Event и верните для него прокси-сервер.

Lock()

Создайте общий объект threading.Lock и верните для него прокси-сервер.

Namespace()

Создайте общий объект Namespace и верните для него прокси-сервер.

Queue([maxsize])

Создайте общий объект queue.Queue и верните для него прокси-сервер.

RLock()

Создайте общий объект threading.RLock и верните для него прокси-сервер.

Semaphore([value])

Создайте общий объект threading.Semaphore и верните для него прокси-сервер.

Array(typecode, sequence)

Создайте массив и верните для него прокси-сервер.

Value(typecode, value)

Создайте объект с доступным для записи атрибутом value и верните для него прокси-сервер.

dict()
dict(mapping)
dict(sequence)

Создайте общий объект dict и верните для него прокси-сервер.

list()
list(sequence)

Создайте общий объект list и верните для него прокси-сервер.

Изменено в версии 3.6: Общие объекты могут быть вложенными. Например, общий контейнерный объект, такой как общий список, может содержать другие общие объекты, которые будут управляться и синхронизироваться с помощью SyncManager.

class multiprocessing.managers.Namespace

Тип, который может регистрироваться с помощью SyncManager.

Объект пространства имен не имеет открытых методов, но имеет атрибуты, доступные для записи. В его представлении отображаются значения его атрибутов.

Однако при использовании прокси-сервера для объекта пространства имен атрибут, начинающийся с '_', будет атрибутом прокси-сервера, а не атрибутом референта:

>>> manager = multiprocessing.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

Индивидуальные менеджеры

Чтобы создать свой собственный менеджер, пользователь создает подкласс BaseManager и использует метод класса register() для регистрации новых типов или вызываемых объектов в классе manager. Например:

from multiprocessing.managers import BaseManager

class MathsClass:
    def add(self, x, y):
        return x + y
    def mul(self, x, y):
        return x * y

class MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    with MyManager() as manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # prints 7
        print(maths.mul(7, 8))         # prints 56

Использование удаленного менеджера

Можно запустить сервер управления на одном компьютере и попросить клиентов использовать его с других компьютеров (при условии, что соответствующие брандмауэры позволяют это).

Выполнение следующих команд создает сервер для единой общей очереди, доступ к которой могут получить удаленные клиенты:

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Один клиент может получить доступ к серверу следующим образом:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

Другой клиент также может использовать его:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

Локальные процессы также могут получить доступ к этой очереди, используя приведенный выше код на клиенте для удаленного доступа к ней:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super().__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Прокси-объекты

Прокси-сервер - это объект, который ссылается на общий объект, который находится (предположительно) в другом процессе. Общий объект называется «референтом» прокси-сервера. Несколько прокси-объектов могут иметь одну и ту же ссылку.

У прокси-объекта есть методы, которые вызывают соответствующие методы его референта (хотя не каждый метод референта обязательно будет доступен через прокси-сервер). Таким образом, прокси-сервер можно использовать так же, как и его референт:

>>> from multiprocessing import Manager
>>> manager = Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

Обратите внимание, что применение str() к прокси-серверу вернет представление референта, тогда как применение repr() вернет представление прокси-сервера.

Важной особенностью прокси-объектов является то, что они доступны для выбора, поэтому их можно передавать между процессами. Таким образом, референт может содержать Прокси-объекты. Это позволяет размещать эти управляемые списки, диктовки и другие Прокси-объекты:

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

Аналогично, прокси-серверы dict и list могут быть вложены друг в друга:

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

Если в референте содержатся стандартные (не являющиеся прокси-сервером) объекты list или dict, изменения этих изменяемых значений не будут распространяться через менеджер, поскольку прокси-сервер не имеет возможности узнать, когда изменяются значения, содержащиеся в нем. Однако сохранение значения в прокси-сервере контейнера (которое запускает __setitem__ для прокси-объекта) распространяется через менеджер, и поэтому для эффективного изменения такого элемента можно повторно присвоить измененное значение прокси-серверу контейнера:

# create a list proxy and append a mutable object (a dictionary)
lproxy = manager.list()
lproxy.append({})
# now mutate the dictionary
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# at this point, the changes to d are not yet synced, but by
# updating the dictionary, the proxy is notified of the change
lproxy[0] = d

Этот подход, возможно, менее удобен, чем использование вложенного Прокси-объекты для большинства случаев использования, но также демонстрирует уровень контроля над синхронизацией.

Примечание

Типы прокси-серверов в multiprocessing не поддерживают сравнение по значению. Так, например, у нас есть:

>>> manager.list([1,2,3]) == [1,2,3]
False

Вместо этого при проведении сравнений следует просто использовать копию референта.

class multiprocessing.managers.BaseProxy

Прокси-объекты являются экземплярами подклассов типа BaseProxy.

_callmethod(methodname[, args[, kwds]])

Вызовите и верните результат метода референта прокси-сервера.

Если proxy является прокси-сервером, референтом которого является obj, то выражение

proxy._callmethod(methodname, args, kwds)

вычислит выражение

getattr(obj, methodname)(*args, **kwds)

в процессе работы менеджера.

Возвращаемое значение будет копией результата вызова или прокси-сервером для нового общего объекта - смотрите документацию для аргумента method_to_typeid BaseManager.register().

Если вызов вызывает исключение, оно повторно вызывается с помощью _callmethod(). Если в процессе менеджера возникает какое-либо другое исключение, оно преобразуется в исключение RemoteError и вызывается с помощью _callmethod().

Обратите особое внимание на то, что исключение будет вызвано, если methodname не было предоставлено.

Пример использования _callmethod():

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

Верните копию референта.

Если референт недоступен для выбора, то это вызовет исключение.

__repr__()

Возвращает представление прокси-объекта.

__str__()

Возвращает представление референта.

Уборка

Прокси-объект использует обратный вызов weakref, так что, когда он получает собранный мусор, он отменяет регистрацию в менеджере, которому принадлежит его референт.

Общий объект удаляется из процесса управления, когда на него больше не ссылаются прокси-серверы.

Пулы процессов

Можно создать пул процессов, которые будут выполнять задачи, переданные ему с помощью класса Pool.

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

Объект пула процессов, который управляет пулом рабочих процессов, которым могут быть отправлены задания. Он поддерживает асинхронные результаты с тайм-аутами и обратными вызовами и имеет параллельную реализацию map.

processes - это количество используемых рабочих процессов. Если значение processes равно None, то используется число, возвращаемое параметром os.cpu_count().

Если инициализатор не равен None, то каждый рабочий процесс будет вызывать initializer(*initargs) при запуске.

maxtasksperchild - это количество задач, которые может выполнить рабочий процесс, прежде чем он завершит работу и будет заменен новым рабочим процессом, чтобы освободить неиспользуемые ресурсы. Значение по умолчанию maxtasksperchild равно None, что означает, что рабочие процессы будут жить столько, сколько существует пул.

context может использоваться для указания контекста, используемого для запуска рабочих процессов. Обычно пул создается с помощью функции multiprocessing.Pool() или метода Pool() объекта context. В обоих случаях context задается соответствующим образом.

Обратите внимание, что методы объекта pool должны вызываться только процессом, создавшим пул.

Предупреждение

multiprocessing.pool объекты имеют внутренние ресурсы, которыми необходимо должным образом управлять (как и любым другим ресурсом), используя пул в качестве контекстного менеджера или вызывая close() и terminate() вручную. Невыполнение этого требования может привести к тому, что процесс зависнет в процессе завершения.

Обратите внимание, что ** неправильно ** полагаться на сборщик мусора для уничтожения пула, поскольку CPython не гарантирует, что будет вызван финализатор пула (см. object.__del__() для получения дополнительной информации).

Изменено в версии 3.2: Добавлен параметр maxtasksperchild.

Изменено в версии 3.4: Добавлен параметр context.

Примечание

Рабочие процессы в пределах Pool обычно выполняются в течение всего срока действия рабочей очереди пула. Часто встречающийся в других системах (таких как Apache, mod_wsgi и т.д.) способ освобождения ресурсов, удерживаемых рабочими, заключается в том, чтобы позволить рабочему в пуле выполнить только определенный объем работы перед выходом, очисткой и запуском нового процесса для замены старого. Аргумент maxtasksperchild для параметра Pool предоставляет эту возможность конечному пользователю.

apply(func[, args[, kwds]])

Вызовите func с аргументами args и ключевыми словами kwds. Он блокируется до тех пор, пока результат не будет готов. Учитывая эти блоки, apply_async() лучше подходит для параллельного выполнения работы. Кроме того, func выполняется только в одном из рабочих элементов пула.

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

Вариант метода apply(), который возвращает объект AsyncResult.

Если указано значение callback, то оно должно быть вызываемым и принимать единственный аргумент. Когда результат становится готовым, к нему применяется callback, если только вызов не завершился неудачей, и в этом случае вместо него применяется error_callback.

Если указано значение error_callback, то оно должно быть вызываемым и принимать единственный аргумент. Если целевая функция завершается ошибкой, то вызывается error_callback с экземпляром exception.

Обратные вызовы должны завершаться немедленно, так как в противном случае поток, обрабатывающий результаты, будет заблокирован.

map(func, iterable[, chunksize])

Параллельный эквивалент встроенной функции map() (хотя она поддерживает только один повторяемый аргумент, для нескольких повторяемых значений см. starmap()). Она блокируется до тех пор, пока результат не будет готов.

Этот метод разбивает итерацию на несколько блоков, которые передаются в пул процессов в качестве отдельных задач. (Приблизительный) размер этих блоков можно задать, установив значение chunksize равным целому положительному числу.

Обратите внимание, что это может привести к интенсивному использованию памяти для очень длинных итераций. Рассмотрите возможность использования imap() или imap_unordered() с явной опцией chunksize для повышения эффективности.

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

Вариант метода map(), который возвращает объект AsyncResult.

Если указано значение callback, то оно должно быть вызываемым и принимать единственный аргумент. Когда результат становится готовым, к нему применяется callback, если только вызов не завершился неудачей, и в этом случае вместо него применяется error_callback.

Если указано значение error_callback, то оно должно быть вызываемым и принимать единственный аргумент. Если целевая функция завершается ошибкой, то вызывается error_callback с экземпляром exception.

Обратные вызовы должны завершаться немедленно, так как в противном случае поток, обрабатывающий результаты, будет заблокирован.

imap(func, iterable[, chunksize])

Более ленивая версия map().

Аргумент chunksize такой же, как и тот, который используется методом map(). Для очень длинных итераций использование большого значения для chunksize может значительно ускорить выполнение задания, чем использование значения по умолчанию 1.

Также, если значение chunksize равно 1, то метод next() итератора, возвращаемый методом imap(), имеет необязательный параметр timeout: next(timeout) вызовет multiprocessing.TimeoutError если результат не может быть возвращен в течение таймаута секунд.

imap_unordered(func, iterable[, chunksize])

То же, что и imap(), за исключением того, что порядок следования результатов из возвращаемого итератора следует считать произвольным. (Только при наличии только одного рабочего процесса порядок гарантированно будет «правильным».)

starmap(func, iterable[, chunksize])

Как и map(), за исключением того, что элементы iterable должны быть повторяемыми, которые распаковываются как аргументы.

Следовательно, * повторяемое* значение [(1,2), (3, 4)] приводит к [func(1,2), func(3,4)].

Добавлено в версии 3.3.

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

Комбинация starmap() и map_async(), которая выполняет итерацию по iterable из iterables и вызывает func с распакованными iterables. Возвращает результирующий объект.

Добавлено в версии 3.3.

close()

Предотвращает отправку дополнительных задач в пул. Как только все задачи будут выполнены, рабочие процессы завершатся.

terminate()

Немедленно останавливает рабочие процессы, не завершая невыполненную работу. При сборке мусора для объекта пула будет немедленно вызван terminate().

join()

Дождитесь завершения рабочих процессов. Перед использованием join() необходимо вызвать close() или terminate().

Изменено в версии 3.3: Объекты пула теперь поддерживают протокол управления контекстом - смотрите Типы контекстных менеджеров. __enter__() возвращает объект пула, а __exit__() вызывает terminate().

class multiprocessing.pool.AsyncResult

Класс результата, возвращаемого Pool.apply_async() и Pool.map_async().

get([timeout])

Верните результат, когда он будет получен. Если значение timeout не равно None и результат не будет получен в течение времени ожидания секунд, то значение multiprocessing.TimeoutError будет увеличено. Если удаленный вызов вызвал исключение, то это исключение будет вызвано get().

wait([timeout])

Подождите, пока не появится результат или пока не пройдет тайм-аут секунд.

ready()

Возвращает, был ли вызов завершен.

successful()

Возвращает, завершился ли вызов без возникновения исключения. Вызовет ValueError, если результат не готов.

Изменено в версии 3.7: Если результат не готов, то вместо AssertionError выводится ValueError.

Следующий пример демонстрирует использование пула:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

Слушатели и клиенты

Обычно передача сообщений между процессами осуществляется с помощью очередей или с помощью объектов Connection, возвращаемых Pipe().

Однако модуль multiprocessing.connection обеспечивает некоторую дополнительную гибкость. По сути, он предоставляет высокоуровневый API, ориентированный на сообщения, для работы с сокетами или именованными каналами Windows. Он также поддерживает дайджест-аутентификацию с использованием модуля hmac и одновременный опрос нескольких подключений.

multiprocessing.connection.deliver_challenge(connection, authkey)

Отправьте случайно сгенерированное сообщение на другой конец соединения и дождитесь ответа.

Если ответ соответствует дайджесту сообщения, в котором в качестве ключа используется authkey, на другой конец соединения отправляется приветственное сообщение. В противном случае выводится значение AuthenticationError.

multiprocessing.connection.answer_challenge(connection, authkey)

Получите сообщение, вычислите дайджест сообщения, используя authkey в качестве ключа, а затем отправьте дайджест обратно.

Если приветственное сообщение не получено, то выводится значение AuthenticationError.

multiprocessing.connection.Client(address[, family[, authkey]])

Попытайтесь установить соединение с прослушивателем, который использует address адрес, возвращая значение Connection.

Тип соединения определяется аргументом family, но его обычно можно опустить, поскольку обычно его можно определить по формату address. (см. Форматы адресов)

Если указан authkey, а не None, он должен быть байтовой строкой и будет использоваться в качестве секретного ключа для проверки подлинности на основе HMAC. Проверка подлинности не выполняется, если значение authkey равно None. AuthenticationError вызывается в случае сбоя проверки подлинности. Смотрите Ключи аутентификации.

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

Оболочка для связанного сокета или именованного канала Windows, который «прослушивает» соединения.

address - это адрес, который будет использоваться связанным сокетом или именованным каналом объекта прослушивателя.

Примечание

Если используется адрес «0.0.0.0», этот адрес не будет подключаемой конечной точкой в Windows. Если вам требуется подключаемая конечная точка, вам следует использовать «127.0.0.1».

семейство - это тип используемого сокета (или именованного канала). Это может быть одна из строк 'AF_INET' (для сокета TCP), 'AF_UNIX' ( для сокета домена Unix) или 'AF_PIPE' (для именованного канала Windows). Из них гарантированно доступен только первый. Если значение family равно None, то значение семейства определяется на основе формата address. Если значение address также равно None, то выбирается значение по умолчанию. По умолчанию используется семейство, которое считается самым быстрым из доступных. Смотрите Форматы адресов. Обратите внимание, что если семейство равно 'AF_UNIX', а адрес равен None, то сокет будет создан в частном временном каталоге, созданном с помощью tempfile.mkstemp().

Если объект прослушивателя использует сокет, то backlog (по умолчанию 1) передается методу listen() сокета, как только он будет привязан.

Если указан authkey, а не None, он должен быть байтовой строкой и будет использоваться в качестве секретного ключа для проверки подлинности на основе HMAC. Проверка подлинности не выполняется, если значение authkey равно None. AuthenticationError вызывается в случае сбоя проверки подлинности. Смотрите Ключи аутентификации.

accept()

Примите соединение через связанный сокет или именованный канал объекта-прослушивателя и верните объект Connection. Если попытка аутентификации завершилась неудачей, то вызывается AuthenticationError.

close()

Закройте связанный сокет или именованный канал объекта прослушивателя. Это вызывается автоматически, когда для прослушивателя выполняется сборка мусора. Однако рекомендуется вызывать это явно.

Объекты прослушивателя обладают следующими свойствами, доступными только для чтения:

address

Адрес, который используется объектом прослушивателя.

last_accepted

Адрес, с которого было установлено последнее принятое соединение. Если этот адрес недоступен, то это None.

Изменено в версии 3.3: Объекты прослушивателя теперь поддерживают протокол управления контекстом - смотрите Типы контекстных менеджеров. __enter__() возвращает объект прослушивателя, а __exit__() вызывает close().

multiprocessing.connection.wait(object_list, timeout=None)

Подождите, пока объект в object_list не будет готов. Возвращает список объектов в object_list, которые готовы. Если значение timeout является плавающим, то вызов блокируется не более чем на столько секунд. Если тайм-аут равен None, то он будет заблокирован на неограниченный период. Отрицательный тайм-аут эквивалентен нулевому таймауту.

Как для Unix, так и для Windows объект может отображаться в object_list, если он

Объект соединения или сокета готов, когда из него доступны данные для чтения, или когда другой конец закрыт.

Unix: wait(object_list, timeout) почти эквивалентен select.select(object_list, [], [], timeout). Разница в том, что, если select.select() прерывается сигналом, он может вызвать OSError с номером ошибки EINTR, тогда как wait() этого не произойдет.

Windows: Элемент в object_list должен быть либо целочисленным дескриптором, который доступен для ожидания (в соответствии с определением, используемым в документации функции Win32 WaitForMultipleObjects()), либо это может быть объект с fileno() методом, который возвращает ручка для гнезда или ручка для трубы. (Обратите внимание, что ручки труб и разъемов не являются ручками, которые можно использовать для ожидания.)

Добавлено в версии 3.3.

Примеры

Следующий серверный код создает прослушиватель, который использует 'secret password' в качестве ключа аутентификации. Затем он ожидает подключения и отправляет некоторые данные клиенту:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

Следующий код подключается к серверу и получает некоторые данные с сервера:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

Следующий код использует wait() для ожидания сообщений от нескольких процессов одновременно:

import time, random
from multiprocessing import Process, Pipe, current_process
from multiprocessing.connection import wait

def foo(w):
    for i in range(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    readers = []

    for i in range(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(target=foo, args=(w,))
        p.start()
        # We close the writable end of the pipe now to be sure that
        # p is the only process which owns a handle for it.  This
        # ensures that when p closes its handle for the writable end,
        # wait() will promptly report the readable end as being ready.
        w.close()

    while readers:
        for r in wait(readers):
            try:
                msg = r.recv()
            except EOFError:
                readers.remove(r)
            else:
                print(msg)

Форматы адресов

  • Адрес 'AF_INET' представляет собой кортеж вида (hostname, port), где имя хоста - это строка, а порт - целое число.

  • Адрес 'AF_UNIX' - это строка, представляющая имя файла в файловой системе.

  • Адрес 'AF_PIPE' представляет собой строку вида r'\\.\pipe\PipeName'. Чтобы использовать Client() для подключения к именованному каналу на удаленном компьютере с именем ServerName, вместо этого следует использовать адрес вида r'\\ServerName\pipe\PipeName'.

Обратите внимание, что любая строка, начинающаяся с двух обратных косых черт, по умолчанию считается адресом 'AF_PIPE', а не адресом 'AF_UNIX'.

Ключи аутентификации

При использовании Connection.recv полученные данные автоматически удаляются. К сожалению, удаление данных из ненадежного источника представляет угрозу безопасности. Поэтому Listener и Client() используют модуль hmac для обеспечения дайджест-аутентификации.

Ключ аутентификации - это строка байтов, которую можно рассматривать как пароль: как только соединение установлено, обе стороны потребуют подтверждения того, что другая сторона знает ключ аутентификации. (Демонстрация того, что обе стороны используют один и тот же ключ, не требует отправки ключа по соединению.)

Если запрашивается аутентификация, но ключ аутентификации не указан, то используется возвращаемое значение current_process().authkey (см. Process). Это значение будет автоматически унаследовано любым объектом Process, созданным текущим процессом. Это означает, что (по умолчанию) все процессы многопроцессорной программы будут совместно использовать единый ключ аутентификации, который можно использовать при установлении соединений между собой.

Подходящие ключи аутентификации также могут быть сгенерированы с помощью os.urandom().

Регистрация

Доступна некоторая поддержка ведения журнала. Однако обратите внимание, что пакет logging не использует общие блокировки процессов, поэтому сообщения от разных процессов могут перепутываться (в зависимости от типа обработчика).

multiprocessing.get_logger()

Возвращает логгер, используемый параметром multiprocessing. При необходимости будет создан новый.

При первом создании регистратор имеет уровень logging.NOTSET и не имеет обработчика по умолчанию. Сообщения, отправленные в этот регистратор, по умолчанию не будут передаваться в корневой регистратор.

Обратите внимание, что в Windows дочерние процессы наследуют только уровень регистратора родительского процесса - любые другие настройки регистратора наследоваться не будут.

multiprocessing.log_to_stderr(level=None)

Эта функция выполняет вызов get_logger(), но в дополнение к возвращению регистратора, созданного get_logger, она добавляет обработчик, который отправляет выходные данные в sys.stderr, используя формат '[%(levelname)s/%(processName)s] %(message)s'. Вы можете изменить levelname логгера, передав аргумент level.

Ниже приведен пример сеанса с включенным ведением журнала:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

Полную таблицу уровней ведения журнала смотрите в модуле logging.

Модуль multiprocessing.dummy

multiprocessing.dummy повторяет API multiprocessing, но является не более чем оболочкой для модуля threading.

В частности, функция Pool, предоставляемая multiprocessing.dummy, возвращает экземпляр ThreadPool, который является подклассом Pool, который поддерживает все те же вызовы методов, но использует пул рабочих потоков, а не чем рабочие процессы.

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

Объект пула потоков, который управляет пулом рабочих потоков, в которые могут быть отправлены задания. Экземпляры ThreadPool полностью совместимы по интерфейсу с экземплярами Pool, и их ресурсами также необходимо надлежащим образом управлять, либо используя пул в качестве контекстного менеджера, либо вызывая close() и terminate() вручную.

processes - это количество используемых рабочих потоков. Если значение processes равно None, то используется число, возвращаемое параметром os.cpu_count().

Если инициализатор не равен None, то каждый рабочий процесс будет вызывать initializer(*initargs) при запуске.

В отличие от Pool, maxtasksperchild и context не могут быть предоставлены.

Примечание

Модуль ThreadPool использует тот же интерфейс, что и Pool, который разработан на основе пула процессов и появился до появления модуля concurrent.futures. Таким образом, он наследует некоторые операции, которые не имеют смысла для пула, поддерживаемого потоками, и у него есть свой собственный тип для представления статуса асинхронных заданий, AsyncResult, который не понимается никакими другими библиотеками.

Пользователям, как правило, следует предпочесть использовать concurrent.futures.ThreadPoolExecutor, который имеет более простой интерфейс, изначально разработанный с учетом потоков, и который возвращает concurrent.futures.Future экземпляры, совместимые со многими другими библиотеками, включая asyncio.

Руководящие принципы программирования

Существуют определенные рекомендации и идиомы, которых следует придерживаться при использовании multiprocessing.

Все методы запуска

Следующее относится ко всем методам запуска.

Избегайте общего состояния

Насколько это возможно, следует стараться избегать переноса больших объемов данных между процессами.

Вероятно, лучше всего придерживаться использования очередей или каналов связи для обмена данными между процессами, а не использовать примитивы синхронизации более низкого уровня.

Способность к травлению

Убедитесь, что аргументы методов прокси-серверов доступны для выбора.

Потокобезопасность прокси-серверов

Не используйте прокси-объект более чем из одного потока, если вы не защитили его блокировкой.

(Никогда не возникает проблем с разными процессами, использующими один и тот же прокси-сервер.)

Присоединение к процессам-зомби

В Unix, когда процесс завершается, но не был присоединен, он становится зомби. Их никогда не должно быть очень много, потому что каждый раз, когда запускается новый процесс (или вызывается active_children()), все завершенные процессы, которые еще не были присоединены, будут присоединены. Кроме того, вызов завершенного процесса Process.is_alive присоединится к процессу. Даже в этом случае, вероятно, будет хорошей практикой явно присоединяться ко всем процессам, которые вы запускаете.

Лучше унаследовать, чем мариновать/не мариновать

При использовании методов spawn или forkserver start многие типы из multiprocessing должны быть доступны для выбора, чтобы дочерние процессы могли их использовать. Однако, как правило, следует избегать отправки общих объектов другим процессам с использованием каналов или очередей. Вместо этого вы должны организовать программу таким образом, чтобы процесс, которому требуется доступ к общему ресурсу, созданному в другом месте, мог унаследовать его от процесса-предка.

Избегайте завершения процессов

Использование метода Process.terminate для остановки процесса может привести к тому, что любые общие ресурсы (такие как блокировки, семафоры, каналы и очереди), используемые в данный момент процессом, станут неработоспособными или недоступными для других процессов.

Поэтому, вероятно, лучше всего использовать Process.terminate только для процессов, которые никогда не используют какие-либо общие ресурсы.

Объединение процессов, использующих очереди

Имейте в виду, что процесс, который поместил элементы в очередь, будет ожидать завершения до тех пор, пока все буферизованные элементы не будут переданы потоком «feeder» в базовый канал. (Дочерний процесс может вызвать Queue.cancel_join_thread метод очереди, чтобы избежать такого поведения.)

Это означает, что всякий раз, когда вы используете очередь, вам необходимо убедиться, что все элементы, которые были помещены в очередь, в конечном итоге будут удалены до присоединения процесса. В противном случае вы не можете быть уверены, что процессы, которые поместили элементы в очередь, завершатся. Помните также, что недемонические процессы будут подключены автоматически.

Примером, который приведет к взаимоблокировке, является следующий:

from multiprocessing import Process, Queue

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(target=f, args=(queue,))
    p.start()
    p.join()                    # this deadlocks
    obj = queue.get()

Исправлением здесь было бы поменять местами последние две строки (или просто удалить строку p.join()).

Явно передавать ресурсы дочерним процессам

В Unix, использующем метод fork start, дочерний процесс может использовать общий ресурс, созданный в родительском процессе с использованием глобального ресурса. Однако лучше передать объект в качестве аргумента конструктору дочернего процесса.

Помимо того, что код (потенциально) совместим с Windows и другими методами запуска, это также гарантирует, что, пока дочерний процесс все еще активен, объект не будет собираться мусором в родительском процессе. Это может быть важно, если какой-либо ресурс освобождается, когда объект собирается мусором в родительском процессе.

Так, например

from multiprocessing import Process, Lock

def f():
    ... do something using "lock" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f).start()

следует переписать следующим образом

from multiprocessing import Process, Lock

def f(l):
    ... do something using "l" ...

if __name__ == '__main__':
    lock = Lock()
    for i in range(10):
        Process(target=f, args=(lock,)).start()

Остерегайтесь замены sys.stdin на «объект, подобный файлу».

multiprocessing первоначально вызывался безоговорочно:

os.close(sys.stdin.fileno())

в методе multiprocessing.Process._bootstrap() — это привело к проблемам с процессами в процессах. Это было изменено на:

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

Это решает фундаментальную проблему, связанную с тем, что процессы сталкиваются друг с другом, что приводит к ошибочному файловому дескриптору, но представляет потенциальную опасность для приложений, которые заменяют sys.stdin() «файлоподобным объектом» с буферизацией выходных данных. Эта опасность заключается в том, что если несколько процессов вызовут close() для этого файлообразного объекта, это может привести к многократному сбросу одних и тех же данных в объект, что приведет к повреждению.

Если вы пишете объект, подобный файлу, и реализуете свое собственное кэширование, вы можете сделать его безопасным для разветвления, сохраняя pid всякий раз, когда вы добавляете его в кэш, и удаляя кэш при изменении pid. Например:

@property
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    return self._cache

Для получения дополнительной информации смотрите bpo-5155, bpo-5313 и bpo-5331

Методы запуска spawn и forkserver

Существует несколько дополнительных ограничений, которые не применяются к методу запуска fork.

Большая разборчивость в выборе

Убедитесь, что все аргументы Process.__init__() доступны для выбора. Кроме того, если вы создаете подкласс Process, убедитесь, что экземпляры будут доступны для выбора при вызове метода Process.start.

Глобальные переменные

Имейте в виду, что если код, запущенный в дочернем процессе, пытается получить доступ к глобальной переменной, то значение, которое он видит (если оно есть), может отличаться от значения в родительском процессе на момент вызова Process.start.

Однако глобальные переменные, которые являются просто константами уровня модуля, не вызывают никаких проблем.

Безопасный импорт основного модуля

Убедитесь, что основной модуль может быть безопасно импортирован новым интерпретатором Python без возникновения непредвиденных побочных эффектов (таких как запуск нового процесса).

Например, при использовании метода spawn или forkserver start запуск следующего модуля завершится ошибкой с RuntimeError:

from multiprocessing import Process

def foo():
    print('hello')

p = Process(target=foo)
p.start()

Вместо этого следует защитить «точку входа» программы, используя if __name__ == '__main__': следующим образом:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(target=foo)
    p.start()

(Строка freeze_support() может быть опущена, если программа будет запущена нормально, а не заморожена.)

Это позволяет недавно созданному интерпретатору Python безопасно импортировать модуль и затем запустить функцию модуля foo().

Аналогичные ограничения применяются, если в главном модуле создан пул или менеджер.

Примеры

Демонстрация того, как создавать и использовать индивидуальные менеджеры и прокси-серверы:

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

Используя Pool:

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

Пример, показывающий, как использовать очереди для передачи задач в набор рабочих процессов и сбора результатов:

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()
Вернуться на верх