Потоки

Исходный код: Lib/asyncio/streams.py


Потоки - это высокоуровневые асинхронные примитивы, готовые к работе в режиме ожидания, для работы с сетевыми подключениями. Потоки позволяют отправлять и получать данные без использования обратных вызовов или низкоуровневых протоколов и транспортных средств.

Вот пример клиента TCP echo, написанного с использованием потоков asyncio:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

Смотрите также раздел Examples ниже.

Потоковые функции

Для создания потоков и работы с ними можно использовать следующие функции asyncio верхнего уровня:

coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)

Установите сетевое соединение и верните пару объектов (reader, writer).

Возвращаемые объекты reader и writer являются экземплярами классов StreamReader и StreamWriter.

limit определяет предельный размер буфера, используемый возвращаемым экземпляром StreamReader. По умолчанию значение limit равно 64 Кбайт.

Остальные аргументы передаются непосредственно в loop.create_connection().

Примечание

Аргумент sock передает права собственности на сокет созданному StreamWriter. Чтобы закрыть сокет, вызовите его метод close().

Изменено в версии 3.7: Добавлен параметр ssl_handshake_timeout.

Изменено в версии 3.8: Добавлены параметры happy_eyeballs_delay и interleave.

Изменено в версии 3.10: Удален параметр loop.

Изменено в версии 3.11: Добавлен параметр ssl_shutdown_timeout.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

Запустите сервер сокетов.

Обратный вызов client_connected_cb вызывается всякий раз, когда устанавливается новое клиентское соединение. Он получает пару (reader, writer) в качестве двух аргументов, экземпляров классов StreamReader и StreamWriter.

client_connected_cb может быть простым вызываемым элементом или coroutine function; если это функция сопрограммы, она будет автоматически запланирована как Task.

limit определяет предельный размер буфера, используемый возвращаемым экземпляром StreamReader. По умолчанию значение limit равно 64 Кбайт.

Остальные аргументы передаются непосредственно в loop.create_server().

Примечание

Аргумент sock передает право собственности на сокет созданному серверу. Чтобы закрыть сокет, вызовите серверный метод close().

Изменено в версии 3.7: Добавлены параметры ssl_handshake_timeout и start_serving.

Изменено в версии 3.10: Удален параметр loop.

Изменено в версии 3.11: Добавлен параметр ssl_shutdown_timeout.

Сокеты Unix

coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

Установите соединение с сокетом Unix и верните пару символов (reader, writer).

Аналогично open_connection(), но работает с сокетами Unix.

Смотрите также документацию по loop.create_unix_connection().

Примечание

Аргумент sock передает права собственности на сокет созданному StreamWriter. Чтобы закрыть сокет, вызовите его метод close().

Availability: Unix.

Изменено в версии 3.7: Добавлен параметр ssl_handshake_timeout. Параметром path теперь может быть path-like object

Изменено в версии 3.10: Удален параметр loop.

Изменено в версии 3.11: Добавлен параметр ssl_shutdown_timeout.

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

Запустите сервер сокетов Unix.

Аналогично start_server(), но работает с сокетами Unix.

Смотрите также документацию по loop.create_unix_server().

Примечание

Аргумент sock передает право собственности на сокет созданному серверу. Чтобы закрыть сокет, вызовите серверный метод close().

Availability: Unix.

Изменено в версии 3.7: Добавлены параметры ssl_handshake_timeout и start_serving. Параметром path теперь может быть path-like object.

Изменено в версии 3.10: Удален параметр loop.

Изменено в версии 3.11: Добавлен параметр ssl_shutdown_timeout.

Потоковое средство чтения

class asyncio.StreamReader

Представляет объект reader, который предоставляет API для чтения данных из потока ввода-вывода. Как asynchronous iterable, объект поддерживает оператор async for.

Не рекомендуется создавать экземпляры объектов StreamReader напрямую; вместо этого используйте open_connection() и start_server().

feed_eof()

Подтвердите выполнение EOF.

coroutine read(n=-1)

Считайте до n байт из потока.

Если n не указано или установлено в -1, считывайте до EOF, затем возвращайте все прочитанные bytes. Если EOF был получен, а внутренний буфер пуст, верните пустой объект bytes.

Если n равно 0, немедленно верните пустой объект bytes.

Если значение n положительное, верните не более n доступных bytes, как только во внутреннем буфере будет доступен хотя бы 1 байт. Если EOF получен до считывания любого байта, верните пустой объект bytes.

coroutine readline()

Прочитайте одну строку, где «строка» - это последовательность байтов, заканчивающаяся на \n.

Если EOF получен, а \n не найден, метод возвращает частично прочитанные данные.

Если получен EOF, а внутренний буфер пуст, верните пустой объект bytes.

coroutine readexactly(n)

Считайте ровно n байт.

Поднимите значение IncompleteReadError, если значение EOF будет достигнуто до того, как можно будет прочитать n. Используйте атрибут IncompleteReadError.partial, чтобы получить частично прочитанные данные.

coroutine readuntil(separator=b'\n')

Считывайте данные из потока до тех пор, пока не будет найден separator.

В случае успеха данные и разделитель будут удалены из внутреннего буфера (израсходованы). Возвращаемые данные будут содержать разделитель в конце.

Если объем считываемых данных превышает установленный лимит потока, возникает исключение LimitOverrunError, и данные остаются во внутреннем буфере и могут быть прочитаны снова.

Если значение EOF достигнуто до того, как найден полный разделитель, возникает исключение IncompleteReadError и внутренний буфер сбрасывается. Атрибут IncompleteReadError.partial может содержать часть разделителя.

Добавлено в версии 3.5.2.

at_eof()

Возвращает True, если буфер пуст и был вызван feed_eof().

Потоковый редактор

class asyncio.StreamWriter

Представляет объект записи, который предоставляет API-интерфейсы для записи данных в поток ввода-вывода.

Не рекомендуется создавать экземпляры объектов StreamWriter напрямую; вместо этого используйте open_connection() и start_server().

write(data)

Метод пытается немедленно записать данные в базовый сокет. Если это не удается, данные помещаются во внутренний буфер записи до тех пор, пока их нельзя будет отправить.

Этот метод следует использовать вместе с методом drain():

stream.write(data)
await stream.drain()
writelines(data)

Метод немедленно записывает список байтов (или любой другой повторяемый файл) в базовый сокет. Если это не удается, данные помещаются во внутренний буфер записи в очередь до тех пор, пока их нельзя будет отправить.

Этот метод следует использовать вместе с методом drain():

stream.writelines(lines)
await stream.drain()
close()

Метод закрывает поток и базовый сокет.

Этот метод следует использовать, хотя и не обязательно, вместе с методом wait_closed():

stream.close()
await stream.wait_closed()
can_write_eof()

Возвращает True, если базовый транспорт поддерживает метод write_eof(), False в противном случае.

write_eof()

Закройте конец записи потока после того, как буферизованные данные записи будут сброшены.

transport

Возвращает базовый транспорт asyncio.

get_extra_info(name, default=None)

Получите доступ к дополнительной информации о транспорте; более подробную информацию смотрите в разделе BaseTransport.get_extra_info().

coroutine drain()

Подождите, пока не придет время возобновить запись в поток. Пример:

writer.write(data)
await writer.drain()

Это метод управления потоком, который взаимодействует с базовым буфером ввода-вывода для записи. Когда размер буфера достигает максимального значения, функция drain() блокируется до тех пор, пока размер буфера не уменьшится до минимального значения, и запись может быть возобновлена. Когда ждать нечего, drain() немедленно возвращается.

coroutine start_tls(sslcontext, *, server_hostname=None, ssl_handshake_timeout=None)

Обновите существующее потоковое соединение до TLS.

Параметры:

  • sslcontext: сконфигурированный экземпляр SSLContext.

  • server_hostname: задает или переопределяет имя хоста, с которым будет сопоставляться сертификат целевого сервера.

  • ssl_handshake_timeout - это время в секундах, которое требуется для завершения подтверждения связи по протоколу TLS перед прерыванием соединения. 60.0 секунд, если None (по умолчанию).

Добавлено в версии 3.11.

is_closing()

Возвращает True, если поток закрыт или находится в процессе закрытия.

Добавлено в версии 3.7.

coroutine wait_closed()

Подождите, пока поток не закроется.

Должен вызываться после close(), чтобы дождаться закрытия базового соединения, гарантируя, что все данные были сброшены, например, перед выходом из программы.

Добавлено в версии 3.7.

Примеры

Эхо-клиент TCP, использующий потоки

Эхо-клиент TCP, использующий функцию asyncio.open_connection():

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

См.также

В примере TCP echo client protocol используется низкоуровневый метод loop.create_connection().

Эхо-сервер TCP, использующий потоки

Эхо-сервер TCP, использующий функцию asyncio.start_server():

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

См.также

В примере TCP echo server protocol используется метод loop.create_server().

Получение HTTP-заголовков

Простой пример запроса HTTP-заголовков URL-адреса, передаваемого из командной строки:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()
    await writer.wait_closed()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Использование:

python example.py http://example.com/path/page.html

или с помощью HTTPS:

python example.py https://example.com/path/page.html

Зарегистрируйте открытый сокет для ожидания данных с использованием потоков

Сопрограмма, ожидающая, пока сокет не получит данные с помощью функции open_connection():

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()
    await writer.wait_closed()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

См.также

В примере register an open socket to wait for data using a protocol используется низкоуровневый протокол и метод loop.create_connection().

В примере watch a file descriptor for read events используется низкоуровневый метод loop.add_reader() для просмотра файлового дескриптора.

Вернуться на верх