Транспорт и протоколы

Предисловие

Транспортные средства и протоколы используются низкоуровневыми API-интерфейсами цикла обработки событий, такими как loop.create_connection(). Они используют стиль программирования на основе обратного вызова и обеспечивают высокопроизводительную реализацию сетевых протоколов или протоколов IPC (например, HTTP).

По сути, транспортные средства и протоколы должны использоваться только в библиотеках и фреймворках и никогда - в высокоуровневых асинхронных приложениях.

На этой странице документации описаны как Transports, так и Protocols.

Вступление

На самом высоком уровне передача данных связана с тем, как передаются байты, в то время как протокол определяет, какие байты следует передавать (и в некоторой степени когда).

Другой способ сказать то же самое: транспорт - это абстракция для сокета (или аналогичной конечной точки ввода-вывода), в то время как протокол - это абстракция для приложения, с точки зрения транспорта.

Еще одна точка зрения заключается в том, что транспортный и протокольный интерфейсы вместе определяют абстрактный интерфейс для использования сетевого ввода-вывода и межпроцессного ввода-вывода.

Между транспортными объектами и объектами протокола всегда существует соотношение 1:1: протокол вызывает транспортные методы для отправки данных, в то время как транспорт вызывает методы протокола для передачи полученных данных.

Большинство методов цикла обработки событий, ориентированных на подключение (таких как loop.create_connection()), обычно принимают аргумент protocol_factory, используемый для создания объекта Protocol для принятого соединения, представленного объектом Transport. Такие методы обычно возвращают кортеж из (transport, protocol).

Содержание

Эта страница документации содержит следующие разделы:

Транспортирует

Исходный код: Lib/asyncio/transports.py


Транспортные средства - это классы, предоставляемые asyncio для абстрагирования различных видов каналов связи.

Транспортные объекты всегда создаются с помощью asyncio event loop.

asyncio реализует транспортировку по протоколам TCP, UDP, SSL и каналам подпроцессов. Методы, доступные для транспорта, зависят от его типа.

Транспортными классами являются not thread safe.

Иерархия транспортных средств

class asyncio.BaseTransport

Базовый класс для всех транспортных средств. Содержит методы, которые совместно используются всеми транспортными средствами asyncio.

class asyncio.WriteTransport(BaseTransport)

Базовый класс для всех транспортных средств. Содержит методы, которые совместно используются всеми транспортными средствами asyncio.

Экземпляры класса WriteTransport возвращаются из метода цикла обработки событий loop.connect_write_pipe() и также используются методами, связанными с подпроцессом, такими как loop.subprocess_exec().

class asyncio.ReadTransport(BaseTransport)

Экземпляры класса WriteTransport возвращаются из метода цикла обработки событий и также используются методами, связанными с подпроцессом, такими как .

Экземпляры класса ReadTransport возвращаются из метода цикла обработки событий loop.connect_read_pipe() и также используются методами, связанными с подпроцессом, такими как loop.subprocess_exec().

class asyncio.Transport(WriteTransport, ReadTransport)

Интерфейс, представляющий собой двунаправленный транспорт, такой как TCP-соединение.

Пользователь не создает экземпляр транспорта напрямую; он вызывает служебную функцию, передавая ей фабрику протоколов и другую информацию, необходимую для создания транспорта и протокола.

Экземпляры класса Transport возвращаются или используются методами цикла обработки событий, такими как loop.create_connection(), loop.create_unix_connection(), loop.create_server(), loop.sendfile(), и т.д.

class asyncio.DatagramTransport(BaseTransport)

Транспорт для дейтаграммных (UDP) соединений.

Экземпляры класса DatagramTransport возвращаются из метода цикла обработки событий loop.create_datagram_endpoint().

class asyncio.SubprocessTransport(BaseTransport)

Абстракция, представляющая связь между родительским и дочерним процессами операционной системы.

Экземпляры класса SubprocessTransport возвращаются из методов цикла обработки событий loop.subprocess_shell() и loop.subprocess_exec().

Базовый транспорт

BaseTransport.close()

Закройте транспорт.

Если у транспортного средства есть буфер для исходящих данных, буферизованные данные будут сбрасываться асинхронно. Больше никаких данных получено не будет. После того, как все буферизованные данные будут сброшены, будет вызван метод протокола protocol.connection_lost() с None в качестве аргумента. Транспорт не должен использоваться после того, как он закрыт.

BaseTransport.is_closing()

Верните True, если транспорт закрывается.

BaseTransport.get_extra_info(name, default=None)

Верните , если транспорт закрывается.

Верните , если транспорт закрывается.

Верните , если транспорт закрывается.

Верните , если транспорт закрывается.

sock = transport.get_extra_info('socket')
if sock is not None:
    print(sock.getsockopt(...))

Верните , если транспорт закрывается.

  • Верните , если транспорт закрывается.

  • Верните , если транспорт закрывается.

    • 'compression': алгоритм сжатия, используемый в виде строки, или None, если соединение не сжато; результат ssl.SSLSocket.compression()

    • 'cipher': алгоритм сжатия, используемый в виде строки, или ssl.SSLSocket.cipher(), если соединение не сжато; результат

    • 'peercert': алгоритм сжатия, используемый в виде строки, или ssl.SSLSocket.getpeercert(), если соединение не сжато; результат

    • 'sslcontext': ssl.SSLContext пример

    • 'ssl_object': ssl.SSLObject приме ssl.SSLSocket р

  • труба:

    • ``“pipe“``труба:

  • подпроцесс:

BaseTransport.set_protocol(protocol)

подпроцесс:

подпроцесс:

BaseTransport.get_protocol()

подпроцесс:

Транспорты, доступные только для чтения

ReadTransport.is_reading()

Верните True, если транспорт получает новые данные.

Добавлено в версии 3.7.

ReadTransport.pause_reading()

Приостановите передачу данных на принимающей стороне. Никакие данные не будут переданы в метод протокола protocol.data_received() до тех пор, пока не будет вызван метод resume_reading().

Изменено в версии 3.7: Метод является идемпотентным, то есть он может быть вызван, когда передача уже приостановлена или закрыта.

ReadTransport.resume_reading()

Возобновите работу с принимающей стороной. Метод протокола protocol.data_received() будет вызван еще раз, если какие-либо данные будут доступны для чтения.

Изменено в версии 3.7: Метод является идемпотентным, то есть он может быть вызван, когда транспорт уже выполняет чтение.

Транспорты только для записи

WriteTransport.abort()

Немедленно закройте передачу, не дожидаясь завершения отложенных операций. Буферизованные данные будут потеряны. Больше никаких данных получено не будет. В конечном итоге будет вызван метод протокола protocol.connection_lost() с None в качестве аргумента.

WriteTransport.can_write_eof()

Верните True, если транспорт поддерживает write_eof(), False если нет.

WriteTransport.get_write_buffer_size()

Возвращает текущий размер выходного буфера, используемого транспортным средством.

WriteTransport.get_write_buffer_limits()

Получите водяные знаки high и low для управления потоком записи. Верните кортеж (low, high), где low и high - положительное число байт.

Используйте set_write_buffer_limits() для установки ограничений.

Добавлено в версии 3.4.2.

WriteTransport.set_write_buffer_limits(high=None, low=None)

Установите водяные знаки «высокий» и «низкий» для управления потоком записи.

Эти два значения (измеряемые в количестве байт) определяют, когда вызываются методы протокола protocol.pause_writing() и protocol.resume_writing(). Если указано, нижний водяной знак должен быть меньше или равен верхнему водяному знаку. Ни «высокий», ни «низкий» не могут быть отрицательными.

pause_writing() вызывается, когда размер буфера становится больше или равен значению high. Если запись была приостановлена, resume_writing() вызывается, когда размер буфера становится меньше или равен значению low.

pause_writing() вызывается, когда размер буфера становится больше или равен значению high. Если запись была приостановлена, resume_writing() вызывается, когда размер буфера становится меньше или равен значению low.

get_write_buffer_limits() вызывается, когда размер буфера становится больше или равен значению high. Если запись была приостановлена, вызывается, когда размер буфера становится меньше или равен значению low.

WriteTransport.write(data)

Запишите несколько байт данных в транспортный файл.

Запишите несколько байт данных в транспортный файл.

WriteTransport.writelines(list_of_data)

:meth:`write`Запишите несколько байт данных в транспортный файл.

WriteTransport.write_eof()

Закройте конец передачи данных для записи после очистки всех буферизованных данных. Данные все еще могут быть получены.

:exc:`NotImplementedError`Закройте конец передачи данных для записи после очистки всех буферизованных данных. Данные все еще могут быть получены.

Дейтаграмма

DatagramTransport.sendto(data, addr=None)

:const:`None`Дейтаграмма

Запишите несколько байт данных в транспортный файл.

DatagramTransport.abort()

Немедленно закройте передачу, не дожидаясь завершения отложенных операций. Буферизованные данные будут потеряны. Больше никаких данных получено не будет. В конечном итоге будет вызван метод протокола protocol.connection_lost() с None в качестве аргумента.

Транспортировка подпроцессов

SubprocessTransport.get_pid()

Возвращает идентификатор процесса подпроцесса в виде целого числа.

SubprocessTransport.get_pipe_transport(fd)

Возвращает транспорт для канала связи, соответствующего целочисленному файловому дескриптору fd:

  • 0: считываемый потоковый перенос стандартного ввода (stdin) или None, если подпроцесс не был создан с помощью stdin=PIPE

  • 1: доступный для записи поток стандартного вывода (stdout) или None, если подпроцесс не был создан с помощью stdout=PIPE

  • 2: доступная для записи потоковая передача стандартной ошибки (stderr) или None, если подпроцесс не был создан с помощью stderr=PIPE

  • другое fd: None

SubprocessTransport.get_returncode()

Верните код возврата подпроцесса в виде целого числа или None, если он не был возвращен, что аналогично атрибуту subprocess.Popen.returncode.

SubprocessTransport.kill()

Завершите подпроцесс.

В системах POSIX функция отправляет SIGKILL в подпроцесс. В Windows этот метод является псевдонимом для terminate().

Смотрите также subprocess.Popen.kill().

SubprocessTransport.send_signal(signal)

Отправьте номер сигнала в подпроцесс, как в subprocess.Popen.send_signal().

SubprocessTransport.terminate()

Остановите подпроцесс.

В системах POSIX этот метод отправляет SIGTERM в подпроцесс. В Windows для остановки подпроцесса вызывается функция Windows API TerminateProcess().

Смотрите также subprocess.Popen.terminate().

SubprocessTransport.close()

Завершите подпроцесс, вызвав метод kill().

Если подпроцесс еще не вернулся, и закройте транспортировку каналов stdin, stdout и stderr.

Протоколы

Исходный код: Lib/asyncio/protocols.py


asyncio предоставляет набор абстрактных базовых классов, которые должны использоваться для реализации сетевых протоколов. Эти классы предназначены для использования вместе с transports.

Подклассы абстрактных базовых протокольных классов могут реализовывать некоторые или все методы. Все эти методы являются обратными вызовами: они вызываются транспортными средствами при определенных событиях, например, при получении некоторых данных. Метод базового протокола должен вызываться соответствующим транспортом.

Базовые протоколы

class asyncio.BaseProtocol

Базовый протокол с методами, общими для всех протоколов.

class asyncio.Protocol(BaseProtocol)

Базовый класс для реализации потоковых протоколов (TCP, Unix-сокеты и т.д.).

class asyncio.BufferedProtocol(BaseProtocol)

Базовый класс для реализации потоковых протоколов с ручным управлением буфером приема.

class asyncio.DatagramProtocol(BaseProtocol)

Базовый класс для реализации протоколов передачи дейтаграмм (UDP).

class asyncio.SubprocessProtocol(BaseProtocol)

Базовый класс для реализации протоколов, взаимодействующих с дочерними процессами (однонаправленные каналы).

Базовый протокол

Все протоколы asyncio могут реализовывать обратные вызовы базового протокола.

Обратные вызовы для подключения

Обратные вызовы соединения вызываются по всем протоколам ровно один раз за каждое успешное соединение. Все остальные обратные вызовы протокола могут быть вызваны только этими двумя методами.

BaseProtocol.connection_made(transport)

Вызывается при установлении соединения.

Аргумент transport - это транспорт, представляющий соединение. Протокол отвечает за хранение ссылки на свой транспорт.

BaseProtocol.connection_lost(exc)

Вызывается, когда соединение потеряно или закрыто.

Аргументом является либо объект exception, либо None. Последнее означает, что получен обычный EOF, или соединение было прервано или закрыто с этой стороны соединения.

Обратные вызовы для управления потоком

Обратные вызовы управления потоком могут вызываться транспортными средствами для приостановки или возобновления записи, выполняемой протоколом.

Смотрите документацию по методу set_write_buffer_limits() для получения более подробной информации.

BaseProtocol.pause_writing()

Вызывается, когда транспортный буфер выходит за пределы верхнего водяного знака.

BaseProtocol.resume_writing()

Вызывается, когда объем транспортного буфера опускается ниже нижнего водяного знака.

Если размер буфера равен верхнему водяному знаку, pause_writing() не вызывается: размер буфера должен быть строго превышен.

И наоборот, resume_writing() вызывается, когда размер буфера равен или меньше нижнего водяного знака. Эти конечные условия важны для обеспечения того, чтобы все шло так, как ожидалось, когда любой из этих знаков равен нулю.

Потоковые протоколы

Методы событий, такие как loop.create_server(), loop.create_unix_server(), loop.create_connection(), loop.create_unix_connection(), loop.connect_accepted_socket(), loop.connect_read_pipe(), и loop.connect_write_pipe(), принимают фабрики, которые возвращают потоковые протоколы.

Protocol.data_received(data)

Вызывается при получении некоторых данных. data - это непустой объект в байтах, содержащий входящие данные.

Буферизуются ли данные, разбиваются на фрагменты или собираются заново, зависит от способа передачи. В целом, вам не следует полагаться на конкретную семантику и вместо этого делать синтаксический анализ универсальным и гибким. Однако данные всегда поступают в правильном порядке.

Метод может быть вызван произвольное количество раз, пока соединение открыто.

Однако protocol.eof_received() вызывается не более одного раза. Как только вызывается eof_received(), data_received() больше не вызывается.

Protocol.eof_received()

Вызывается, когда другой конец сигнализирует, что больше не будет отправлять данные (например, путем вызова transport.write_eof(), если другой конец также использует asyncio).

Этот метод может возвращать значение false (включая None), и в этом случае транспорт будет закрыт сам по себе. И наоборот, если этот метод возвращает значение true, используемый протокол определяет, следует ли закрывать транспорт. Поскольку реализация по умолчанию возвращает None, она неявно закрывает соединение.

Некоторые транспортные средства, включая SSL, не поддерживают полузакрытые соединения, и в этом случае возврат значения true из этого метода приведет к закрытию соединения.

Государственная машина:

start -> connection_made
    [-> data_received]*
    [-> eof_received]?
-> connection_lost -> end

Буферизованные потоковые протоколы

Добавлено в версии 3.7.

Буферизованные протоколы можно использовать с любым методом цикла обработки событий, который поддерживает Streaming Protocols.

BufferedProtocol реализации позволяют явно вручную распределять буфер приема и управлять им. Затем циклы обработки событий могут использовать буфер, предоставляемый протоколом, чтобы избежать ненужных копий данных. Это может привести к заметному повышению производительности протоколов, которые получают большие объемы данных. Сложные реализации протоколов могут значительно сократить количество выделяемых буферов.

Следующие обратные вызовы вызываются в экземплярах BufferedProtocol:

BufferedProtocol.get_buffer(sizehint)

Вызывается для выделения нового приемного буфера.

sizehint - это рекомендуемый минимальный размер возвращаемого буфера. Допустимо возвращать буферы меньшего или большего размера, чем указано в sizehint. При значении -1 размер буфера может быть произвольным. Возврат буфера с нулевым размером является ошибкой.

get_buffer() должен возвращать объект, реализующий buffer protocol.

BufferedProtocol.buffer_updated(nbytes)

Вызывается, когда буфер обновляется полученными данными.

nbytes - это общее количество байт, которые были записаны в буфер.

BufferedProtocol.eof_received()

Смотрите документацию по методу protocol.eof_received().

get_buffer() может вызываться произвольное количество раз во время соединения. Однако protocol.eof_received() вызывается не более одного раза, и, если вызывается, get_buffer() и buffer_updated() не будут вызываться после него.

Государственная машина:

start -> connection_made
    [-> get_buffer
        [-> buffer_updated]?
    ]*
    [-> eof_received]?
-> connection_lost -> end

Протоколы передачи дейтаграмм

Экземпляры протокола дейтаграмм должны создаваться с помощью фабрик протоколов, передаваемых методу loop.create_datagram_endpoint().

DatagramProtocol.datagram_received(data, addr)

Вызывается при получении дейтаграммы. data - это объект bytes, содержащий входящие данные. addr - это адрес узла, отправляющего данные; точный формат зависит от способа передачи.

DatagramProtocol.error_received(exc)

Вызывается, когда предыдущая операция отправки или получения вызывает OSError. exc - это экземпляр OSError.

Этот метод вызывается в редких случаях, когда транспортная система (например, UDP) обнаруживает, что дейтаграмма не может быть доставлена получателю. Однако во многих случаях недоставленные дейтаграммы автоматически удаляются.

Примечание

В системах BSD (macOS, FreeBSD и т.д.) управление потоком не поддерживается для протоколов передачи дейтаграмм, поскольку нет надежного способа обнаружить сбои отправки, вызванные записью слишком большого количества пакетов.

Сокет всегда отображается как «готовый», и лишние пакеты отбрасываются. Значение OSError с значением errno, равным errno.ENOBUFS, может быть задано, а может и не быть задано; если оно задано, о нем будет сообщено как о DatagramProtocol.error_received(), но в противном случае оно будет проигнорировано.

Протоколы подпроцессов

Экземпляры протокола подпроцесса должны создаваться с помощью фабрик протоколов, передаваемых в методы loop.subprocess_exec() и loop.subprocess_shell().

SubprocessProtocol.pipe_data_received(fd, data)

Вызывается, когда дочерний процесс записывает данные в свой канал stdout или stderr.

fd - это целочисленный файловый дескриптор канала.

data - это непустой объект в байтах, содержащий полученные данные.

SubprocessProtocol.pipe_connection_lost(fd, exc)

Вызывается, когда закрывается один из каналов, связывающихся с дочерним процессом.

fd - это целочисленный файловый дескриптор, который был закрыт.

SubprocessProtocol.process_exited()

Вызывается при завершении дочернего процесса.

Он может быть вызван перед методами pipe_data_received() и pipe_connection_lost().

Примеры

Эхо-сервер TCP

Создайте эхо-сервер TCP, используя метод loop.create_server(), отправьте обратно полученные данные и закройте соединение:

import asyncio


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        lambda: EchoServerProtocol(),
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()


asyncio.run(main())

См.также

В примере TCP echo server using streams используется высокоуровневая функция asyncio.start_server().

Эхо-клиент TCP

Эхо-клиент TCP, использующий метод loop.create_connection(), отправляет данные и ожидает, пока соединение не будет закрыто:

import asyncio


class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = 'Hello World!'

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(message, on_con_lost),
        '127.0.0.1', 8888)

    # Wait until the protocol signals that the connection
    # is lost and close the transport.
    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

См.также

В примере TCP echo client using streams используется высокоуровневая функция asyncio.open_connection().

Эхо-сервер UDP

Эхо-сервер UDP, используя метод loop.create_datagram_endpoint(), отправляет обратно полученные данные:

import asyncio


class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)


async def main():
    print("Starting UDP server")

    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    # One protocol instance will be created to serve all
    # client requests.
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoServerProtocol(),
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Serve for 1 hour.
    finally:
        transport.close()


asyncio.run(main())

Эхо-клиент UDP

Эхо-клиент UDP, используя метод loop.create_datagram_endpoint(), отправляет данные и закрывает передачу, когда получает ответ:

import asyncio


class EchoClientProtocol:
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = "Hello World!"

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

Подключение существующих розеток

Подождите, пока сокет не получит данные, используя метод loop.create_connection() с протоколом:

import asyncio
import socket


class MyProtocol(asyncio.Protocol):

    def __init__(self, on_con_lost):
        self.transport = None
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport;
        # connection_lost() will be called automatically.
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    # Create a pair of connected sockets
    rsock, wsock = socket.socketpair()

    # Register the socket to wait for data.
    transport, protocol = await loop.create_connection(
        lambda: MyProtocol(on_con_lost), sock=rsock)

    # Simulate the reception of data from the network.
    loop.call_soon(wsock.send, 'abc'.encode())

    try:
        await protocol.on_con_lost
    finally:
        transport.close()
        wsock.close()

asyncio.run(main())

См.также

В примере watch a file descriptor for read events используется низкоуровневый метод loop.add_reader() для регистрации FD.

В примере register an open socket to wait for data using streams используются потоки высокого уровня, созданные функцией open_connection() в сопрограмме.

цикл.subprocess_exec() и SubprocessProtocol

Пример протокола подпроцесса, используемого для получения выходных данных подпроцесса и ожидания завершения подпроцесса.

Подпроцесс создается с помощью метода loop.subprocess_exec():

import asyncio
import sys

class DateProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future
        self.output = bytearray()
        self.pipe_closed = False
        self.exited = False

    def pipe_connection_lost(self, fd, exc):
        self.pipe_closed = True
        self.check_for_exit()

    def pipe_data_received(self, fd, data):
        self.output.extend(data)

    def process_exited(self):
        self.exited = True
        # process_exited() method can be called before
        # pipe_connection_lost() method: wait until both methods are
        # called.
        self.check_for_exit()

    def check_for_exit(self):
        if self.pipe_closed and self.exited:
            self.exit_future.set_result(True)

async def get_date():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    code = 'import datetime; print(datetime.datetime.now())'
    exit_future = asyncio.Future(loop=loop)

    # Create the subprocess controlled by DateProtocol;
    # redirect the standard output into a pipe.
    transport, protocol = await loop.subprocess_exec(
        lambda: DateProtocol(exit_future),
        sys.executable, '-c', code,
        stdin=None, stderr=None)

    # Wait for the subprocess exit using the process_exited()
    # method of the protocol.
    await exit_future

    # Close the stdout pipe.
    transport.close()

    # Read the output which was collected by the
    # pipe_data_received() method of the protocol.
    data = bytes(protocol.output)
    return data.decode('ascii').rstrip()

date = asyncio.run(get_date())
print(f"Current date: {date}")

Смотрите также same example, написанный с использованием высокоуровневых API.

Вернуться на верх