Потоки¶
Исходный код: Lib/asyncio/streams.py.
Потоки - это высокоуровневые примитивы async/await-ready для работы с сетевыми соединениями. Потоки позволяют отправлять и получать данные без использования обратных вызовов или низкоуровневых протоколов и транспортов.
Вот пример клиента TCP-эха, написанного с использованием потоков asyncio:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
См. также раздел Examples ниже.
Функции потока
Для создания потоков и работы с ними можно использовать следующие функции asyncio верхнего уровня:
-
coroutine
asyncio.
open_connection
(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, happy_eyeballs_delay=None, interleave=None)¶ Устанавливает сетевое соединение и возвращает пару объектов
(reader, writer)
.Возвращаемые объекты reader и writer являются экземплярами классов
StreamReader
иStreamWriter
.limit определяет предельный размер буфера, используемый возвращаемым экземпляром
StreamReader
. По умолчанию limit установлен на 64 KiB.Остальные аргументы передаются непосредственно в
loop.create_connection()
.Изменено в версии 3.7: Добавлен параметр ssl_handshake_timeout.
Добавлено в версии 3.8: Добавлены параметры happy_eyeballs_delay и interleave.
Изменено в версии 3.10: Удален параметр loop.
-
coroutine
asyncio.
start_server
(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, start_serving=True)¶ Запустите сервер сокетов.
Обратный вызов client_connected_cb вызывается каждый раз, когда устанавливается новое клиентское соединение. В качестве двух аргументов он получает пару
(reader, writer)
, экземпляры классовStreamReader
иStreamWriter
.client_connected_cb может быть обычным callable или coroutine function; если это функция coroutine, она будет автоматически запланирована как
Task
.limit определяет предельный размер буфера, используемый возвращаемым экземпляром
StreamReader
. По умолчанию limit установлен на 64 KiB.Остальные аргументы передаются непосредственно в
loop.create_server()
.Изменено в версии 3.7: Добавлены параметры ssl_handshake_timeout и start_serving.
Изменено в версии 3.10: Удален параметр loop.
Сокеты Unix
-
coroutine
asyncio.
open_unix_connection
(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None)¶ Установите соединение с сокетом Unix и верните пару
(reader, writer)
.Аналогичен
open_connection()
, но работает на сокетах Unix.См. также документацию по
loop.create_unix_connection()
.Availability: Unix.
Изменено в версии 3.7: Добавлен параметр ssl_handshake_timeout. Параметр path теперь может быть path-like object.
Изменено в версии 3.10: Удален параметр loop.
-
coroutine
asyncio.
start_unix_server
(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, start_serving=True)¶ Запустите сервер сокетов Unix.
Аналогичен
start_server()
, но работает с сокетами Unix.См. также документацию по
loop.create_unix_server()
.Availability: Unix.
Изменено в версии 3.7: Добавлены параметры ssl_handshake_timeout и start_serving. Параметр path теперь может быть path-like object.
Изменено в версии 3.10: Удален параметр loop.
StreamReader¶
-
class
asyncio.
StreamReader
¶ Представляет объект читателя, который предоставляет API для чтения данных из потока IO.
Не рекомендуется инстанцировать объекты StreamReader напрямую; вместо этого используйте
open_connection()
иstart_server()
.-
coroutine
read
(n=- 1)¶ Прочитать до n байт. Если n не предоставлено или установлено в
-1
, читайте до EOF и верните все прочитанные байты.Если был получен EOF и внутренний буфер пуст, верните пустой объект
bytes
.
-
coroutine
readline
()¶ Прочитать одну строку, где «строка» - это последовательность байт, заканчивающаяся на
\n
.Если получен EOF и
\n
не был найден, метод возвращает частично прочитанные данные.Если получен EOF и внутренний буфер пуст, возвращается пустой объект
bytes
.
-
coroutine
readexactly
(n)¶ Прочитать ровно n байт.
Вызовите сообщение
IncompleteReadError
, если EOF будет достигнуто до того, как n будет прочитано. Используйте атрибутIncompleteReadError.partial
для получения частично прочитанных данных.
-
coroutine
readuntil
(separator=b'\n')¶ Считывайте данные из потока, пока не будет найден сепаратор.
В случае успеха данные и разделитель будут удалены из внутреннего буфера (потреблены). Возвращаемые данные будут содержать разделитель в конце.
Если объем прочитанных данных превышает сконфигурированный лимит потока, возникает исключение
LimitOverrunError
, данные остаются во внутреннем буфере и могут быть прочитаны снова.Если EOF достигается до того, как будет найден полный разделитель, возникает исключение
IncompleteReadError
, и внутренний буфер сбрасывается. АтрибутIncompleteReadError.partial
может содержать часть разделителя.Добавлено в версии 3.5.2.
-
at_eof
()¶ Возвращает
True
, если буфер пуст и был вызванfeed_eof()
.
-
coroutine
StreamWriter¶
-
class
asyncio.
StreamWriter
¶ Представляет объект writer, который предоставляет API для записи данных в поток IO.
Не рекомендуется инстанцировать объекты StreamWriter напрямую; вместо этого используйте
open_connection()
иstart_server()
.-
write
(data)¶ Метод пытается немедленно записать данные в базовый сокет. Если это не удается, данные ставятся в очередь во внутренний буфер записи до тех пор, пока их нельзя будет отправить.
Этот метод следует использовать вместе с методом
drain()
:stream.write(data) await stream.drain()
-
writelines
(data)¶ Метод немедленно записывает список (или любое итеративное число) байтов в базовый сокет. Если это не удается, данные ставятся в очередь во внутренний буфер записи до тех пор, пока их нельзя будет отправить.
Этот метод следует использовать вместе с методом
drain()
:stream.writelines(lines) await stream.drain()
-
close
()¶ Метод закрывает поток и базовый сокет.
Этот метод следует использовать вместе с методом
wait_closed()
:stream.close() await stream.wait_closed()
-
can_write_eof
()¶ Возвращает
True
, если базовый транспорт поддерживает методwrite_eof()
,False
в противном случае.
-
write_eof
()¶ Закройте конец потока записи после того, как буферизованные данные записи будут удалены.
-
transport
¶ Возвращает базовый транспорт asyncio.
-
get_extra_info
(name, default=None)¶ Доступ к необязательной транспортной информации; подробности см. в
BaseTransport.get_extra_info()
.
-
coroutine
drain
()¶ Дождитесь подходящего момента для возобновления записи в поток. Пример:
writer.write(data) await writer.drain()
Это метод управления потоком, который взаимодействует с базовым буфером записи IO. Когда размер буфера достигает верхней отметки, drain() блокирует до тех пор, пока размер буфера не уменьшится до нижней отметки и запись можно будет возобновить. Когда ждать нечего,
drain()
возвращается немедленно.
-
is_closing
()¶ Возвращает
True
, если поток закрыт или находится в процессе закрытия.Добавлено в версии 3.7.
-
Примеры¶
Эхо-клиент TCP с использованием потоков¶
TCP эхо-клиент, использующий функцию asyncio.open_connection()
:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
asyncio.run(tcp_echo_client('Hello World!'))
См.также
В примере TCP echo client protocol используется низкоуровневый метод loop.create_connection()
.
Эхо-сервер TCP с использованием потоков¶
TCP эхо-сервер с использованием функции asyncio.start_server()
:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
asyncio.run(main())
См.также
В примере TCP echo server protocol используется метод loop.create_server()
.
Получить заголовки HTTP¶
Простой пример запроса HTTP-заголовков URL, переданного в командной строке:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
Использование:
python example.py http://example.com/path/page.html
или с помощью HTTPS:
python example.py https://example.com/path/page.html
Зарегистрируйте открытый сокет для ожидания данных с помощью потоков¶
Короутин, ожидающий, пока сокет не получит данные с помощью функции open_connection()
:
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
См.также
В примере register an open socket to wait for data using a protocol используется низкоуровневый протокол и метод loop.create_connection()
.
В примере watch a file descriptor for read events используется низкоуровневый метод loop.add_reader()
для просмотра дескриптора файла.