Параллелизм, конкурентность и AsyncIO в Python — на примере

Конкурентность против параллелизма

Конкурентность и параллелизм - похожие термины, но это не одно и то же.

Конкуренция - это возможность одновременного выполнения нескольких задач на ЦП. Задачи могут запускаться, выполняться и завершаться в перекрывающиеся периоды времени. В случае одного процессора несколько задач выполняются с помощью переключения контекстов, когда состояние процесса сохраняется, чтобы его можно было вызвать и выполнить позже.

Параллелизм - это возможность одновременного выполнения нескольких задач на нескольких ядрах процессора.

Хотя они могут увеличить скорость работы вашего приложения, параллелизм и параллельность не следует использовать повсеместно. Вариант использования зависит от того, является ли задача процессорозависимой или IO-зависимой.

Задачи, которые ограничиваются центральным процессором, привязаны к процессору. Например, математические вычисления привязаны к процессору, поскольку вычислительная мощность увеличивается с ростом числа процессоров компьютера. Параллелизм предназначен для задач, ограниченных центральным процессором. Теоретически, если задача разделена на n подзадач, каждая из этих n задач может выполняться параллельно, чтобы эффективно сократить время до 1/n исходной непараллельной задачи. Параллельность предпочтительна для задач, связанных с IO, поскольку вы можете заниматься чем-то другим, пока происходит выборка ресурсов IO.

Лучшим примером задач с привязкой к процессору является наука о данных. Специалисты по изучению данных работают с огромными массивами данных. Для предварительной обработки данных они могут разделить их на несколько пакетов и запускать их параллельно, эффективно сокращая общее время обработки. Увеличение количества ядер приводит к ускорению обработки данных.

Web scraping является IO-bound. Поскольку эта задача мало влияет на процессор, так как большая часть времени тратится на чтение из сети и запись в сеть. Другие распространенные IO-bound задачи включают вызовы баз данных, чтение и запись файлов на диск. Веб-приложения, такие как Django и Flask, являются IO-bound приложениями.

Если вам интересно узнать больше о различиях между потоками, мультипроцессингом и async в Python, ознакомьтесь со статьей Speeding Up Python with Concurrency, Parallelism, and asyncio.

Сценарий

Давайте рассмотрим, как ускорить выполнение следующих задач:

# tasks.py

import os
from multiprocessing import current_process
from threading import current_thread

import requests


def make_request(num):
    # io-bound

    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    requests.get("https://httpbin.org/ip")


async def make_request_async(num, client):
    # io-bound

    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    await client.get("https://httpbin.org/ip")


def get_prime_numbers(num):
    # cpu-bound

    pid = os.getpid()
    thread_name = current_thread().name
    process_name = current_process().name
    print(f"{pid} - {process_name} - {thread_name}")

    numbers = []

    prime = [True for i in range(num + 1)]
    p = 2

    while p * p <= num:
        if prime[p]:
            for i in range(p * 2, num + 1, p):
                prime[i] = False
        p += 1

    prime[0] = False
    prime[1] = False

    for p in range(num + 1):
        if prime[p]:
            numbers.append(p)

    return numbers

Все примеры кода в этом руководстве можно найти в репозитории parallel-concurrent-examples-python.

Примечания:

  • make_request делает HTTP запрос на https://httpbin.org/ip X количество раз.
  • make_request_async делает тот же самый HTTP запрос асинхронно с HTTPX.
  • get_prime_numbers вычисляет простые числа методом Решето Эратосфена от двух до заданного предела.

Для ускорения выполнения вышеуказанных задач мы будем использовать следующие библиотеки из стандартной библиотеки:

  • поточность для одновременного выполнения задач
  • multiprocessing для параллельного выполнения задач
  • concurrent.futures для одновременного и параллельного выполнения задач из одного интерфейса
  • asyncio для параллельного выполнения задач с корутинами, управляемыми интерпретатором Python
Library Class/Method Processing Type
threading Thread concurrent
concurrent.futures ThreadPoolExecutor concurrent
asyncio gather concurrent (via coroutines)
multiprocessing Pool parallel
concurrent.futures ProcessPoolExecutor parallel

IO-bound Operation

Опять же, связанные с IO задачи тратят больше времени на IO, чем на CPU.

Поскольку веб-скрейпинг связан с IO, мы должны использовать потоки для ускорения обработки, поскольку получение HTML (IO) происходит медленнее, чем его разбор (CPU).

Сценарий:  Как ускорить скрипт веб-скрейпинга и краулинга на базе Python?

Пример синхронизации

Начнем с эталона.

# io-bound_sync.py

import time

from tasks import make_request


def main():
    for num in range(1, 101):
        make_request(num)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Здесь мы сделали 100 HTTP-запросов с помощью функции make_request. Поскольку запросы происходят синхронно, каждая задача выполняется последовательно.

Elapsed run time: 15.710984757 seconds.

Итак, это примерно 0,16 секунды на один запрос.

Пример потоковой обработки

# io-bound_concurrent_1.py

import threading
import time

from tasks import make_request


def main():
    tasks = []

    for num in range(1, 101):
        tasks.append(threading.Thread(target=make_request, args=(num,)))
        tasks[-1].start()

    for task in tasks:
        task.join()


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Здесь одна и та же функция make_request вызывается 100 раз. На этот раз для создания потока для каждого запроса используется библиотека threading.

Elapsed run time: 1.020112515 seconds.

Общее время уменьшается с ~16с до ~1с.

Поскольку мы используем отдельные потоки для каждого запроса, вы можете задаться вопросом, почему все это не заняло ~0,16 с для завершения. Это дополнительное время - накладные расходы на управление потоками. Глобальная блокировка интерпретатора (GIL) в Python гарантирует, что только один поток одновременно использует байткод Python.

concurrent.futures Пример

# io-bound_concurrent_2.py

import time
from concurrent.futures import ThreadPoolExecutor, wait

from tasks import make_request


def main():
    futures = []

    with ThreadPoolExecutor() as executor:
        for num in range(1, 101):
            futures.append(executor.submit(make_request, num))

    wait(futures)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Здесь мы использовали concurrent.futures.ThreadPoolExecutor для достижения многопоточности. После создания всех фьючерсов/обещаний мы использовали wait, чтобы дождаться их завершения.

Elapsed run time: 1.340592231 seconds

concurrent.futures.ThreadPoolExecutor фактически является абстракцией вокруг библиотеки multithreading, что упрощает ее использование. В предыдущем примере мы назначили каждый запрос на поток, и в общей сложности было использовано 100 потоков. Но в ThreadPoolExecutor по умолчанию количество рабочих потоков равно min(32, os.cpu_count() + 4). ThreadPoolExecutor существует для того, чтобы облегчить процесс достижения многопоточности. Если вы хотите получить больший контроль над многопоточностью, используйте вместо этого библиотеку multithreading.

Пример AsyncIO

# io-bound_concurrent_3.py

import asyncio
import time

import httpx

from tasks import make_request_async


async def main():
    async with httpx.AsyncClient() as client:
        return await asyncio.gather(
            *[make_request_async(num, client) for num in range(1, 101)]
        )


if __name__ == "__main__":
    start_time = time.perf_counter()

    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

    end_time = time.perf_counter()
    elapsed_time = end_time - start_time
    print(f"Elapsed run time: {elapsed_time} seconds")

httpx используется здесь, поскольку requests не поддерживает асинхронные операции.

Здесь мы использовали asyncio для достижения параллелизма.

Elapsed run time: 0.553961068 seconds

asyncio быстрее, чем другие методы, потому что threading использует потоки ОС (операционной системы). Таким образом, потоки управляются ОС, и переключение потоков происходит с ее помощью. asyncio использует корутины, которые определяются интерпретатором Python. С помощью coroutines программа решает, когда оптимально переключать задачи. За это отвечает even_loop в asyncio.

Операция с ограничением вычислительной мощности

Сценарий: Как ускорить простой скрипт обработки данных?

Пример синхронизации

Опять же, давайте начнем с эталона.

# cpu-bound_sync.py

import time

from tasks import get_prime_numbers


def main():
    for num in range(1000, 16000):
        get_prime_numbers(num)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Здесь мы выполнили функцию get_prime_numbers для чисел от 1000 до 16000.

Elapsed run time: 17.863046316 seconds.

Пример многопроцессорной обработки

# cpu-bound_parallel_1.py

import time
from multiprocessing import Pool, cpu_count

from tasks import get_prime_numbers


def main():
    with Pool(cpu_count() - 1) as p:
        p.starmap(get_prime_numbers, zip(range(1000, 16000)))
        p.close()
        p.join()


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Здесь мы использовали multiprocessing для вычисления простых чисел.

Elapsed run time: 2.9848740599999997 seconds.

concurrent.futures Пример

# cpu-bound_parallel_2.py

import time
from concurrent.futures import ProcessPoolExecutor, wait
from multiprocessing import cpu_count

from tasks import get_prime_numbers


def main():
    futures = []

    with ProcessPoolExecutor(cpu_count() - 1) as executor:
        for num in range(1000, 16000):
            futures.append(executor.submit(get_prime_numbers, num))

    wait(futures)


if __name__ == "__main__":
    start_time = time.perf_counter()

    main()

    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")

Здесь мы добились многопроцессорности с помощью concurrent.futures.ProcessPoolExecutor. Как только задания добавляются во фьючерсы, wait(futures) ожидает их завершения.

Elapsed run time: 4.452427557 seconds.

concurrent.futures.ProcessPoolExecutor является оберткой вокруг multiprocessing.Pool. Она имеет те же ограничения, что и ThreadPoolExecutor. Если вам нужен больший контроль над многопроцессорностью, используйте multiprocessing.Poolconcurrent.futures предоставляет абстракцию как над мультипроцессингом, так и над потоками, что позволяет легко переключаться между ними.

Заключение

Стоит отметить, что использование многопроцессорной обработки для выполнения функции make_request будет намного медленнее, чем потоковый вариант, так как процессы должны будут ждать ввода-вывода. Однако многопроцессорный подход будет быстрее, чем синхронный.

Аналогично, использование параллелизма для задач, привязанных к процессору, не стоит усилий по сравнению с параллелизмом.

При этом использование параллелизма или параллельности для выполнения скриптов усложняет работу. Ваш код будет труднее читать, тестировать и отлаживать, поэтому используйте их только в случае крайней необходимости для долго выполняющихся сценариев.

concurrent.futures - это то, с чего я обычно начинаю, поскольку-

  1. Легко переключаться туда-сюда между параллелизмом и параллельностью
  2. Зависимым библиотекам не нужно поддерживать asyncio (requests против httpx)
  3. Чище и легче читать по сравнению с другими подходами

Возьмите код из репозитория parallel-concurrent-examples-python на GitHub.

Вернуться на верх