Разработка с помощью asyncio

Асинхронное программирование отличается от классического «последовательного» программирования.

На этой странице перечислены распространенные ошибки и ловушки и объяснено, как их избежать.

Режим отладки

По умолчанию asyncio запускается в рабочем режиме. Для упрощения разработки в asyncio предусмотрен режим отладки.

Существует несколько способов включить режим отладки asyncio:

В дополнение к включению режима отладки, рассмотрите также:

  • установив уровень регистрации asyncio logger равным logging.DEBUG, например, следующий фрагмент кода может быть запущен при запуске приложения:

    logging.basicConfig(level=logging.DEBUG)
    
  • настройка модуля warnings для отображения предупреждений ResourceWarning. Один из способов сделать это - использовать параметр командной строки -W default.

Когда включен режим отладки:

  • asyncio проверяет наличие coroutines that were not awaited и регистрирует их; это устраняет проблему «забытого ожидания».

  • Многие непоточнобезопасные асинхронные API (такие как методы loop.call_soon() и loop.call_at()) вызывают исключение, если они вызываются из неправильного потока.

  • Время выполнения селектора ввода-вывода регистрируется, если выполнение операции ввода-вывода занимает слишком много времени.

  • Регистрируются обратные вызовы, которые длятся более 100 миллисекунд. Атрибут loop.slow_callback_duration может использоваться для установки минимальной продолжительности выполнения в секундах, которая считается «медленной».

Параллелизм и многопоточность

Цикл обработки событий выполняется в потоке (обычно основном потоке) и выполняет все обратные вызовы и задачи в своем потоке. Пока задача выполняется в цикле обработки событий, никакие другие задачи не могут выполняться в том же потоке. Когда задача выполняет выражение await, выполнение задачи приостанавливается, и цикл обработки событий выполняет следующую задачу.

Чтобы запланировать callback из другого потока операционной системы, следует использовать метод loop.call_soon_threadsafe(). Пример:

loop.call_soon_threadsafe(callback, *args)

Почти все объекты asyncio не являются потокобезопасными, что обычно не является проблемой, если только нет кода, который работает с ними извне задачи или обратного вызова. Если есть необходимость в таком коде для вызова низкоуровневого asyncio API, следует использовать метод loop.call_soon_threadsafe(), например:

loop.call_soon_threadsafe(fut.cancel)

Чтобы запланировать объект сопрограммы из другого потока операционной системы, следует использовать функцию run_coroutine_threadsafe(). Она возвращает concurrent.futures.Future для доступа к результату:

async def coro_func():
     return await asyncio.sleep(1, 42)

# Later in another OS thread:

future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()

Для обработки сигналов цикл обработки событий должен быть запущен в основном потоке.

Метод loop.run_in_executor() можно использовать с concurrent.futures.ThreadPoolExecutor для выполнения блокирующего кода в другом потоке операционной системы, не блокируя поток операционной системы, в котором выполняется цикл обработки событий.

В настоящее время нет способа запланировать выполнение сопрограмм или обратных вызовов непосредственно из другого процесса (например, запущенного с помощью multiprocessing). В разделе Цикл обработки событий перечислены API, которые могут считывать данные из каналов и просматривать файловые дескрипторы, не блокируя цикл обработки событий. Кроме того, asyncio Subprocess API-интерфейсы предоставляют способ запуска процесса и взаимодействия с ним из цикла обработки событий. Наконец, вышеупомянутый метод loop.run_in_executor() также может быть использован с concurrent.futures.ProcessPoolExecutor для выполнения кода в другом процессе.

Запуск блокирующего кода

Блокирующий (зависящий от процессора) код не должен вызываться напрямую. Например, если функция выполняет вычисления с интенсивным использованием процессора в течение 1 секунды, все одновременные асинхронные задачи и операции ввода-вывода будут отложены на 1 секунду.

Executor можно использовать для запуска задачи в другом потоке или даже в другом процессе, чтобы избежать блокировки потока операционной системы циклом обработки событий. Более подробную информацию смотрите в методе loop.run_in_executor().

Регистрация

asyncio использует модуль logging, и все протоколирование выполняется с помощью регистратора "asyncio".

Уровень регистрации по умолчанию:py:const:logging.INFO, который можно легко настроить:

logging.getLogger("asyncio").setLevel(logging.WARNING)

Ведение журнала по сети может заблокировать цикл обработки событий. Рекомендуется использовать отдельный поток для обработки журналов или использовать неблокирующий ввод-вывод. Например, смотрите Работа с обработчиками, которые блокируют.

Обнаружение неожиданных сопрограмм

Когда вызывается функция сопрограммы, но она не ожидается (например, coro() вместо await coro()) или сопрограмма не запланирована с помощью asyncio.create_task(), asyncio выдает RuntimeWarning:

import asyncio

async def test():
    print("never scheduled")

async def main():
    test()

asyncio.run(main())

Выход:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
  test()

Вывод в режиме отладки:

test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

  < .. >

  File "../t.py", line 7, in main
    test()
  test()

Обычное решение заключается в том, чтобы либо дождаться запуска сопрограммы, либо вызвать функцию asyncio.create_task():

async def main():
    await test()

Обнаружение никогда не извлекаемых исключений

Если вызывается Future.set_exception(), но объект Future никогда не ожидается, исключение никогда не будет передано в пользовательский код. В этом случае asyncio выдаст сообщение журнала, когда объект Future будет собран как мусор.

Пример необработанного исключения:

import asyncio

async def bug():
    raise Exception("not consumed")

async def main():
    asyncio.create_task(bug())

asyncio.run(main())

Выход:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
  exception=Exception('not consumed')>

Traceback (most recent call last):
  File "test.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed

Enable the debug mode чтобы получить обратную трассировку, где была создана задача:

asyncio.run(main(), debug=True)

Вывод в режиме отладки:

Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
    exception=Exception('not consumed') created at asyncio/tasks.py:321>

source_traceback: Object created at (most recent call last):
  File "../t.py", line 9, in <module>
    asyncio.run(main(), debug=True)

< .. >

Traceback (most recent call last):
  File "../t.py", line 4, in bug
    raise Exception("not consumed")
Exception: not consumed
Вернуться на верх