Транспортировка и протоколы¶
Предисловие
Транспорты и протоколы используются низкоуровневыми API циклов событий, такими как loop.create_connection()
. Они используют стиль программирования на основе обратного вызова и позволяют реализовать высокопроизводительные сетевые или IPC протоколы (например, HTTP).
По сути, транспорты и протоколы должны использоваться только в библиотеках и фреймворках и никогда в высокоуровневых приложениях asyncio.
Эта страница документации охватывает как Transports, так и Protocols.
Введение
На самом высоком уровне транспорт занимается как передаются байты, в то время как протокол определяет какие байты передавать (и, в некоторой степени, когда).
Другой способ сказать то же самое: транспорт - это абстракция для сокета (или аналогичной конечной точки ввода-вывода), а протокол - это абстракция для приложения, с точки зрения транспорта.
Еще одно мнение - транспортный и протокольный интерфейсы вместе определяют абстрактный интерфейс для использования сетевого ввода-вывода и межпроцессного ввода-вывода.
Между объектами транспорта и протокола всегда существует связь 1:1: протокол вызывает методы транспорта для отправки данных, а транспорт вызывает методы протокола для передачи ему полученных данных.
Большинство методов цикла событий, ориентированных на соединение (например, loop.create_connection()
), обычно принимают аргумент protocol_factory, используемый для создания объекта Protocol для принятого соединения, представленного объектом Transport. Такие методы обычно возвращают кортеж (transport, protocol)
.
Содержание
Эта страница документации содержит следующие разделы:
Раздел Transports документирует классы asyncio
BaseTransport
,ReadTransport
,WriteTransport
,Transport
,DatagramTransport
иSubprocessTransport
.Раздел Protocols документирует классы asyncio
BaseProtocol
,Protocol
,BufferedProtocol
,DatagramProtocol
иSubprocessProtocol
.В разделе Examples показано, как работать с транспортами, протоколами и низкоуровневыми API циклов событий.
Перевозки¶
Исходный код: Lib/asyncio/transports.py.
Транспорты - это классы, предоставляемые asyncio
для того, чтобы абстрагировать различные виды каналов связи.
Транспортные объекты всегда инстанцируются с помощью asyncio event loop.
asyncio реализует транспорты для TCP, UDP, SSL и труб подпроцессов. Методы, доступные на транспорте, зависят от типа транспорта.
Транспортными классами являются not thread safe.
Иерархия транспортов¶
-
class
asyncio.
BaseTransport
¶ Базовый класс для всех транспортов. Содержит методы, общие для всех транспортов asyncio.
-
class
asyncio.
WriteTransport
(BaseTransport)¶ Базовый транспорт для соединений только для записи.
Экземпляры класса WriteTransport возвращаются из метода цикла событий
loop.connect_write_pipe()
, а также используются методами, связанными с подпроцессами, напримерloop.subprocess_exec()
.
-
class
asyncio.
ReadTransport
(BaseTransport)¶ Базовый транспорт для соединений только для чтения.
Экземпляры класса ReadTransport возвращаются из метода цикла событий
loop.connect_read_pipe()
, а также используются методами, связанными с подпроцессами, такими какloop.subprocess_exec()
.
-
class
asyncio.
Transport
(WriteTransport, ReadTransport)¶ Интерфейс, представляющий двунаправленный транспорт, например, TCP-соединение.
Пользователь не инстанцирует транспорт напрямую; он вызывает служебную функцию, передавая ей фабрику протоколов и другую информацию, необходимую для создания транспорта и протокола.
Экземпляры класса Transport возвращаются или используются методами цикла событий, такими как
loop.create_connection()
,loop.create_unix_connection()
,loop.create_server()
,loop.sendfile()
и т.д.
-
class
asyncio.
DatagramTransport
(BaseTransport)¶ Транспорт для соединений дейтаграмм (UDP).
Экземпляры класса DatagramTransport возвращаются из метода цикла событий
loop.create_datagram_endpoint()
.
-
class
asyncio.
SubprocessTransport
(BaseTransport)¶ Абстракция для представления связи между родительским и дочерним процессом ОС.
Экземпляры класса SubprocessTransport возвращаются из методов цикла событий
loop.subprocess_shell()
иloop.subprocess_exec()
.
Базовый транспорт¶
-
BaseTransport.
close
()¶ Закройте транспорт.
Если транспорт имеет буфер для исходящих данных, буферизованные данные будут асинхронно удалены. Больше данные получены не будут. После того, как все буферизованные данные будут смыты, будет вызван метод протокола
protocol.connection_lost()
с аргументомNone
.
-
BaseTransport.
is_closing
()¶ Возвращает
True
, если транспорт закрывается или закрыт.
-
BaseTransport.
get_extra_info
(name, default=None)¶ Возвращает информацию о транспорте или базовых ресурсах, которые он использует.
name - это строка, представляющая часть информации, специфичной для транспорта, которую нужно получить.
default - это значение, которое возвращается, если информация недоступна, или если транспорт не поддерживает запрос с помощью данной сторонней реализации цикла событий или на текущей платформе.
Например, следующий код пытается получить объект базового сокета транспорта:
sock = transport.get_extra_info('socket') if sock is not None: print(sock.getsockopt(...))
Категории информации, которую можно запросить на некоторых транспортах:
розетка:
'peername'
: удаленный адрес, к которому подключен сокет, результатsocket.socket.getpeername()
(None
при ошибке)'socket'
:socket.socket
экземпляр'sockname'
: собственный адрес сокета, результатsocket.socket.getsockname()
SSL-сокет:
'compression'
: используемый алгоритм сжатия в виде строки, илиNone
, если соединение не сжато; результатssl.SSLSocket.compression()
.'cipher'
: кортеж из трех значений, содержащий название используемого шифра, версию протокола SSL, определяющую его использование, и количество используемых секретных битов; результатssl.SSLSocket.cipher()
.'peercert'
: сертификат сверстника; результатssl.SSLSocket.getpeercert()
'sslcontext'
:ssl.SSLContext
экземпляр'ssl_object'
:ssl.SSLObject
илиssl.SSLSocket
экземпляр
труба:
'pipe'
: объект трубы
подпроцесс:
'subprocess'
:subprocess.Popen
экземпляр
-
BaseTransport.
set_protocol
(protocol)¶ Установите новый протокол.
Переключение протокола должно осуществляться только тогда, когда оба протокола документированы для поддержки переключения.
-
BaseTransport.
get_protocol
()¶ Возвращает текущий протокол.
Только для чтения Транспорты¶
-
ReadTransport.
is_reading
()¶ Возвращает
True
, если транспорт получает новые данные.Добавлено в версии 3.7.
-
ReadTransport.
pause_reading
()¶ Приостановить работу принимающей стороны транспорта. Никакие данные не будут передаваться в метод протокола
protocol.data_received()
до тех пор, пока не будет вызванresume_reading()
.Изменено в версии 3.7: Метод является идемпотентным, т.е. его можно вызвать, когда транспорт уже приостановлен или закрыт.
-
ReadTransport.
resume_reading
()¶ Возобновите работу принимающей стороны. Метод
protocol.data_received()
протокола будет вызван еще раз, если некоторые данные доступны для чтения.Изменено в версии 3.7: Метод является идемпотентным, т.е. его можно вызвать, когда транспорт уже читает.
Транспортировка только для записи¶
-
WriteTransport.
abort
()¶ Немедленно закройте транспорт, не дожидаясь завершения ожидающих операций. Буферизованные данные будут потеряны. Больше данные не будут получены. Метод
protocol.connection_lost()
протокола в конечном итоге будет вызван сNone
в качестве аргумента.
-
WriteTransport.
can_write_eof
()¶ Возвращает
True
, если транспорт поддерживаетwrite_eof()
,False
, если нет.
-
WriteTransport.
get_write_buffer_size
()¶ Возвращает текущий размер выходного буфера, используемого транспортом.
-
WriteTransport.
get_write_buffer_limits
()¶ Получить высокий и низкий водяные знаки для управления потоком записи. Возвращает кортеж
(low, high)
, где low и high - положительное число байт.Используйте
set_write_buffer_limits()
для установки пределов.Добавлено в версии 3.4.2.
-
WriteTransport.
set_write_buffer_limits
(high=None, low=None)¶ Установите высокий и низкий водяные знаки для управления потоком записи.
Эти два значения (измеряемые в количестве байт) управляют тем, когда вызываются методы протокола
protocol.pause_writing()
иprotocol.resume_writing()
. Если указано, низкий водяной знак должен быть меньше или равен высокому водяному знаку. Ни high, ни low не могут быть отрицательными.pause_writing()
вызывается, когда размер буфера становится больше или равен значению high. Если запись была приостановлена,resume_writing()
вызывается, когда размер буфера становится меньше или равен значению low.Значения по умолчанию зависят от реализации. Если задан только высокий водяной знак, то низкий водяной знак по умолчанию принимает значение, зависящее от реализации, меньшее или равное высокому водяному знаку. Установка high в ноль заставляет low также равняться нулю, и вызывает вызов
pause_writing()
всякий раз, когда буфер становится непустым. Установка low в ноль приводит к тому, чтоresume_writing()
будет вызываться только тогда, когда буфер станет пустым. Использование нуля для любого из пределов обычно неоптимально, так как уменьшает возможности для одновременного выполнения операций ввода-вывода и вычислений.Используйте
get_write_buffer_limits()
для получения пределов.
-
WriteTransport.
write
(data)¶ Запишите несколько байтов данных на транспорт.
Этот метод не блокирует; он буферизирует данные и организует их асинхронную отправку.
-
WriteTransport.
writelines
(list_of_data)¶ Записать список (или любую итерабельную переменную) байтов данных на транспорт. Функционально это эквивалентно вызову
write()
на каждом элементе, выдаваемом итерабельной переменной, но может быть реализовано более эффективно.
-
WriteTransport.
write_eof
()¶ Закройте конец транспорта для записи после удаления всех буферизованных данных. Данные все еще могут быть получены.
Этот метод может выдать ошибку
NotImplementedError
, если транспорт (например, SSL) не поддерживает полузакрытые соединения.
Датаграммные транспорты¶
-
DatagramTransport.
sendto
(data, addr=None)¶ Отправьте байты data удаленному аналогу, указанному addr (целевой адрес, зависящий от транспорта). Если addr равен
None
, данные отправляются на целевой адрес, указанный при создании транспорта.Этот метод не блокирует; он буферизирует данные и организует их асинхронную отправку.
-
DatagramTransport.
abort
()¶ Немедленно закройте транспорт, не дожидаясь завершения ожидающих операций. Буферизованные данные будут потеряны. Больше данные не будут получены. Метод
protocol.connection_lost()
протокола в конечном итоге будет вызван сNone
в качестве аргумента.
Транспортировка подпроцессов¶
-
SubprocessTransport.
get_pid
()¶ Возвращает идентификатор процесса подпроцесса в виде целого числа.
-
SubprocessTransport.
get_pipe_transport
(fd)¶ Возвращает транспорт для коммуникационной трубы, соответствующей целочисленному дескриптору файла fd:
0
: читаемый потоковый транспорт стандартного ввода (stdin), илиNone
, если подпроцесс не был создан с помощьюstdin=PIPE
.1
: записываемый потоковый транспорт стандартного вывода (stdout), илиNone
, если подпроцесс не был создан с помощьюstdout=PIPE
.2
: записываемый потоковый транспорт стандартной ошибки (stderr), илиNone
, если подпроцесс не был создан с помощьюstderr=PIPE
.другие fd:
None
-
SubprocessTransport.
get_returncode
()¶ Возвращает код возврата подпроцесса в виде целого числа или
None
, если он не вернулся, что аналогично атрибутуsubprocess.Popen.returncode
.
-
SubprocessTransport.
kill
()¶ Убейте подпроцесс.
В системах POSIX функция посылает SIGKILL подпроцессу. В Windows этот метод является псевдонимом для
terminate()
.См. также
subprocess.Popen.kill()
.
-
SubprocessTransport.
send_signal
(signal)¶ Отправьте подпроцессу номер сигнала, как в
subprocess.Popen.send_signal()
.
-
SubprocessTransport.
terminate
()¶ Остановите подпроцесс.
В системах POSIX этот метод посылает SIGTERM подпроцессу. В Windows для остановки подпроцесса вызывается функция Windows API TerminateProcess().
См. также
subprocess.Popen.terminate()
.
Протоколы¶
Исходный код: Lib/asyncio/protocols.py.
asyncio предоставляет набор абстрактных базовых классов, которые следует использовать для реализации сетевых протоколов. Эти классы предназначены для использования вместе с transports.
Подклассы абстрактных базовых классов протоколов могут реализовывать некоторые или все методы. Все эти методы являются обратными вызовами: они вызываются транспортом при определенных событиях, например, при получении некоторых данных. Метод базового протокола должен вызываться соответствующим транспортом.
Базовые протоколы¶
-
class
asyncio.
BaseProtocol
¶ Базовый протокол с общими для всех протоколов методами.
-
class
asyncio.
Protocol
(BaseProtocol)¶ Базовый класс для реализации потоковых протоколов (TCP, сокеты Unix и т.д.).
-
class
asyncio.
BufferedProtocol
(BaseProtocol)¶ Базовый класс для реализации потоковых протоколов с ручным управлением буфером приема.
-
class
asyncio.
DatagramProtocol
(BaseProtocol)¶ Базовый класс для реализации протоколов дейтаграмм (UDP).
-
class
asyncio.
SubprocessProtocol
(BaseProtocol)¶ Базовый класс для реализации протоколов, взаимодействующих с дочерними процессами (однонаправленные трубы).
Базовый протокол¶
Все протоколы asyncio могут реализовать обратные вызовы Base Protocol.
Обратные вызовы соединения
Обратные вызовы соединения вызываются во всех протоколах ровно один раз при успешном соединении. Все остальные обратные вызовы протоколов могут быть вызваны только между этими двумя методами.
-
BaseProtocol.
connection_made
(transport)¶ Вызывается при установлении соединения.
Аргумент transport - это транспорт, представляющий соединение. Протокол отвечает за хранение ссылки на свой транспорт.
-
BaseProtocol.
connection_lost
(exc)¶ Вызывается при потере или закрытии соединения.
Аргументом является либо объект исключения, либо
None
. Последнее означает, что получен обычный EOF, либо соединение было прервано или закрыто этой стороной соединения.
Обратные вызовы управления потоком
Обратные вызовы управления потоком могут вызываться транспортом для приостановки или возобновления записи, выполняемой протоколом.
Подробнее см. документацию метода set_write_buffer_limits()
.
-
BaseProtocol.
pause_writing
()¶ Вызывается, когда буфер транспорта переходит за верхнюю водяную отметку.
-
BaseProtocol.
resume_writing
()¶ Вызывается, когда буфер транспорта опустошается ниже нижнего порога.
Если размер буфера равен старшему водяному знаку, pause_writing()
не вызывается: размер буфера должен строго превышать его.
И наоборот, resume_writing()
вызывается, когда размер буфера равен или меньше нижнего водяного знака. Эти конечные условия важны для того, чтобы гарантировать, что все идет так, как ожидается, когда одна из меток равна нулю.
Потоковые протоколы¶
Методы событий, такие как loop.create_server()
, loop.create_unix_server()
, loop.create_connection()
, loop.create_unix_connection()
, loop.connect_accepted_socket()
, loop.connect_read_pipe()
и loop.connect_write_pipe()
принимают фабрики, которые возвращают потоковые протоколы.
-
Protocol.
data_received
(data)¶ Вызывается при получении некоторых данных. data - непустой байтовый объект, содержащий входящие данные.
Буферизация, разбивка на части или сборка данных зависит от транспорта. В целом, не стоит полагаться на конкретную семантику и вместо этого сделать разбор общим и гибким. Однако данные всегда принимаются в правильном порядке.
Метод может быть вызван произвольное количество раз, пока соединение открыто.
Однако
protocol.eof_received()
вызывается не более одного раза. После вызова eof_received(),data_received()
больше не вызывается.
-
Protocol.
eof_received
()¶ Вызывается, когда другой конец сигнализирует, что больше не будет отправлять данные (например, вызовом
transport.write_eof()
, если другой конец также использует asyncio).Этот метод может вернуть ложное значение (включая
None
), в этом случае транспорт закроется сам. И наоборот, если этот метод возвращает истинное значение, используемый протокол определяет, закрывать ли транспорт. Поскольку реализация по умолчанию возвращаетNone
, она неявно закрывает соединение.Некоторые транспорты, включая SSL, не поддерживают полузакрытые соединения, в этом случае возвращение true из этого метода приведет к закрытию соединения.
Государственная машина:
start -> connection_made
[-> data_received]*
[-> eof_received]?
-> connection_lost -> end
Протоколы потоковой передачи с буферизацией¶
Добавлено в версии 3.7.
Буферизованные протоколы можно использовать с любым методом цикла событий, который поддерживает Streaming Protocols.
Реализации BufferedProtocol
допускают явное ручное выделение и управление буфером приема. Циклы событий могут использовать буфер, предоставляемый протоколом, чтобы избежать ненужных копий данных. Это может привести к заметному повышению производительности для протоколов, принимающих большие объемы данных. Сложные реализации протоколов могут значительно сократить количество выделений буфера.
Следующие обратные вызовы вызываются на экземплярах BufferedProtocol
:
-
BufferedProtocol.
get_buffer
(sizehint)¶ Вызывается для выделения нового буфера приема.
sizehint - это рекомендуемый минимальный размер возвращаемого буфера. Допустимо возвращать буферы меньшего или большего размера, чем указано в sizehint. При значении -1 размер буфера может быть произвольным. Возврат буфера с нулевым размером является ошибкой.
get_buffer()
должен возвращать объект, реализующий buffer protocol.
-
BufferedProtocol.
buffer_updated
(nbytes)¶ Вызывается, когда буфер был обновлен полученными данными.
nbytes - общее количество байт, которые были записаны в буфер.
-
BufferedProtocol.
eof_received
()¶ См. документацию метода
protocol.eof_received()
.
get_buffer()
может вызываться произвольное количество раз во время соединения. Однако protocol.eof_received()
вызывается не более одного раза, и, если он вызван, get_buffer()
и buffer_updated()
не будут вызваны после него.
Государственная машина:
start -> connection_made
[-> get_buffer
[-> buffer_updated]?
]*
[-> eof_received]?
-> connection_lost -> end
Протоколы дейтаграмм¶
Экземпляры Datagram Protocol должны быть построены фабриками протоколов, переданными в метод loop.create_datagram_endpoint()
.
-
DatagramProtocol.
datagram_received
(data, addr)¶ Вызывается при получении дейтаграммы. data - это байтовый объект, содержащий входящие данные. addr - это адрес аналога, отправляющего данные; точный формат зависит от транспорта.
-
DatagramProtocol.
error_received
(exc)¶ Вызывается, когда предыдущая операция отправки или получения вызывает ошибку
OSError
. exc - это экземплярOSError
.Этот метод вызывается в редких случаях, когда транспорт (например, UDP) обнаруживает, что дейтаграмма не может быть доставлена получателю. Однако во многих случаях недоставляемые дейтаграммы будут молча отбрасываться.
Примечание
В системах BSD (macOS, FreeBSD и т.д.) управление потоком не поддерживается для дейтаграммных протоколов, поскольку нет надежного способа обнаружения сбоев отправки, вызванных записью слишком большого количества пакетов.
Сокет всегда выглядит «готовым», а лишние пакеты отбрасываются. Ошибка OSError
с errno
, установленная в errno.ENOBUFS
, может возникнуть или не возникнуть; если она возникла, о ней будет сообщено в DatagramProtocol.error_received()
, но в противном случае она игнорируется.
Протоколы подпроцессов¶
Экземпляры протокола подпроцесса должны быть построены фабриками протоколов, переданными методам loop.subprocess_exec()
и loop.subprocess_shell()
.
-
SubprocessProtocol.
pipe_data_received
(fd, data)¶ Вызывается, когда дочерний процесс записывает данные в свой канал stdout или stderr.
fd - целочисленный дескриптор файла трубы.
data - непустой байтовый объект, содержащий полученные данные.
-
SubprocessProtocol.
pipe_connection_lost
(fd, exc)¶ Вызывается, когда один из каналов, связывающийся с дочерним процессом, закрывается.
fd - целочисленный дескриптор файла, который был закрыт.
-
SubprocessProtocol.
process_exited
()¶ Вызывается при завершении дочернего процесса.
Примеры¶
Эхо-сервер TCP¶
Создайте эхо-сервер TCP, используя метод loop.create_server()
, отправьте обратно полученные данные и закройте соединение:
import asyncio
class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
print('Send: {!r}'.format(message))
self.transport.write(data)
print('Close the client socket')
self.transport.close()
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
server = await loop.create_server(
lambda: EchoServerProtocol(),
'127.0.0.1', 8888)
async with server:
await server.serve_forever()
asyncio.run(main())
См.также
В примере TCP echo server using streams используется функция высокого уровня asyncio.start_server()
.
Эхо-клиент TCP¶
Эхо-клиент TCP, используя метод loop.create_connection()
, отправляет данные и ждет, пока соединение не будет закрыто:
import asyncio
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
def connection_made(self, transport):
transport.write(self.message.encode())
print('Data sent: {!r}'.format(self.message))
def data_received(self, data):
print('Data received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = 'Hello World!'
transport, protocol = await loop.create_connection(
lambda: EchoClientProtocol(message, on_con_lost),
'127.0.0.1', 8888)
# Wait until the protocol signals that the connection
# is lost and close the transport.
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
См.также
В примере TCP echo client using streams используется функция высокого уровня asyncio.open_connection()
.
Эхо-сервер UDP¶
Сервер эха UDP, используя метод loop.create_datagram_endpoint()
, отправляет обратно полученные данные:
import asyncio
class EchoServerProtocol:
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
print('Received %r from %s' % (message, addr))
print('Send %r to %s' % (message, addr))
self.transport.sendto(data, addr)
async def main():
print("Starting UDP server")
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
# One protocol instance will be created to serve all
# client requests.
transport, protocol = await loop.create_datagram_endpoint(
lambda: EchoServerProtocol(),
local_addr=('127.0.0.1', 9999))
try:
await asyncio.sleep(3600) # Serve for 1 hour.
finally:
transport.close()
asyncio.run(main())
Эхо-клиент UDP¶
Клиент эхо UDP, используя метод loop.create_datagram_endpoint()
, отправляет данные и закрывает транспорт, когда получает ответ:
import asyncio
class EchoClientProtocol:
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
self.transport = None
def connection_made(self, transport):
self.transport = transport
print('Send:', self.message)
self.transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
print("Received:", data.decode())
print("Close the socket")
self.transport.close()
def error_received(self, exc):
print('Error received:', exc)
def connection_lost(self, exc):
print("Connection closed")
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = "Hello World!"
transport, protocol = await loop.create_datagram_endpoint(
lambda: EchoClientProtocol(message, on_con_lost),
remote_addr=('127.0.0.1', 9999))
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
Подключение существующих розеток¶
Подождите, пока сокет не получит данные, используя метод loop.create_connection()
с протоколом:
import asyncio
import socket
class MyProtocol(asyncio.Protocol):
def __init__(self, on_con_lost):
self.transport = None
self.on_con_lost = on_con_lost
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
print("Received:", data.decode())
# We are done: close the transport;
# connection_lost() will be called automatically.
self.transport.close()
def connection_lost(self, exc):
# The socket has been closed
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
# Create a pair of connected sockets
rsock, wsock = socket.socketpair()
# Register the socket to wait for data.
transport, protocol = await loop.create_connection(
lambda: MyProtocol(on_con_lost), sock=rsock)
# Simulate the reception of data from the network.
loop.call_soon(wsock.send, 'abc'.encode())
try:
await protocol.on_con_lost
finally:
transport.close()
wsock.close()
asyncio.run(main())
См.также
В примере watch a file descriptor for read events для регистрации FD используется низкоуровневый метод loop.add_reader()
.
В примере register an open socket to wait for data using streams используются потоки высокого уровня, созданные функцией open_connection()
в корутине.
loop.subprocess_exec() и SubprocessProtocol¶
Пример протокола подпроцесса, используемого для получения вывода подпроцесса и ожидания выхода подпроцесса.
Подпроцесс создается методом loop.subprocess_exec()
:
import asyncio
import sys
class DateProtocol(asyncio.SubprocessProtocol):
def __init__(self, exit_future):
self.exit_future = exit_future
self.output = bytearray()
def pipe_data_received(self, fd, data):
self.output.extend(data)
def process_exited(self):
self.exit_future.set_result(True)
async def get_date():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
code = 'import datetime; print(datetime.datetime.now())'
exit_future = asyncio.Future(loop=loop)
# Create the subprocess controlled by DateProtocol;
# redirect the standard output into a pipe.
transport, protocol = await loop.subprocess_exec(
lambda: DateProtocol(exit_future),
sys.executable, '-c', code,
stdin=None, stderr=None)
# Wait for the subprocess exit using the process_exited()
# method of the protocol.
await exit_future
# Close the stdout pipe.
transport.close()
# Read the output which was collected by the
# pipe_data_received() method of the protocol.
data = bytes(protocol.output)
return data.decode('ascii').rstrip()
date = asyncio.run(get_date())
print(f"Current date: {date}")
См. также same example, написанные с использованием высокоуровневых API.