qemu/python-aqmp-split-_client_connected_cb-o.patch
Li Zhang 01f9005feb Accepting request 966963 from home:lizhang:branches:Virtualization
- Backport aqmp patches from upstream which can fix iotest issues
* Patches added:
  python-aqmp-add-__del__-method-to-legacy.patch
  python-aqmp-add-_session_guard.patch
  python-aqmp-add-SocketAddrT-to-package-r.patch
  python-aqmp-add-socket-bind-step-to-lega.patch
  python-aqmp-add-start_server-and-accept-.patch
  python-aqmp-copy-type-definitions-from-q.patch
  python-aqmp-drop-_bind_hack.patch
  python-aqmp-fix-docstring-typo.patch
  python-aqmp-Fix-negotiation-with-pre-oob.patch
  python-aqmp-fix-race-condition-in-legacy.patch
  Python-aqmp-fix-type-definitions-for-myp.patch
  python-aqmp-handle-asyncio.TimeoutError-.patch
  python-aqmp-refactor-_do_accept-into-two.patch
  python-aqmp-remove-_new_session-and-_est.patch
  python-aqmp-rename-accept-to-start_serve.patch
  python-aqmp-rename-AQMPError-to-QMPError.patch
  python-aqmp-split-_client_connected_cb-o.patch
  python-aqmp-squelch-pylint-warning-for-t.patch
  python-aqmp-stop-the-server-during-disco.patch
  python-introduce-qmp-shell-wrap-convenie.patch
  python-machine-raise-VMLaunchFailure-exc.patch
  python-move-qmp-shell-under-the-AQMP-pac.patch
  python-move-qmp-utilities-to-python-qemu.patch
  python-qmp-switch-qmp-shell-to-AQMP.patch
  python-support-recording-QMP-session-to-.patch
  python-upgrade-mypy-to-0.780.patch
- Drop the patches that were workarounds for the iotest issues
* Patches dropped:
  Revert-python-iotests-replace-qmp-with-a.patch
  Revert-python-machine-add-instance-disam.patch
  Revert-python-machine-add-sock_dir-prope.patch
  Revert-python-machine-handle-fast-QEMU-t.patch
  Revert-python-machine-move-more-variable.patch
  Revert-python-machine-remove-_remove_mon.patch

OBS-URL: https://build.opensuse.org/request/show/966963
OBS-URL: https://build.opensuse.org/package/show/Virtualization/qemu?expand=0&rev=708
2022-04-06 08:07:07 +00:00

From: John Snow <jsnow@redhat.com>
Date: Fri, 25 Feb 2022 15:59:42 -0500
Subject: python/aqmp: split _client_connected_cb() out as _incoming()
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Git-commit: 830e6fd36e2aef37b158a10dea6c3853ce43b20c

As part of disentangling the monolithic nature of _do_accept(), split
out the incoming callback to prepare for factoring out the "wait for a
peer" step. Namely, this means using an event signal we can wait on from
outside of this method.

Signed-off-by: John Snow <jsnow@redhat.com>
Acked-by: Kevin Wolf <kwolf@redhat.com>
Reviewed-by: Daniel P. Berrangé <berrange@redhat.com>
Message-id: 20220225205948.3693480-5-jsnow@redhat.com
Signed-off-by: John Snow <jsnow@redhat.com>
Signed-off-by: Li Zhang <lizhang@suse.de>
---
 python/qemu/aqmp/protocol.py | 83 +++++++++++++++++++++++++-----------
 1 file changed, 58 insertions(+), 25 deletions(-)

diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
index b7e5e635d886db0efc85f829f42e..56f05b90308c44a86d0978fd2ce6 100644
--- a/python/qemu/aqmp/protocol.py
+++ b/python/qemu/aqmp/protocol.py
@@ -242,6 +242,10 @@ class AsyncProtocol(Generic[T]):
         # Workaround for bind()
         self._sock: Optional[socket.socket] = None
 
+        # Server state for start_server() and _incoming()
+        self._server: Optional[asyncio.AbstractServer] = None
+        self._accepted: Optional[asyncio.Event] = None
+
     def __repr__(self) -> str:
         cls_name = type(self).__name__
         tokens = []
@@ -425,6 +429,54 @@ class AsyncProtocol(Generic[T]):
         self._runstate_event.set()
         self._runstate_event.clear()
 
+    @bottom_half  # However, it does not run from the R/W tasks.
+    async def _stop_server(self) -> None:
+        """
+        Stop listening for / accepting new incoming connections.
+        """
+        if self._server is None:
+            return
+
+        try:
+            self.logger.debug("Stopping server.")
+            self._server.close()
+            await self._server.wait_closed()
+            self.logger.debug("Server stopped.")
+        finally:
+            self._server = None
+
+    @bottom_half  # However, it does not run from the R/W tasks.
+    async def _incoming(self,
+                        reader: asyncio.StreamReader,
+                        writer: asyncio.StreamWriter) -> None:
+        """
+        Accept an incoming connection and signal the upper_half.
+
+        This method does the minimum necessary to accept a single
+        incoming connection. It signals back to the upper_half ASAP so
+        that any errors during session initialization can occur
+        naturally in the caller's stack.
+
+        :param reader: Incoming `asyncio.StreamReader`
+        :param writer: Incoming `asyncio.StreamWriter`
+        """
+        peer = writer.get_extra_info('peername', 'Unknown peer')
+        self.logger.debug("Incoming connection from %s", peer)
+
+        if self._reader or self._writer:
+            # Sadly, we can have more than one pending connection
+            # because of https://bugs.python.org/issue46715
+            # Close any extra connections we don't actually want.
+            self.logger.warning("Extraneous connection inadvertently accepted")
+            writer.close()
+            return
+
+        # A connection has been accepted; stop listening for new ones.
+        assert self._accepted is not None
+        await self._stop_server()
+        self._reader, self._writer = (reader, writer)
+        self._accepted.set()
+
     def _bind_hack(self, address: Union[str, Tuple[str, int]]) -> None:
         """
         Used to create a socket in advance of accept().
@@ -469,30 +521,11 @@ class AsyncProtocol(Generic[T]):
         self._set_state(Runstate.CONNECTING)
         self.logger.debug("Awaiting connection on %s ...", address)
 
-        connected = asyncio.Event()
-        server: Optional[asyncio.AbstractServer] = None
-
-        async def _client_connected_cb(reader: asyncio.StreamReader,
-                                       writer: asyncio.StreamWriter) -> None:
-            """Used to accept a single incoming connection, see below."""
-            nonlocal server
-            nonlocal connected
-
-            # A connection has been accepted; stop listening for new ones.
-            assert server is not None
-            server.close()
-            await server.wait_closed()
-            server = None
-
-            # Register this client as being connected
-            self._reader, self._writer = (reader, writer)
-
-            # Signal back: We've accepted a client!
-            connected.set()
+        self._accepted = asyncio.Event()
 
         if isinstance(address, tuple):
             coro = asyncio.start_server(
-                _client_connected_cb,
+                self._incoming,
                 host=None if self._sock else address[0],
                 port=None if self._sock else address[1],
                 ssl=ssl,
@@ -502,7 +535,7 @@ class AsyncProtocol(Generic[T]):
             )
         else:
             coro = asyncio.start_unix_server(
-                _client_connected_cb,
+                self._incoming,
                 path=None if self._sock else address,
                 ssl=ssl,
                 backlog=1,
@@ -515,9 +548,9 @@ class AsyncProtocol(Generic[T]):
         # otherwise yield.
         await asyncio.sleep(0)
 
-        server = await coro     # Starts listening
-        await connected.wait()  # Waits for the callback to fire (and finish)
-        assert server is None
+        self._server = await coro    # Starts listening
+        await self._accepted.wait()  # Waits for the callback to finish
+        assert self._server is None
         self._sock = None
 
         self.logger.debug("Connection accepted.")
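
The event-based handoff described in the commit message can be sketched outside of QEMU roughly as follows. This is a minimal illustration under stated assumptions, not the code from protocol.py: the class OneShotServer and its methods listen() and wait_for_peer() are hypothetical names invented for this example, and the demo uses a loopback TCP port picked by the OS. Unlike the patch's _stop_server(), the sketch only calls Server.close() and does not await wait_closed(), on the assumption that newer Python releases make wait_closed() also wait for active connections, which could stall inside the callback.

```python
import asyncio
from typing import Optional


class OneShotServer:
    """Hypothetical stand-in for AsyncProtocol; accepts a single peer."""

    def __init__(self) -> None:
        # Server state lives on the instance so that the callback can be
        # an ordinary method instead of a closure using 'nonlocal'.
        self._server: Optional[asyncio.AbstractServer] = None
        self._accepted: Optional[asyncio.Event] = None
        self._reader: Optional[asyncio.StreamReader] = None
        self._writer: Optional[asyncio.StreamWriter] = None

    async def _stop_server(self) -> None:
        """Stop listening for new connections (close() only, see lead-in)."""
        if self._server is None:
            return
        try:
            self._server.close()
        finally:
            self._server = None

    async def _incoming(self,
                        reader: asyncio.StreamReader,
                        writer: asyncio.StreamWriter) -> None:
        """Callback for asyncio.start_server(); keeps the first peer only."""
        if self._reader or self._writer:
            # A second connection slipped in; we do not want it.
            writer.close()
            return
        assert self._accepted is not None
        await self._stop_server()
        self._reader, self._writer = (reader, writer)
        self._accepted.set()  # Signal whoever is waiting outside.

    async def listen(self, host: str = "127.0.0.1") -> int:
        """Start listening on an OS-chosen port and return that port."""
        self._accepted = asyncio.Event()
        self._server = await asyncio.start_server(
            self._incoming, host=host, port=0, backlog=1)
        return self._server.sockets[0].getsockname()[1]

    async def wait_for_peer(self) -> None:
        """Wait, from outside the callback, until a peer was accepted."""
        assert self._accepted is not None
        await self._accepted.wait()
        assert self._server is None


async def demo() -> bytes:
    proto = OneShotServer()
    port = await proto.listen()
    _, client_writer = await asyncio.open_connection("127.0.0.1", port)
    client_writer.write(b"hello\n")
    await client_writer.drain()

    await proto.wait_for_peer()
    assert proto._reader is not None and proto._writer is not None
    line = await proto._reader.readline()

    # Tidy up both ends of the connection.
    client_writer.close()
    await client_writer.wait_closed()
    proto._writer.close()
    await proto._writer.wait_closed()
    return line


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the event and the server handle are instance attributes, the waiting step (wait_for_peer() here) no longer has to live in the same function that starts the server, which is the separation the commit message says it is preparing for.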