From: John Snow Date: Fri, 25 Feb 2022 15:59:41 -0500 Subject: python/aqmp: remove _new_session and _establish_connection MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Git-commit: 68a6cf3ffe3532c0655efbbf5910bd99a1b4a3fa These two methods attempted to entirely envelop the logic of establishing a connection to a peer start to finish. However, we need to break apart the incoming connection step into more granular steps. We will no longer be able to reasonably constrain the logic inside of these helper functions. So, remove them - with _session_guard(), they no longer serve a real purpose. Although the public API doesn't change, the internal API does. Now that there are no intermediary methods between e.g. connect() and _do_connect(), there's no hook where the runstate is set. As a result, the test suite changes a little to cope with the new semantics of _do_accept() and _do_connect(). Lastly, take some pieces of the now-deleted docstrings and move them up to the public interface level. They were a little more detailed, and it won't hurt to keep them. Signed-off-by: John Snow Acked-by: Kevin Wolf Reviewed-by: Daniel P. Berrangé Message-id: 20220225205948.3693480-4-jsnow@redhat.com Signed-off-by: John Snow Signed-off-by: Li Zhang --- python/qemu/aqmp/protocol.py | 117 ++++++++++++++--------------------- python/tests/protocol.py | 10 ++- 2 files changed, 53 insertions(+), 74 deletions(-) diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py index 73719257e058b7e9e4d8a281bcd9..b7e5e635d886db0efc85f829f42e 100644 --- a/python/qemu/aqmp/protocol.py +++ b/python/qemu/aqmp/protocol.py @@ -275,13 +275,25 @@ class AsyncProtocol(Generic[T]): If this call fails, `runstate` is guaranteed to be set back to `IDLE`. :param address: - Address to listen to; UNIX socket path or TCP address/port. + Address to listen on; UNIX socket path or TCP address/port. :param ssl: SSL context to use, if any. :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: If a connection could not be accepted. + :raise ConnectError: + When a connection or session cannot be established. + + This exception will wrap a more concrete one. In most cases, + the wrapped exception will be `OSError` or `EOFError`. If a + protocol-level failure occurs while establishing a new + session, the wrapped error may also be an `QMPError`. """ - await self._new_session(address, ssl, accept=True) + await self._session_guard( + self._do_accept(address, ssl), + 'Failed to establish connection') + await self._session_guard( + self._establish_session(), + 'Failed to establish session') + assert self.runstate == Runstate.RUNNING @upper_half @require(Runstate.IDLE) @@ -297,9 +309,21 @@ class AsyncProtocol(Generic[T]): :param ssl: SSL context to use, if any. :raise StateError: When the `Runstate` is not `IDLE`. - :raise ConnectError: If a connection cannot be made to the server. + :raise ConnectError: + When a connection or session cannot be established. + + This exception will wrap a more concrete one. In most cases, + the wrapped exception will be `OSError` or `EOFError`. If a + protocol-level failure occurs while establishing a new + session, the wrapped error may also be an `QMPError`. """ - await self._new_session(address, ssl) + await self._session_guard( + self._do_connect(address, ssl), + 'Failed to establish connection') + await self._session_guard( + self._establish_session(), + 'Failed to establish session') + assert self.runstate == Runstate.RUNNING @upper_half async def disconnect(self) -> None: @@ -401,73 +425,6 @@ class AsyncProtocol(Generic[T]): self._runstate_event.set() self._runstate_event.clear() - @upper_half - async def _new_session(self, - address: SocketAddrT, - ssl: Optional[SSLContext] = None, - accept: bool = False) -> None: - """ - Establish a new connection and initialize the session. - - Connect or accept a new connection, then begin the protocol - session machinery. If this call fails, `runstate` is guaranteed - to be set back to `IDLE`. - - :param address: - Address to connect to/listen on; - UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - :param accept: Accept a connection instead of connecting when `True`. - - :raise ConnectError: - When a connection or session cannot be established. - - This exception will wrap a more concrete one. In most cases, - the wrapped exception will be `OSError` or `EOFError`. If a - protocol-level failure occurs while establishing a new - session, the wrapped error may also be an `QMPError`. - """ - assert self.runstate == Runstate.IDLE - - await self._session_guard( - self._establish_connection(address, ssl, accept), - 'Failed to establish connection') - - await self._session_guard( - self._establish_session(), - 'Failed to establish session') - - assert self.runstate == Runstate.RUNNING - - @upper_half - async def _establish_connection( - self, - address: SocketAddrT, - ssl: Optional[SSLContext] = None, - accept: bool = False - ) -> None: - """ - Establish a new connection. - - :param address: - Address to connect to/listen on; - UNIX socket path or TCP address/port. - :param ssl: SSL context to use, if any. - :param accept: Accept a connection instead of connecting when `True`. - """ - assert self.runstate == Runstate.IDLE - self._set_state(Runstate.CONNECTING) - - # Allow runstate watchers to witness 'CONNECTING' state; some - # failures in the streaming layer are synchronous and will not - # otherwise yield. - await asyncio.sleep(0) - - if accept: - await self._do_accept(address, ssl) - else: - await self._do_connect(address, ssl) - def _bind_hack(self, address: Union[str, Tuple[str, int]]) -> None: """ Used to create a socket in advance of accept(). @@ -508,6 +465,9 @@ class AsyncProtocol(Generic[T]): :raise OSError: For stream-related errors. """ + assert self.runstate == Runstate.IDLE + self._set_state(Runstate.CONNECTING) + self.logger.debug("Awaiting connection on %s ...", address) connected = asyncio.Event() server: Optional[asyncio.AbstractServer] = None @@ -550,6 +510,11 @@ class AsyncProtocol(Generic[T]): sock=self._sock, ) + # Allow runstate watchers to witness 'CONNECTING' state; some + # failures in the streaming layer are synchronous and will not + # otherwise yield. + await asyncio.sleep(0) + server = await coro # Starts listening await connected.wait() # Waits for the callback to fire (and finish) assert server is None @@ -569,6 +534,14 @@ class AsyncProtocol(Generic[T]): :raise OSError: For stream-related errors. """ + assert self.runstate == Runstate.IDLE + self._set_state(Runstate.CONNECTING) + + # Allow runstate watchers to witness 'CONNECTING' state; some + # failures in the streaming layer are synchronous and will not + # otherwise yield. + await asyncio.sleep(0) + self.logger.debug("Connecting to %s ...", address) if isinstance(address, tuple): diff --git a/python/tests/protocol.py b/python/tests/protocol.py index 354d6559b9d1e3dc3ad29598af3c..8dd26c4ed1e0973b8058604c2373 100644 --- a/python/tests/protocol.py +++ b/python/tests/protocol.py @@ -42,11 +42,17 @@ class NullProtocol(AsyncProtocol[None]): await super()._establish_session() async def _do_accept(self, address, ssl=None): - if not self.fake_session: + if self.fake_session: + self._set_state(Runstate.CONNECTING) + await asyncio.sleep(0) + else: await super()._do_accept(address, ssl) async def _do_connect(self, address, ssl=None): - if not self.fake_session: + if self.fake_session: + self._set_state(Runstate.CONNECTING) + await asyncio.sleep(0) + else: await super()._do_connect(address, ssl) async def _do_recv(self) -> None: