01f9005feb
- Backport aqmp patches from upstream which can fix iotest issues * Patches added: python-aqmp-add-__del__-method-to-legacy.patch python-aqmp-add-_session_guard.patch python-aqmp-add-SocketAddrT-to-package-r.patch python-aqmp-add-socket-bind-step-to-lega.patch python-aqmp-add-start_server-and-accept-.patch python-aqmp-copy-type-definitions-from-q.patch python-aqmp-drop-_bind_hack.patch python-aqmp-fix-docstring-typo.patch python-aqmp-Fix-negotiation-with-pre-oob.patch python-aqmp-fix-race-condition-in-legacy.patch Python-aqmp-fix-type-definitions-for-myp.patch python-aqmp-handle-asyncio.TimeoutError-.patch python-aqmp-refactor-_do_accept-into-two.patch python-aqmp-remove-_new_session-and-_est.patch python-aqmp-rename-accept-to-start_serve.patch python-aqmp-rename-AQMPError-to-QMPError.patch python-aqmp-split-_client_connected_cb-o.patch python-aqmp-squelch-pylint-warning-for-t.patch python-aqmp-stop-the-server-during-disco.patch python-introduce-qmp-shell-wrap-convenie.patch python-machine-raise-VMLaunchFailure-exc.patch python-move-qmp-shell-under-the-AQMP-pac.patch python-move-qmp-utilities-to-python-qemu.patch python-qmp-switch-qmp-shell-to-AQMP.patch python-support-recording-QMP-session-to-.patch python-upgrade-mypy-to-0.780.patch - Drop the patches which are workaround to fix iotest issues * Patches dropped: Revert-python-iotests-replace-qmp-with-a.patch Revert-python-machine-add-instance-disam.patch Revert-python-machine-add-sock_dir-prope.patch Revert-python-machine-handle-fast-QEMU-t.patch Revert-python-machine-move-more-variable.patch Revert-python-machine-remove-_remove_mon.patch OBS-URL: https://build.opensuse.org/request/show/966963 OBS-URL: https://build.opensuse.org/package/show/Virtualization/qemu?expand=0&rev=708
232 lines
8.9 KiB
Diff
232 lines
8.9 KiB
Diff
From: John Snow <jsnow@redhat.com>
|
|
Date: Fri, 25 Feb 2022 15:59:41 -0500
|
|
Subject: python/aqmp: remove _new_session and _establish_connection
|
|
MIME-Version: 1.0
|
|
Content-Type: text/plain; charset=UTF-8
|
|
Content-Transfer-Encoding: 8bit
|
|
|
|
Git-commit: 68a6cf3ffe3532c0655efbbf5910bd99a1b4a3fa
|
|
|
|
These two methods attempted to entirely envelop the logic of
|
|
establishing a connection to a peer start to finish. However, we need to
|
|
break apart the incoming connection step into more granular steps. We
|
|
will no longer be able to reasonably constrain the logic inside of these
|
|
helper functions.
|
|
|
|
So, remove them - with _session_guard(), they no longer serve a real
|
|
purpose.
|
|
|
|
Although the public API doesn't change, the internal API does. Now that
|
|
there are no intermediary methods between e.g. connect() and
|
|
_do_connect(), there's no hook where the runstate is set. As a result,
|
|
the test suite changes a little to cope with the new semantics of
|
|
_do_accept() and _do_connect().
|
|
|
|
Lastly, take some pieces of the now-deleted docstrings and move
|
|
them up to the public interface level. They were a little more detailed,
|
|
and it won't hurt to keep them.
|
|
|
|
Signed-off-by: John Snow <jsnow@redhat.com>
|
|
Acked-by: Kevin Wolf <kwolf@redhat.com>
|
|
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
|
|
Message-id: 20220225205948.3693480-4-jsnow@redhat.com
|
|
Signed-off-by: John Snow <jsnow@redhat.com>
|
|
Signed-off-by: Li Zhang <lizhang@suse.de>
|
|
---
|
|
python/qemu/aqmp/protocol.py | 117 ++++++++++++++---------------------
|
|
python/tests/protocol.py | 10 ++-
|
|
2 files changed, 53 insertions(+), 74 deletions(-)
|
|
|
|
diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
|
|
index 73719257e058b7e9e4d8a281bcd9..b7e5e635d886db0efc85f829f42e 100644
|
|
--- a/python/qemu/aqmp/protocol.py
|
|
+++ b/python/qemu/aqmp/protocol.py
|
|
@@ -275,13 +275,25 @@ class AsyncProtocol(Generic[T]):
|
|
If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
|
|
|
|
:param address:
|
|
- Address to listen to; UNIX socket path or TCP address/port.
|
|
+ Address to listen on; UNIX socket path or TCP address/port.
|
|
:param ssl: SSL context to use, if any.
|
|
|
|
:raise StateError: When the `Runstate` is not `IDLE`.
|
|
- :raise ConnectError: If a connection could not be accepted.
|
|
+ :raise ConnectError:
|
|
+ When a connection or session cannot be established.
|
|
+
|
|
+ This exception will wrap a more concrete one. In most cases,
|
|
+ the wrapped exception will be `OSError` or `EOFError`. If a
|
|
+ protocol-level failure occurs while establishing a new
|
|
+ session, the wrapped error may also be an `QMPError`.
|
|
"""
|
|
- await self._new_session(address, ssl, accept=True)
|
|
+ await self._session_guard(
|
|
+ self._do_accept(address, ssl),
|
|
+ 'Failed to establish connection')
|
|
+ await self._session_guard(
|
|
+ self._establish_session(),
|
|
+ 'Failed to establish session')
|
|
+ assert self.runstate == Runstate.RUNNING
|
|
|
|
@upper_half
|
|
@require(Runstate.IDLE)
|
|
@@ -297,9 +309,21 @@ class AsyncProtocol(Generic[T]):
|
|
:param ssl: SSL context to use, if any.
|
|
|
|
:raise StateError: When the `Runstate` is not `IDLE`.
|
|
- :raise ConnectError: If a connection cannot be made to the server.
|
|
+ :raise ConnectError:
|
|
+ When a connection or session cannot be established.
|
|
+
|
|
+ This exception will wrap a more concrete one. In most cases,
|
|
+ the wrapped exception will be `OSError` or `EOFError`. If a
|
|
+ protocol-level failure occurs while establishing a new
|
|
+ session, the wrapped error may also be an `QMPError`.
|
|
"""
|
|
- await self._new_session(address, ssl)
|
|
+ await self._session_guard(
|
|
+ self._do_connect(address, ssl),
|
|
+ 'Failed to establish connection')
|
|
+ await self._session_guard(
|
|
+ self._establish_session(),
|
|
+ 'Failed to establish session')
|
|
+ assert self.runstate == Runstate.RUNNING
|
|
|
|
@upper_half
|
|
async def disconnect(self) -> None:
|
|
@@ -401,73 +425,6 @@ class AsyncProtocol(Generic[T]):
|
|
self._runstate_event.set()
|
|
self._runstate_event.clear()
|
|
|
|
- @upper_half
|
|
- async def _new_session(self,
|
|
- address: SocketAddrT,
|
|
- ssl: Optional[SSLContext] = None,
|
|
- accept: bool = False) -> None:
|
|
- """
|
|
- Establish a new connection and initialize the session.
|
|
-
|
|
- Connect or accept a new connection, then begin the protocol
|
|
- session machinery. If this call fails, `runstate` is guaranteed
|
|
- to be set back to `IDLE`.
|
|
-
|
|
- :param address:
|
|
- Address to connect to/listen on;
|
|
- UNIX socket path or TCP address/port.
|
|
- :param ssl: SSL context to use, if any.
|
|
- :param accept: Accept a connection instead of connecting when `True`.
|
|
-
|
|
- :raise ConnectError:
|
|
- When a connection or session cannot be established.
|
|
-
|
|
- This exception will wrap a more concrete one. In most cases,
|
|
- the wrapped exception will be `OSError` or `EOFError`. If a
|
|
- protocol-level failure occurs while establishing a new
|
|
- session, the wrapped error may also be an `QMPError`.
|
|
- """
|
|
- assert self.runstate == Runstate.IDLE
|
|
-
|
|
- await self._session_guard(
|
|
- self._establish_connection(address, ssl, accept),
|
|
- 'Failed to establish connection')
|
|
-
|
|
- await self._session_guard(
|
|
- self._establish_session(),
|
|
- 'Failed to establish session')
|
|
-
|
|
- assert self.runstate == Runstate.RUNNING
|
|
-
|
|
- @upper_half
|
|
- async def _establish_connection(
|
|
- self,
|
|
- address: SocketAddrT,
|
|
- ssl: Optional[SSLContext] = None,
|
|
- accept: bool = False
|
|
- ) -> None:
|
|
- """
|
|
- Establish a new connection.
|
|
-
|
|
- :param address:
|
|
- Address to connect to/listen on;
|
|
- UNIX socket path or TCP address/port.
|
|
- :param ssl: SSL context to use, if any.
|
|
- :param accept: Accept a connection instead of connecting when `True`.
|
|
- """
|
|
- assert self.runstate == Runstate.IDLE
|
|
- self._set_state(Runstate.CONNECTING)
|
|
-
|
|
- # Allow runstate watchers to witness 'CONNECTING' state; some
|
|
- # failures in the streaming layer are synchronous and will not
|
|
- # otherwise yield.
|
|
- await asyncio.sleep(0)
|
|
-
|
|
- if accept:
|
|
- await self._do_accept(address, ssl)
|
|
- else:
|
|
- await self._do_connect(address, ssl)
|
|
-
|
|
def _bind_hack(self, address: Union[str, Tuple[str, int]]) -> None:
|
|
"""
|
|
Used to create a socket in advance of accept().
|
|
@@ -508,6 +465,9 @@ class AsyncProtocol(Generic[T]):
|
|
|
|
:raise OSError: For stream-related errors.
|
|
"""
|
|
+ assert self.runstate == Runstate.IDLE
|
|
+ self._set_state(Runstate.CONNECTING)
|
|
+
|
|
self.logger.debug("Awaiting connection on %s ...", address)
|
|
connected = asyncio.Event()
|
|
server: Optional[asyncio.AbstractServer] = None
|
|
@@ -550,6 +510,11 @@ class AsyncProtocol(Generic[T]):
|
|
sock=self._sock,
|
|
)
|
|
|
|
+ # Allow runstate watchers to witness 'CONNECTING' state; some
|
|
+ # failures in the streaming layer are synchronous and will not
|
|
+ # otherwise yield.
|
|
+ await asyncio.sleep(0)
|
|
+
|
|
server = await coro # Starts listening
|
|
await connected.wait() # Waits for the callback to fire (and finish)
|
|
assert server is None
|
|
@@ -569,6 +534,14 @@ class AsyncProtocol(Generic[T]):
|
|
|
|
:raise OSError: For stream-related errors.
|
|
"""
|
|
+ assert self.runstate == Runstate.IDLE
|
|
+ self._set_state(Runstate.CONNECTING)
|
|
+
|
|
+ # Allow runstate watchers to witness 'CONNECTING' state; some
|
|
+ # failures in the streaming layer are synchronous and will not
|
|
+ # otherwise yield.
|
|
+ await asyncio.sleep(0)
|
|
+
|
|
self.logger.debug("Connecting to %s ...", address)
|
|
|
|
if isinstance(address, tuple):
|
|
diff --git a/python/tests/protocol.py b/python/tests/protocol.py
|
|
index 354d6559b9d1e3dc3ad29598af3c..8dd26c4ed1e0973b8058604c2373 100644
|
|
--- a/python/tests/protocol.py
|
|
+++ b/python/tests/protocol.py
|
|
@@ -42,11 +42,17 @@ class NullProtocol(AsyncProtocol[None]):
|
|
await super()._establish_session()
|
|
|
|
async def _do_accept(self, address, ssl=None):
|
|
- if not self.fake_session:
|
|
+ if self.fake_session:
|
|
+ self._set_state(Runstate.CONNECTING)
|
|
+ await asyncio.sleep(0)
|
|
+ else:
|
|
await super()._do_accept(address, ssl)
|
|
|
|
async def _do_connect(self, address, ssl=None):
|
|
- if not self.fake_session:
|
|
+ if self.fake_session:
|
|
+ self._set_state(Runstate.CONNECTING)
|
|
+ await asyncio.sleep(0)
|
|
+ else:
|
|
await super()._do_connect(address, ssl)
|
|
|
|
async def _do_recv(self) -> None:
|