Потоки¶
Исходный код: Lib/asyncio/streams.py
Потоки - это высокоуровневые асинхронные примитивы, готовые к работе в режиме ожидания, для работы с сетевыми подключениями. Потоки позволяют отправлять и получать данные без использования обратных вызовов или низкоуровневых протоколов и транспортных средств.
Вот пример клиента TCP echo, написанного с использованием потоков asyncio:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
Смотрите также раздел Examples ниже.
Потоковые функции
Для создания потоков и работы с ними можно использовать следующие функции asyncio верхнего уровня:
- coroutine asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)¶
Установите сетевое соединение и верните пару объектов
(reader, writer)
.Возвращаемые объекты reader и writer являются экземплярами классов
StreamReader
иStreamWriter
.limit определяет предельный размер буфера, используемый возвращаемым экземпляром
StreamReader
. По умолчанию значение limit равно 64 Кбайт.Остальные аргументы передаются непосредственно в
loop.create_connection()
.Примечание
Аргумент sock передает права собственности на сокет созданному
StreamWriter
. Чтобы закрыть сокет, вызовите его методclose()
.Изменено в версии 3.7: Добавлен параметр ssl_handshake_timeout.
Изменено в версии 3.8: Добавлены параметры happy_eyeballs_delay и interleave.
Изменено в версии 3.10: Удален параметр loop.
Изменено в версии 3.11: Добавлен параметр ssl_shutdown_timeout.
- coroutine asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
Запустите сервер сокетов.
Обратный вызов client_connected_cb вызывается всякий раз, когда устанавливается новое клиентское соединение. Он получает пару
(reader, writer)
в качестве двух аргументов, экземпляров классовStreamReader
иStreamWriter
.client_connected_cb может быть простым вызываемым элементом или coroutine function; если это функция сопрограммы, она будет автоматически запланирована как
Task
.limit определяет предельный размер буфера, используемый возвращаемым экземпляром
StreamReader
. По умолчанию значение limit равно 64 Кбайт.Остальные аргументы передаются непосредственно в
loop.create_server()
.Примечание
Аргумент sock передает право собственности на сокет созданному серверу. Чтобы закрыть сокет, вызовите серверный метод
close()
.Изменено в версии 3.7: Добавлены параметры ssl_handshake_timeout и start_serving.
Изменено в версии 3.10: Удален параметр loop.
Изменено в версии 3.11: Добавлен параметр ssl_shutdown_timeout.
Сокеты Unix
- coroutine asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)¶
Установите соединение с сокетом Unix и верните пару символов
(reader, writer)
.Аналогично
open_connection()
, но работает с сокетами Unix.Смотрите также документацию по
loop.create_unix_connection()
.Примечание
Аргумент sock передает права собственности на сокет созданному
StreamWriter
. Чтобы закрыть сокет, вызовите его методclose()
.Availability: Unix.
Изменено в версии 3.7: Добавлен параметр ssl_handshake_timeout. Параметром path теперь может быть path-like object
Изменено в версии 3.10: Удален параметр loop.
Изменено в версии 3.11: Добавлен параметр ssl_shutdown_timeout.
- coroutine asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)¶
Запустите сервер сокетов Unix.
Аналогично
start_server()
, но работает с сокетами Unix.Смотрите также документацию по
loop.create_unix_server()
.Примечание
Аргумент sock передает право собственности на сокет созданному серверу. Чтобы закрыть сокет, вызовите серверный метод
close()
.Availability: Unix.
Изменено в версии 3.7: Добавлены параметры ssl_handshake_timeout и start_serving. Параметром path теперь может быть path-like object.
Изменено в версии 3.10: Удален параметр loop.
Изменено в версии 3.11: Добавлен параметр ssl_shutdown_timeout.
Потоковое средство чтения¶
- class asyncio.StreamReader¶
Представляет объект reader, который предоставляет API для чтения данных из потока ввода-вывода. Как asynchronous iterable, объект поддерживает оператор
async for
.Не рекомендуется создавать экземпляры объектов StreamReader напрямую; вместо этого используйте
open_connection()
иstart_server()
.- feed_eof()¶
Подтвердите выполнение EOF.
- coroutine read(n=-1)¶
Считайте до n байт из потока.
Если n не указано или установлено в
-1
, считывайте до EOF, затем возвращайте все прочитанныеbytes
. Если EOF был получен, а внутренний буфер пуст, верните пустой объектbytes
.Если n равно
0
, немедленно верните пустой объектbytes
.Если значение n положительное, верните не более n доступных
bytes
, как только во внутреннем буфере будет доступен хотя бы 1 байт. Если EOF получен до считывания любого байта, верните пустой объектbytes
.
- coroutine readline()¶
Прочитайте одну строку, где «строка» - это последовательность байтов, заканчивающаяся на
\n
.Если EOF получен, а
\n
не найден, метод возвращает частично прочитанные данные.Если получен EOF, а внутренний буфер пуст, верните пустой объект
bytes
.
- coroutine readexactly(n)¶
Считайте ровно n байт.
Поднимите значение
IncompleteReadError
, если значение EOF будет достигнуто до того, как можно будет прочитать n. Используйте атрибутIncompleteReadError.partial
, чтобы получить частично прочитанные данные.
- coroutine readuntil(separator=b'\n')¶
Считывайте данные из потока до тех пор, пока не будет найден separator.
В случае успеха данные и разделитель будут удалены из внутреннего буфера (израсходованы). Возвращаемые данные будут содержать разделитель в конце.
Если объем считываемых данных превышает установленный лимит потока, возникает исключение
LimitOverrunError
, и данные остаются во внутреннем буфере и могут быть прочитаны снова.Если значение EOF достигнуто до того, как найден полный разделитель, возникает исключение
IncompleteReadError
и внутренний буфер сбрасывается. АтрибутIncompleteReadError.partial
может содержать часть разделителя.Добавлено в версии 3.5.2.
- at_eof()¶
Возвращает
True
, если буфер пуст и был вызванfeed_eof()
.
Потоковый редактор¶
- class asyncio.StreamWriter¶
Представляет объект записи, который предоставляет API-интерфейсы для записи данных в поток ввода-вывода.
Не рекомендуется создавать экземпляры объектов StreamWriter напрямую; вместо этого используйте
open_connection()
иstart_server()
.- write(data)¶
Метод пытается немедленно записать данные в базовый сокет. Если это не удается, данные помещаются во внутренний буфер записи до тех пор, пока их нельзя будет отправить.
Этот метод следует использовать вместе с методом
drain()
:stream.write(data) await stream.drain()
- writelines(data)¶
Метод немедленно записывает список байтов (или любой другой повторяемый файл) в базовый сокет. Если это не удается, данные помещаются во внутренний буфер записи в очередь до тех пор, пока их нельзя будет отправить.
Этот метод следует использовать вместе с методом
drain()
:stream.writelines(lines) await stream.drain()
- close()¶
Метод закрывает поток и базовый сокет.
Этот метод следует использовать, хотя и не обязательно, вместе с методом
wait_closed()
:stream.close() await stream.wait_closed()
- can_write_eof()¶
Возвращает
True
, если базовый транспорт поддерживает методwrite_eof()
,False
в противном случае.
- write_eof()¶
Закройте конец записи потока после того, как буферизованные данные записи будут сброшены.
- transport¶
Возвращает базовый транспорт asyncio.
- get_extra_info(name, default=None)¶
Получите доступ к дополнительной информации о транспорте; более подробную информацию смотрите в разделе
BaseTransport.get_extra_info()
.
- coroutine drain()¶
Подождите, пока не придет время возобновить запись в поток. Пример:
writer.write(data) await writer.drain()
Это метод управления потоком, который взаимодействует с базовым буфером ввода-вывода для записи. Когда размер буфера достигает максимального значения, функция drain() блокируется до тех пор, пока размер буфера не уменьшится до минимального значения, и запись может быть возобновлена. Когда ждать нечего,
drain()
немедленно возвращается.
- coroutine start_tls(sslcontext, *, server_hostname=None, ssl_handshake_timeout=None)¶
Обновите существующее потоковое соединение до TLS.
Параметры:
sslcontext: сконфигурированный экземпляр
SSLContext
.server_hostname: задает или переопределяет имя хоста, с которым будет сопоставляться сертификат целевого сервера.
ssl_handshake_timeout - это время в секундах, которое требуется для завершения подтверждения связи по протоколу TLS перед прерыванием соединения.
60.0
секунд, еслиNone
(по умолчанию).
Добавлено в версии 3.11.
- is_closing()¶
Возвращает
True
, если поток закрыт или находится в процессе закрытия.Добавлено в версии 3.7.
Примеры¶
Эхо-клиент TCP, использующий потоки¶
Эхо-клиент TCP, использующий функцию asyncio.open_connection()
:
import asyncio
async def tcp_echo_client(message):
reader, writer = await asyncio.open_connection(
'127.0.0.1', 8888)
print(f'Send: {message!r}')
writer.write(message.encode())
await writer.drain()
data = await reader.read(100)
print(f'Received: {data.decode()!r}')
print('Close the connection')
writer.close()
await writer.wait_closed()
asyncio.run(tcp_echo_client('Hello World!'))
См.также
В примере TCP echo client protocol используется низкоуровневый метод loop.create_connection()
.
Эхо-сервер TCP, использующий потоки¶
Эхо-сервер TCP, использующий функцию asyncio.start_server()
:
import asyncio
async def handle_echo(reader, writer):
data = await reader.read(100)
message = data.decode()
addr = writer.get_extra_info('peername')
print(f"Received {message!r} from {addr!r}")
print(f"Send: {message!r}")
writer.write(data)
await writer.drain()
print("Close the connection")
writer.close()
await writer.wait_closed()
async def main():
server = await asyncio.start_server(
handle_echo, '127.0.0.1', 8888)
addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
print(f'Serving on {addrs}')
async with server:
await server.serve_forever()
asyncio.run(main())
См.также
В примере TCP echo server protocol используется метод loop.create_server()
.
Получение HTTP-заголовков¶
Простой пример запроса HTTP-заголовков URL-адреса, передаваемого из командной строки:
import asyncio
import urllib.parse
import sys
async def print_http_headers(url):
url = urllib.parse.urlsplit(url)
if url.scheme == 'https':
reader, writer = await asyncio.open_connection(
url.hostname, 443, ssl=True)
else:
reader, writer = await asyncio.open_connection(
url.hostname, 80)
query = (
f"HEAD {url.path or '/'} HTTP/1.0\r\n"
f"Host: {url.hostname}\r\n"
f"\r\n"
)
writer.write(query.encode('latin-1'))
while True:
line = await reader.readline()
if not line:
break
line = line.decode('latin1').rstrip()
if line:
print(f'HTTP header> {line}')
# Ignore the body, close the socket
writer.close()
await writer.wait_closed()
url = sys.argv[1]
asyncio.run(print_http_headers(url))
Использование:
python example.py http://example.com/path/page.html
или с помощью HTTPS:
python example.py https://example.com/path/page.html
Зарегистрируйте открытый сокет для ожидания данных с использованием потоков¶
Сопрограмма, ожидающая, пока сокет не получит данные с помощью функции open_connection()
:
import asyncio
import socket
async def wait_for_data():
# Get a reference to the current event loop because
# we want to access low-level APIs.
loop = asyncio.get_running_loop()
# Create a pair of connected sockets.
rsock, wsock = socket.socketpair()
# Register the open socket to wait for data.
reader, writer = await asyncio.open_connection(sock=rsock)
# Simulate the reception of data from the network
loop.call_soon(wsock.send, 'abc'.encode())
# Wait for data
data = await reader.read(100)
# Got data, we are done: close the socket
print("Received:", data.decode())
writer.close()
await writer.wait_closed()
# Close the second socket
wsock.close()
asyncio.run(wait_for_data())
См.также
В примере register an open socket to wait for data using a protocol используется низкоуровневый протокол и метод loop.create_connection()
.
В примере watch a file descriptor for read events используется низкоуровневый метод loop.add_reader()
для просмотра файлового дескриптора.