Потоки

Исходный код: Lib/asyncio/streams.py.


Потоки - это высокоуровневые примитивы async/await-ready для работы с сетевыми соединениями. Потоки позволяют отправлять и получать данные без использования обратных вызовов или низкоуровневых протоколов и транспортов.

Вот пример клиента TCP-эха, написанного с использованием потоков asyncio:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

См. также раздел Examples ниже.

Функции потока

Для создания потоков и работы с ними можно использовать следующие функции asyncio верхнего уровня:

coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)

Устанавливает сетевое соединение и возвращает пару объектов (reader, writer).

Возвращаемые объекты reader и writer являются экземплярами классов StreamReader и StreamWriter.

limit определяет предельный размер буфера, используемый возвращаемым экземпляром StreamReader. По умолчанию limit установлен на 64 KiB.

Остальные аргументы передаются непосредственно в loop.create_connection().

Изменено в версии 3.7: Добавлен параметр ssl_handshake_timeout.

Добавлено в версии 3.8: Добавлены параметры happy_eyeballs_delay и interleave.

Изменено в версии 3.10: Удален параметр loop.

coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)

Запустите сервер сокетов.

Обратный вызов client_connected_cb вызывается каждый раз, когда устанавливается новое клиентское соединение. В качестве двух аргументов он получает пару (reader, writer), экземпляры классов StreamReader и StreamWriter.

client_connected_cb может быть обычным callable или coroutine function; если это функция coroutine, она будет автоматически запланирована как Task.

limit определяет предельный размер буфера, используемый возвращаемым экземпляром StreamReader. По умолчанию limit установлен на 64 KiB.

Остальные аргументы передаются непосредственно в loop.create_server().

Изменено в версии 3.7: Добавлены параметры ssl_handshake_timeout и start_serving.

Изменено в версии 3.10: Удален параметр loop.

Сокеты Unix

coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)

Установите соединение с сокетом Unix и верните пару (reader, writer).

Аналогичен open_connection(), но работает на сокетах Unix.

См. также документацию по loop.create_unix_connection().

Availability: Unix.

Изменено в версии 3.7: Добавлен параметр ssl_handshake_timeout. Параметр path теперь может быть path-like object.

Изменено в версии 3.10: Удален параметр loop.

coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)

Запустите сервер сокетов Unix.

Аналогичен start_server(), но работает с сокетами Unix.

См. также документацию по loop.create_unix_server().

Availability: Unix.

Изменено в версии 3.7: Добавлены параметры ssl_handshake_timeout и start_serving. Параметр path теперь может быть path-like object.

Изменено в версии 3.10: Удален параметр loop.

StreamReader

class asyncio.StreamReader

Представляет объект читателя, который предоставляет API для чтения данных из потока IO.

Не рекомендуется инстанцировать объекты StreamReader напрямую; вместо этого используйте open_connection() и start_server().

coroutine read(n=- 1)

Прочитать до n байт. Если n не предоставлено или установлено в -1, читайте до EOF и верните все прочитанные байты.

Если был получен EOF и внутренний буфер пуст, верните пустой объект bytes.

coroutine readline()

Прочитать одну строку, где «строка» - это последовательность байт, заканчивающаяся на \n.

Если получен EOF и \n не был найден, метод возвращает частично прочитанные данные.

Если получен EOF и внутренний буфер пуст, возвращается пустой объект bytes.

coroutine readexactly(n)

Прочитать ровно n байт.

Вызовите сообщение IncompleteReadError, если EOF будет достигнуто до того, как n будет прочитано. Используйте атрибут IncompleteReadError.partial для получения частично прочитанных данных.

coroutine readuntil(separator=b'\n')

Считывайте данные из потока, пока не будет найден сепаратор.

В случае успеха данные и разделитель будут удалены из внутреннего буфера (потреблены). Возвращаемые данные будут содержать разделитель в конце.

Если объем прочитанных данных превышает сконфигурированный лимит потока, возникает исключение LimitOverrunError, данные остаются во внутреннем буфере и могут быть прочитаны снова.

Если EOF достигается до того, как будет найден полный разделитель, возникает исключение IncompleteReadError, и внутренний буфер сбрасывается. Атрибут IncompleteReadError.partial может содержать часть разделителя.

Добавлено в версии 3.5.2.

at_eof()

Возвращает True, если буфер пуст и был вызван feed_eof().

StreamWriter

class asyncio.StreamWriter

Представляет объект writer, который предоставляет API для записи данных в поток IO.

Не рекомендуется инстанцировать объекты StreamWriter напрямую; вместо этого используйте open_connection() и start_server().

write(data)

Метод пытается немедленно записать данные в базовый сокет. Если это не удается, данные ставятся в очередь во внутренний буфер записи до тех пор, пока их нельзя будет отправить.

Этот метод следует использовать вместе с методом drain():

stream.write(data)
await stream.drain()
writelines(data)

Метод немедленно записывает список (или любое итеративное число) байтов в базовый сокет. Если это не удается, данные ставятся в очередь во внутренний буфер записи до тех пор, пока их нельзя будет отправить.

Этот метод следует использовать вместе с методом drain():

stream.writelines(lines)
await stream.drain()
close()

Метод закрывает поток и базовый сокет.

Этот метод следует использовать вместе с методом wait_closed():

stream.close()
await stream.wait_closed()
can_write_eof()

Возвращает True, если базовый транспорт поддерживает метод write_eof(), False в противном случае.

write_eof()

Закройте конец потока записи после того, как буферизованные данные записи будут удалены.

transport

Возвращает базовый транспорт asyncio.

get_extra_info(name, default=None)

Доступ к необязательной транспортной информации; подробности см. в BaseTransport.get_extra_info().

coroutine drain()

Дождитесь подходящего момента для возобновления записи в поток. Пример:

writer.write(data)
await writer.drain()

Это метод управления потоком, который взаимодействует с базовым буфером записи IO. Когда размер буфера достигает верхней отметки, drain() блокирует до тех пор, пока размер буфера не уменьшится до нижней отметки и запись можно будет возобновить. Когда ждать нечего, drain() возвращается немедленно.

is_closing()

Возвращает True, если поток закрыт или находится в процессе закрытия.

Добавлено в версии 3.7.

coroutine wait_closed()

Подождите, пока поток не будет закрыт.

Следует вызывать после close(), чтобы дождаться закрытия базового соединения.

Добавлено в версии 3.7.

Примеры

Эхо-клиент TCP с использованием потоков

TCP эхо-клиент, использующий функцию asyncio.open_connection():

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()

asyncio.run(tcp_echo_client('Hello World!'))

См.также

В примере TCP echo client protocol используется низкоуровневый метод loop.create_connection().

Эхо-сервер TCP с использованием потоков

TCP эхо-сервер с использованием функции asyncio.start_server():

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

См.также

В примере TCP echo server protocol используется метод loop.create_server().

Получить заголовки HTTP

Простой пример запроса HTTP-заголовков URL, переданного в командной строке:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Использование:

python example.py http://example.com/path/page.html

или с помощью HTTPS:

python example.py https://example.com/path/page.html

Зарегистрируйте открытый сокет для ожидания данных с помощью потоков

Короутин, ожидающий, пока сокет не получит данные с помощью функции open_connection():

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

См.также

В примере register an open socket to wait for data using a protocol используется низкоуровневый протокол и метод loop.create_connection().

В примере watch a file descriptor for read events используется низкоуровневый метод loop.add_reader() для просмотра дескриптора файла.

Вернуться на верх