SHA256
1
0
forked from pool/qemu
qemu/python-move-qmp-shell-under-the-AQMP-pac.patch
2022-05-27 12:52:53 +00:00

1144 lines
38 KiB
Diff

From: John Snow <jsnow@redhat.com>
Date: Mon, 10 Jan 2022 18:28:55 -0500
Subject: python: move qmp-shell under the AQMP package
Git-commit: fd9c3a6219b0470c356c8486188052d353846806
Signed-off-by: John Snow <jsnow@redhat.com>
Reviewed-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
Reviewed-by: Beraldo Leal <bleal@redhat.com>
Signed-off-by: Li Zhang <lizhang@suse.de>
---
python/README.rst | 2 +-
python/qemu/aqmp/qmp_shell.py | 537 ++++++++++++++++++++++++++++++++++
python/qemu/qmp/qmp_shell.py | 537 ----------------------------------
python/setup.cfg | 2 +-
scripts/qmp/qmp-shell | 2 +-
5 files changed, 540 insertions(+), 540 deletions(-)
diff --git a/python/README.rst b/python/README.rst
index 9c1fceaee73b11cb313b71a9a558..fcf74f69eae011aafd1e089e1f92 100644
--- a/python/README.rst
+++ b/python/README.rst
@@ -59,7 +59,7 @@ Package installation also normally provides executable console scripts,
so that tools like ``qmp-shell`` are always available via $PATH. To
invoke them without installation, you can invoke e.g.:
-``> PYTHONPATH=~/src/qemu/python python3 -m qemu.qmp.qmp_shell``
+``> PYTHONPATH=~/src/qemu/python python3 -m qemu.aqmp.qmp_shell``
The mappings between console script name and python module path can be
found in ``setup.cfg``.
diff --git a/python/qemu/aqmp/qmp_shell.py b/python/qemu/aqmp/qmp_shell.py
new file mode 100644
index 0000000000000000000000000000000000000000..d11bf54b00e5d56616ae57be0006b9b0629dd9f4
--- /dev/null
+++ b/python/qemu/aqmp/qmp_shell.py
@@ -0,0 +1,537 @@
+#
+# Copyright (C) 2009, 2010 Red Hat Inc.
+#
+# Authors:
+# Luiz Capitulino <lcapitulino@redhat.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+#
+
+"""
+Low-level QEMU shell on top of QMP.
+
+usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
+
+positional arguments:
+ qmp_server < UNIX socket path | TCP address:port >
+
+optional arguments:
+ -h, --help show this help message and exit
+ -H, --hmp Use HMP interface
+ -N, --skip-negotiation
+ Skip negotiate (for qemu-ga)
+ -v, --verbose Verbose (echo commands sent and received)
+ -p, --pretty Pretty-print JSON
+
+
+Start QEMU with:
+
+# qemu [...] -qmp unix:./qmp-sock,server
+
+Run the shell:
+
+$ qmp-shell ./qmp-sock
+
+Commands have the following format:
+
+ < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
+
+For example:
+
+(QEMU) device_add driver=e1000 id=net1
+{'return': {}}
+(QEMU)
+
+key=value pairs also support Python or JSON object literal subset notations,
+without spaces. Dictionaries/objects {} are supported as are arrays [].
+
+ example-command arg-name1={'key':'value','obj'={'prop':"value"}}
+
+Both JSON and Python formatting should work, including both styles of
+string literal quotes. Both paradigms of literal values should work,
+including null/true/false for JSON and None/True/False for Python.
+
+
+Transactions have the following multi-line format:
+
+ transaction(
+ action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
+ ...
+ action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
+ )
+
+One line transactions are also supported:
+
+ transaction( action-name1 ... )
+
+For example:
+
+ (QEMU) transaction(
+ TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
+ TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
+ TRANS> )
+ {"return": {}}
+ (QEMU)
+
+Use the -v and -p options to activate the verbose and pretty-print options,
+which will echo back the properly formatted JSON-compliant QMP that is being
+sent to QEMU, which is useful for debugging and documentation generation.
+"""
+
+import argparse
+import ast
+import json
+import logging
+import os
+import re
+import readline
+import sys
+from typing import (
+ Iterator,
+ List,
+ NoReturn,
+ Optional,
+ Sequence,
+)
+
+from qemu.aqmp import ConnectError, QMPError, SocketAddrT
+from qemu.aqmp.legacy import (
+ QEMUMonitorProtocol,
+ QMPBadPortError,
+ QMPMessage,
+ QMPObject,
+)
+
+
+LOG = logging.getLogger(__name__)
+
+
+class QMPCompleter:
+ """
+ QMPCompleter provides a readline library tab-complete behavior.
+ """
+ # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
+ # but pylint as of today does not know that List[str] is simply 'list'.
+ def __init__(self) -> None:
+ self._matches: List[str] = []
+
+ def append(self, value: str) -> None:
+ """Append a new valid completion to the list of possibilities."""
+ return self._matches.append(value)
+
+ def complete(self, text: str, state: int) -> Optional[str]:
+ """readline.set_completer() callback implementation."""
+ for cmd in self._matches:
+ if cmd.startswith(text):
+ if state == 0:
+ return cmd
+ state -= 1
+ return None
+
+
+class QMPShellError(QMPError):
+ """
+ QMP Shell Base error class.
+ """
+
+
+class FuzzyJSON(ast.NodeTransformer):
+ """
+ This extension of ast.NodeTransformer filters literal "true/false/null"
+ values in a Python AST and replaces them by proper "True/False/None" values
+ that Python can properly evaluate.
+ """
+
+ @classmethod
+ def visit_Name(cls, # pylint: disable=invalid-name
+ node: ast.Name) -> ast.AST:
+ """
+ Transform Name nodes with certain values into Constant (keyword) nodes.
+ """
+ if node.id == 'true':
+ return ast.Constant(value=True)
+ if node.id == 'false':
+ return ast.Constant(value=False)
+ if node.id == 'null':
+ return ast.Constant(value=None)
+ return node
+
+
+class QMPShell(QEMUMonitorProtocol):
+ """
+ QMPShell provides a basic readline-based QMP shell.
+
+ :param address: Address of the QMP server.
+ :param pretty: Pretty-print QMP messages.
+ :param verbose: Echo outgoing QMP messages to console.
+ """
+ def __init__(self, address: SocketAddrT,
+ pretty: bool = False, verbose: bool = False):
+ super().__init__(address)
+ self._greeting: Optional[QMPMessage] = None
+ self._completer = QMPCompleter()
+ self._transmode = False
+ self._actions: List[QMPMessage] = []
+ self._histfile = os.path.join(os.path.expanduser('~'),
+ '.qmp-shell_history')
+ self.pretty = pretty
+ self.verbose = verbose
+
+ def close(self) -> None:
+ # Hook into context manager of parent to save shell history.
+ self._save_history()
+ super().close()
+
+ def _fill_completion(self) -> None:
+ cmds = self.cmd('query-commands')
+ if 'error' in cmds:
+ return
+ for cmd in cmds['return']:
+ self._completer.append(cmd['name'])
+
+ def _completer_setup(self) -> None:
+ self._completer = QMPCompleter()
+ self._fill_completion()
+ readline.set_history_length(1024)
+ readline.set_completer(self._completer.complete)
+ readline.parse_and_bind("tab: complete")
+ # NB: default delimiters conflict with some command names
+ # (eg. query-), clearing everything as it doesn't seem to matter
+ readline.set_completer_delims('')
+ try:
+ readline.read_history_file(self._histfile)
+ except FileNotFoundError:
+ pass
+ except IOError as err:
+ msg = f"Failed to read history '{self._histfile}': {err!s}"
+ LOG.warning(msg)
+
+ def _save_history(self) -> None:
+ try:
+ readline.write_history_file(self._histfile)
+ except IOError as err:
+ msg = f"Failed to save history file '{self._histfile}': {err!s}"
+ LOG.warning(msg)
+
+ @classmethod
+ def _parse_value(cls, val: str) -> object:
+ try:
+ return int(val)
+ except ValueError:
+ pass
+
+ if val.lower() == 'true':
+ return True
+ if val.lower() == 'false':
+ return False
+ if val.startswith(('{', '[')):
+ # Try first as pure JSON:
+ try:
+ return json.loads(val)
+ except ValueError:
+ pass
+ # Try once again as FuzzyJSON:
+ try:
+ tree = ast.parse(val, mode='eval')
+ transformed = FuzzyJSON().visit(tree)
+ return ast.literal_eval(transformed)
+ except (SyntaxError, ValueError):
+ pass
+ return val
+
+ def _cli_expr(self,
+ tokens: Sequence[str],
+ parent: QMPObject) -> None:
+ for arg in tokens:
+ (key, sep, val) = arg.partition('=')
+ if sep != '=':
+ raise QMPShellError(
+ f"Expected a key=value pair, got '{arg!s}'"
+ )
+
+ value = self._parse_value(val)
+ optpath = key.split('.')
+ curpath = []
+ for path in optpath[:-1]:
+ curpath.append(path)
+ obj = parent.get(path, {})
+ if not isinstance(obj, dict):
+ msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
+ raise QMPShellError(msg.format('.'.join(curpath)))
+ parent[path] = obj
+ parent = obj
+ if optpath[-1] in parent:
+ if isinstance(parent[optpath[-1]], dict):
+ msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
+ raise QMPShellError(msg.format('.'.join(curpath)))
+ raise QMPShellError(f'Cannot set "{key}" multiple times')
+ parent[optpath[-1]] = value
+
+ def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
+ """
+ Build a QMP input object from a user provided command-line in the
+ following format:
+
+ < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
+ """
+ argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
+ cmdargs = re.findall(argument_regex, cmdline)
+ qmpcmd: QMPMessage
+
+ # Transactional CLI entry:
+ if cmdargs and cmdargs[0] == 'transaction(':
+ self._transmode = True
+ self._actions = []
+ cmdargs.pop(0)
+
+ # Transactional CLI exit:
+ if cmdargs and cmdargs[0] == ')' and self._transmode:
+ self._transmode = False
+ if len(cmdargs) > 1:
+ msg = 'Unexpected input after close of Transaction sub-shell'
+ raise QMPShellError(msg)
+ qmpcmd = {
+ 'execute': 'transaction',
+ 'arguments': {'actions': self._actions}
+ }
+ return qmpcmd
+
+ # No args, or no args remaining
+ if not cmdargs:
+ return None
+
+ if self._transmode:
+ # Parse and cache this Transactional Action
+ finalize = False
+ action = {'type': cmdargs[0], 'data': {}}
+ if cmdargs[-1] == ')':
+ cmdargs.pop(-1)
+ finalize = True
+ self._cli_expr(cmdargs[1:], action['data'])
+ self._actions.append(action)
+ return self._build_cmd(')') if finalize else None
+
+ # Standard command: parse and return it to be executed.
+ qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
+ self._cli_expr(cmdargs[1:], qmpcmd['arguments'])
+ return qmpcmd
+
+ def _print(self, qmp_message: object) -> None:
+ jsobj = json.dumps(qmp_message,
+ indent=4 if self.pretty else None,
+ sort_keys=self.pretty)
+ print(str(jsobj))
+
+ def _execute_cmd(self, cmdline: str) -> bool:
+ try:
+ qmpcmd = self._build_cmd(cmdline)
+ except QMPShellError as err:
+ print(
+ f"Error while parsing command line: {err!s}\n"
+ "command format: <command-name> "
+ "[arg-name1=arg1] ... [arg-nameN=argN",
+ file=sys.stderr
+ )
+ return True
+ # For transaction mode, we may have just cached the action:
+ if qmpcmd is None:
+ return True
+ if self.verbose:
+ self._print(qmpcmd)
+ resp = self.cmd_obj(qmpcmd)
+ if resp is None:
+ print('Disconnected')
+ return False
+ self._print(resp)
+ return True
+
+ def connect(self, negotiate: bool = True) -> None:
+ self._greeting = super().connect(negotiate)
+ self._completer_setup()
+
+ def show_banner(self,
+ msg: str = 'Welcome to the QMP low-level shell!') -> None:
+ """
+ Print to stdio a greeting, and the QEMU version if available.
+ """
+ print(msg)
+ if not self._greeting:
+ print('Connected')
+ return
+ version = self._greeting['QMP']['version']['qemu']
+ print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))
+
+ @property
+ def prompt(self) -> str:
+ """
+ Return the current shell prompt, including a trailing space.
+ """
+ if self._transmode:
+ return 'TRANS> '
+ return '(QEMU) '
+
+ def read_exec_command(self) -> bool:
+ """
+ Read and execute a command.
+
+ @return True if execution was ok, return False if disconnected.
+ """
+ try:
+ cmdline = input(self.prompt)
+ except EOFError:
+ print()
+ return False
+
+ if cmdline == '':
+ for event in self.get_events():
+ print(event)
+ return True
+
+ return self._execute_cmd(cmdline)
+
+ def repl(self) -> Iterator[None]:
+ """
+ Return an iterator that implements the REPL.
+ """
+ self.show_banner()
+ while self.read_exec_command():
+ yield
+ self.close()
+
+
+class HMPShell(QMPShell):
+ """
+ HMPShell provides a basic readline-based HMP shell, tunnelled via QMP.
+
+ :param address: Address of the QMP server.
+ :param pretty: Pretty-print QMP messages.
+ :param verbose: Echo outgoing QMP messages to console.
+ """
+ def __init__(self, address: SocketAddrT,
+ pretty: bool = False, verbose: bool = False):
+ super().__init__(address, pretty, verbose)
+ self._cpu_index = 0
+
+ def _cmd_completion(self) -> None:
+ for cmd in self._cmd_passthrough('help')['return'].split('\r\n'):
+ if cmd and cmd[0] != '[' and cmd[0] != '\t':
+ name = cmd.split()[0] # drop help text
+ if name == 'info':
+ continue
+ if name.find('|') != -1:
+ # Command in the form 'foobar|f' or 'f|foobar', take the
+ # full name
+ opt = name.split('|')
+ if len(opt[0]) == 1:
+ name = opt[1]
+ else:
+ name = opt[0]
+ self._completer.append(name)
+ self._completer.append('help ' + name) # help completion
+
+ def _info_completion(self) -> None:
+ for cmd in self._cmd_passthrough('info')['return'].split('\r\n'):
+ if cmd:
+ self._completer.append('info ' + cmd.split()[1])
+
+ def _other_completion(self) -> None:
+ # special cases
+ self._completer.append('help info')
+
+ def _fill_completion(self) -> None:
+ self._cmd_completion()
+ self._info_completion()
+ self._other_completion()
+
+ def _cmd_passthrough(self, cmdline: str,
+ cpu_index: int = 0) -> QMPMessage:
+ return self.cmd_obj({
+ 'execute': 'human-monitor-command',
+ 'arguments': {
+ 'command-line': cmdline,
+ 'cpu-index': cpu_index
+ }
+ })
+
+ def _execute_cmd(self, cmdline: str) -> bool:
+ if cmdline.split()[0] == "cpu":
+ # trap the cpu command, it requires special setting
+ try:
+ idx = int(cmdline.split()[1])
+ if 'return' not in self._cmd_passthrough('info version', idx):
+ print('bad CPU index')
+ return True
+ self._cpu_index = idx
+ except ValueError:
+ print('cpu command takes an integer argument')
+ return True
+ resp = self._cmd_passthrough(cmdline, self._cpu_index)
+ if resp is None:
+ print('Disconnected')
+ return False
+ assert 'return' in resp or 'error' in resp
+ if 'return' in resp:
+ # Success
+ if len(resp['return']) > 0:
+ print(resp['return'], end=' ')
+ else:
+ # Error
+ print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
+ return True
+
+ def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
+ QMPShell.show_banner(self, msg)
+
+
+def die(msg: str) -> NoReturn:
+ """Write an error to stderr, then exit with a return code of 1."""
+ sys.stderr.write('ERROR: %s\n' % msg)
+ sys.exit(1)
+
+
+def main() -> None:
+ """
+ qmp-shell entry point: parse command line arguments and start the REPL.
+ """
+ parser = argparse.ArgumentParser()
+ parser.add_argument('-H', '--hmp', action='store_true',
+ help='Use HMP interface')
+ parser.add_argument('-N', '--skip-negotiation', action='store_true',
+ help='Skip negotiate (for qemu-ga)')
+ parser.add_argument('-v', '--verbose', action='store_true',
+ help='Verbose (echo commands sent and received)')
+ parser.add_argument('-p', '--pretty', action='store_true',
+ help='Pretty-print JSON')
+
+ default_server = os.environ.get('QMP_SOCKET')
+ parser.add_argument('qmp_server', action='store',
+ default=default_server,
+ help='< UNIX socket path | TCP address:port >')
+
+ args = parser.parse_args()
+ if args.qmp_server is None:
+ parser.error("QMP socket or TCP address must be specified")
+
+ shell_class = HMPShell if args.hmp else QMPShell
+
+ try:
+ address = shell_class.parse_address(args.qmp_server)
+ except QMPBadPortError:
+ parser.error(f"Bad port number: {args.qmp_server}")
+ return # pycharm doesn't know error() is noreturn
+
+ with shell_class(address, args.pretty, args.verbose) as qemu:
+ try:
+ qemu.connect(negotiate=not args.skip_negotiation)
+ except ConnectError as err:
+ if isinstance(err.exc, OSError):
+ die(f"Couldn't connect to {args.qmp_server}: {err!s}")
+ die(str(err))
+
+ for _ in qemu.repl():
+ pass
+
+
+if __name__ == '__main__':
+ main()
diff --git a/python/qemu/qmp/qmp_shell.py b/python/qemu/qmp/qmp_shell.py
deleted file mode 100644
index d11bf54b00e5d56616ae57be0006b9b0629dd9f4..0000000000000000000000000000000000000000
--- a/python/qemu/qmp/qmp_shell.py
+++ /dev/null
@@ -1,537 +0,0 @@
-#
-# Copyright (C) 2009, 2010 Red Hat Inc.
-#
-# Authors:
-# Luiz Capitulino <lcapitulino@redhat.com>
-#
-# This work is licensed under the terms of the GNU GPL, version 2. See
-# the COPYING file in the top-level directory.
-#
-
-"""
-Low-level QEMU shell on top of QMP.
-
-usage: qmp-shell [-h] [-H] [-N] [-v] [-p] qmp_server
-
-positional arguments:
- qmp_server < UNIX socket path | TCP address:port >
-
-optional arguments:
- -h, --help show this help message and exit
- -H, --hmp Use HMP interface
- -N, --skip-negotiation
- Skip negotiate (for qemu-ga)
- -v, --verbose Verbose (echo commands sent and received)
- -p, --pretty Pretty-print JSON
-
-
-Start QEMU with:
-
-# qemu [...] -qmp unix:./qmp-sock,server
-
-Run the shell:
-
-$ qmp-shell ./qmp-sock
-
-Commands have the following format:
-
- < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
-
-For example:
-
-(QEMU) device_add driver=e1000 id=net1
-{'return': {}}
-(QEMU)
-
-key=value pairs also support Python or JSON object literal subset notations,
-without spaces. Dictionaries/objects {} are supported as are arrays [].
-
- example-command arg-name1={'key':'value','obj'={'prop':"value"}}
-
-Both JSON and Python formatting should work, including both styles of
-string literal quotes. Both paradigms of literal values should work,
-including null/true/false for JSON and None/True/False for Python.
-
-
-Transactions have the following multi-line format:
-
- transaction(
- action-name1 [ arg-name1=arg1 ] ... [arg-nameN=argN ]
- ...
- action-nameN [ arg-name1=arg1 ] ... [arg-nameN=argN ]
- )
-
-One line transactions are also supported:
-
- transaction( action-name1 ... )
-
-For example:
-
- (QEMU) transaction(
- TRANS> block-dirty-bitmap-add node=drive0 name=bitmap1
- TRANS> block-dirty-bitmap-clear node=drive0 name=bitmap0
- TRANS> )
- {"return": {}}
- (QEMU)
-
-Use the -v and -p options to activate the verbose and pretty-print options,
-which will echo back the properly formatted JSON-compliant QMP that is being
-sent to QEMU, which is useful for debugging and documentation generation.
-"""
-
-import argparse
-import ast
-import json
-import logging
-import os
-import re
-import readline
-import sys
-from typing import (
- Iterator,
- List,
- NoReturn,
- Optional,
- Sequence,
-)
-
-from qemu.aqmp import ConnectError, QMPError, SocketAddrT
-from qemu.aqmp.legacy import (
- QEMUMonitorProtocol,
- QMPBadPortError,
- QMPMessage,
- QMPObject,
-)
-
-
-LOG = logging.getLogger(__name__)
-
-
-class QMPCompleter:
- """
- QMPCompleter provides a readline library tab-complete behavior.
- """
- # NB: Python 3.9+ will probably allow us to subclass list[str] directly,
- # but pylint as of today does not know that List[str] is simply 'list'.
- def __init__(self) -> None:
- self._matches: List[str] = []
-
- def append(self, value: str) -> None:
- """Append a new valid completion to the list of possibilities."""
- return self._matches.append(value)
-
- def complete(self, text: str, state: int) -> Optional[str]:
- """readline.set_completer() callback implementation."""
- for cmd in self._matches:
- if cmd.startswith(text):
- if state == 0:
- return cmd
- state -= 1
- return None
-
-
-class QMPShellError(QMPError):
- """
- QMP Shell Base error class.
- """
-
-
-class FuzzyJSON(ast.NodeTransformer):
- """
- This extension of ast.NodeTransformer filters literal "true/false/null"
- values in a Python AST and replaces them by proper "True/False/None" values
- that Python can properly evaluate.
- """
-
- @classmethod
- def visit_Name(cls, # pylint: disable=invalid-name
- node: ast.Name) -> ast.AST:
- """
- Transform Name nodes with certain values into Constant (keyword) nodes.
- """
- if node.id == 'true':
- return ast.Constant(value=True)
- if node.id == 'false':
- return ast.Constant(value=False)
- if node.id == 'null':
- return ast.Constant(value=None)
- return node
-
-
-class QMPShell(QEMUMonitorProtocol):
- """
- QMPShell provides a basic readline-based QMP shell.
-
- :param address: Address of the QMP server.
- :param pretty: Pretty-print QMP messages.
- :param verbose: Echo outgoing QMP messages to console.
- """
- def __init__(self, address: SocketAddrT,
- pretty: bool = False, verbose: bool = False):
- super().__init__(address)
- self._greeting: Optional[QMPMessage] = None
- self._completer = QMPCompleter()
- self._transmode = False
- self._actions: List[QMPMessage] = []
- self._histfile = os.path.join(os.path.expanduser('~'),
- '.qmp-shell_history')
- self.pretty = pretty
- self.verbose = verbose
-
- def close(self) -> None:
- # Hook into context manager of parent to save shell history.
- self._save_history()
- super().close()
-
- def _fill_completion(self) -> None:
- cmds = self.cmd('query-commands')
- if 'error' in cmds:
- return
- for cmd in cmds['return']:
- self._completer.append(cmd['name'])
-
- def _completer_setup(self) -> None:
- self._completer = QMPCompleter()
- self._fill_completion()
- readline.set_history_length(1024)
- readline.set_completer(self._completer.complete)
- readline.parse_and_bind("tab: complete")
- # NB: default delimiters conflict with some command names
- # (eg. query-), clearing everything as it doesn't seem to matter
- readline.set_completer_delims('')
- try:
- readline.read_history_file(self._histfile)
- except FileNotFoundError:
- pass
- except IOError as err:
- msg = f"Failed to read history '{self._histfile}': {err!s}"
- LOG.warning(msg)
-
- def _save_history(self) -> None:
- try:
- readline.write_history_file(self._histfile)
- except IOError as err:
- msg = f"Failed to save history file '{self._histfile}': {err!s}"
- LOG.warning(msg)
-
- @classmethod
- def _parse_value(cls, val: str) -> object:
- try:
- return int(val)
- except ValueError:
- pass
-
- if val.lower() == 'true':
- return True
- if val.lower() == 'false':
- return False
- if val.startswith(('{', '[')):
- # Try first as pure JSON:
- try:
- return json.loads(val)
- except ValueError:
- pass
- # Try once again as FuzzyJSON:
- try:
- tree = ast.parse(val, mode='eval')
- transformed = FuzzyJSON().visit(tree)
- return ast.literal_eval(transformed)
- except (SyntaxError, ValueError):
- pass
- return val
-
- def _cli_expr(self,
- tokens: Sequence[str],
- parent: QMPObject) -> None:
- for arg in tokens:
- (key, sep, val) = arg.partition('=')
- if sep != '=':
- raise QMPShellError(
- f"Expected a key=value pair, got '{arg!s}'"
- )
-
- value = self._parse_value(val)
- optpath = key.split('.')
- curpath = []
- for path in optpath[:-1]:
- curpath.append(path)
- obj = parent.get(path, {})
- if not isinstance(obj, dict):
- msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
- raise QMPShellError(msg.format('.'.join(curpath)))
- parent[path] = obj
- parent = obj
- if optpath[-1] in parent:
- if isinstance(parent[optpath[-1]], dict):
- msg = 'Cannot use "{:s}" as both leaf and non-leaf key'
- raise QMPShellError(msg.format('.'.join(curpath)))
- raise QMPShellError(f'Cannot set "{key}" multiple times')
- parent[optpath[-1]] = value
-
- def _build_cmd(self, cmdline: str) -> Optional[QMPMessage]:
- """
- Build a QMP input object from a user provided command-line in the
- following format:
-
- < command-name > [ arg-name1=arg1 ] ... [ arg-nameN=argN ]
- """
- argument_regex = r'''(?:[^\s"']|"(?:\\.|[^"])*"|'(?:\\.|[^'])*')+'''
- cmdargs = re.findall(argument_regex, cmdline)
- qmpcmd: QMPMessage
-
- # Transactional CLI entry:
- if cmdargs and cmdargs[0] == 'transaction(':
- self._transmode = True
- self._actions = []
- cmdargs.pop(0)
-
- # Transactional CLI exit:
- if cmdargs and cmdargs[0] == ')' and self._transmode:
- self._transmode = False
- if len(cmdargs) > 1:
- msg = 'Unexpected input after close of Transaction sub-shell'
- raise QMPShellError(msg)
- qmpcmd = {
- 'execute': 'transaction',
- 'arguments': {'actions': self._actions}
- }
- return qmpcmd
-
- # No args, or no args remaining
- if not cmdargs:
- return None
-
- if self._transmode:
- # Parse and cache this Transactional Action
- finalize = False
- action = {'type': cmdargs[0], 'data': {}}
- if cmdargs[-1] == ')':
- cmdargs.pop(-1)
- finalize = True
- self._cli_expr(cmdargs[1:], action['data'])
- self._actions.append(action)
- return self._build_cmd(')') if finalize else None
-
- # Standard command: parse and return it to be executed.
- qmpcmd = {'execute': cmdargs[0], 'arguments': {}}
- self._cli_expr(cmdargs[1:], qmpcmd['arguments'])
- return qmpcmd
-
- def _print(self, qmp_message: object) -> None:
- jsobj = json.dumps(qmp_message,
- indent=4 if self.pretty else None,
- sort_keys=self.pretty)
- print(str(jsobj))
-
- def _execute_cmd(self, cmdline: str) -> bool:
- try:
- qmpcmd = self._build_cmd(cmdline)
- except QMPShellError as err:
- print(
- f"Error while parsing command line: {err!s}\n"
- "command format: <command-name> "
- "[arg-name1=arg1] ... [arg-nameN=argN",
- file=sys.stderr
- )
- return True
- # For transaction mode, we may have just cached the action:
- if qmpcmd is None:
- return True
- if self.verbose:
- self._print(qmpcmd)
- resp = self.cmd_obj(qmpcmd)
- if resp is None:
- print('Disconnected')
- return False
- self._print(resp)
- return True
-
- def connect(self, negotiate: bool = True) -> None:
- self._greeting = super().connect(negotiate)
- self._completer_setup()
-
- def show_banner(self,
- msg: str = 'Welcome to the QMP low-level shell!') -> None:
- """
- Print to stdio a greeting, and the QEMU version if available.
- """
- print(msg)
- if not self._greeting:
- print('Connected')
- return
- version = self._greeting['QMP']['version']['qemu']
- print("Connected to QEMU {major}.{minor}.{micro}\n".format(**version))
-
- @property
- def prompt(self) -> str:
- """
- Return the current shell prompt, including a trailing space.
- """
- if self._transmode:
- return 'TRANS> '
- return '(QEMU) '
-
- def read_exec_command(self) -> bool:
- """
- Read and execute a command.
-
- @return True if execution was ok, return False if disconnected.
- """
- try:
- cmdline = input(self.prompt)
- except EOFError:
- print()
- return False
-
- if cmdline == '':
- for event in self.get_events():
- print(event)
- return True
-
- return self._execute_cmd(cmdline)
-
- def repl(self) -> Iterator[None]:
- """
- Return an iterator that implements the REPL.
- """
- self.show_banner()
- while self.read_exec_command():
- yield
- self.close()
-
-
-class HMPShell(QMPShell):
- """
- HMPShell provides a basic readline-based HMP shell, tunnelled via QMP.
-
- :param address: Address of the QMP server.
- :param pretty: Pretty-print QMP messages.
- :param verbose: Echo outgoing QMP messages to console.
- """
- def __init__(self, address: SocketAddrT,
- pretty: bool = False, verbose: bool = False):
- super().__init__(address, pretty, verbose)
- self._cpu_index = 0
-
- def _cmd_completion(self) -> None:
- for cmd in self._cmd_passthrough('help')['return'].split('\r\n'):
- if cmd and cmd[0] != '[' and cmd[0] != '\t':
- name = cmd.split()[0] # drop help text
- if name == 'info':
- continue
- if name.find('|') != -1:
- # Command in the form 'foobar|f' or 'f|foobar', take the
- # full name
- opt = name.split('|')
- if len(opt[0]) == 1:
- name = opt[1]
- else:
- name = opt[0]
- self._completer.append(name)
- self._completer.append('help ' + name) # help completion
-
- def _info_completion(self) -> None:
- for cmd in self._cmd_passthrough('info')['return'].split('\r\n'):
- if cmd:
- self._completer.append('info ' + cmd.split()[1])
-
- def _other_completion(self) -> None:
- # special cases
- self._completer.append('help info')
-
- def _fill_completion(self) -> None:
- self._cmd_completion()
- self._info_completion()
- self._other_completion()
-
- def _cmd_passthrough(self, cmdline: str,
- cpu_index: int = 0) -> QMPMessage:
- return self.cmd_obj({
- 'execute': 'human-monitor-command',
- 'arguments': {
- 'command-line': cmdline,
- 'cpu-index': cpu_index
- }
- })
-
- def _execute_cmd(self, cmdline: str) -> bool:
- if cmdline.split()[0] == "cpu":
- # trap the cpu command, it requires special setting
- try:
- idx = int(cmdline.split()[1])
- if 'return' not in self._cmd_passthrough('info version', idx):
- print('bad CPU index')
- return True
- self._cpu_index = idx
- except ValueError:
- print('cpu command takes an integer argument')
- return True
- resp = self._cmd_passthrough(cmdline, self._cpu_index)
- if resp is None:
- print('Disconnected')
- return False
- assert 'return' in resp or 'error' in resp
- if 'return' in resp:
- # Success
- if len(resp['return']) > 0:
- print(resp['return'], end=' ')
- else:
- # Error
- print('%s: %s' % (resp['error']['class'], resp['error']['desc']))
- return True
-
- def show_banner(self, msg: str = 'Welcome to the HMP shell!') -> None:
- QMPShell.show_banner(self, msg)
-
-
-def die(msg: str) -> NoReturn:
- """Write an error to stderr, then exit with a return code of 1."""
- sys.stderr.write('ERROR: %s\n' % msg)
- sys.exit(1)
-
-
-def main() -> None:
- """
- qmp-shell entry point: parse command line arguments and start the REPL.
- """
- parser = argparse.ArgumentParser()
- parser.add_argument('-H', '--hmp', action='store_true',
- help='Use HMP interface')
- parser.add_argument('-N', '--skip-negotiation', action='store_true',
- help='Skip negotiate (for qemu-ga)')
- parser.add_argument('-v', '--verbose', action='store_true',
- help='Verbose (echo commands sent and received)')
- parser.add_argument('-p', '--pretty', action='store_true',
- help='Pretty-print JSON')
-
- default_server = os.environ.get('QMP_SOCKET')
- parser.add_argument('qmp_server', action='store',
- default=default_server,
- help='< UNIX socket path | TCP address:port >')
-
- args = parser.parse_args()
- if args.qmp_server is None:
- parser.error("QMP socket or TCP address must be specified")
-
- shell_class = HMPShell if args.hmp else QMPShell
-
- try:
- address = shell_class.parse_address(args.qmp_server)
- except QMPBadPortError:
- parser.error(f"Bad port number: {args.qmp_server}")
- return # pycharm doesn't know error() is noreturn
-
- with shell_class(address, args.pretty, args.verbose) as qemu:
- try:
- qemu.connect(negotiate=not args.skip_negotiation)
- except ConnectError as err:
- if isinstance(err.exc, OSError):
- die(f"Couldn't connect to {args.qmp_server}: {err!s}")
- die(str(err))
-
- for _ in qemu.repl():
- pass
-
-
-if __name__ == '__main__':
- main()
diff --git a/python/setup.cfg b/python/setup.cfg
index 91ccef7e8fd85d0d6d3d86adbc8d..0063c757b78638ef651a362af338 100644
--- a/python/setup.cfg
+++ b/python/setup.cfg
@@ -67,7 +67,7 @@ console_scripts =
qom-tree = qemu.utils.qom:QOMTree.entry_point
qom-fuse = qemu.utils.qom_fuse:QOMFuse.entry_point [fuse]
qemu-ga-client = qemu.utils.qemu_ga_client:main
- qmp-shell = qemu.qmp.qmp_shell:main
+ qmp-shell = qemu.aqmp.qmp_shell:main
aqmp-tui = qemu.aqmp.aqmp_tui:main [tui]
[flake8]
diff --git a/scripts/qmp/qmp-shell b/scripts/qmp/qmp-shell
index 4a20f97db708b74ebfed715e38ea..31b19d73e22a4d9c91aaf0ce9725 100755
--- a/scripts/qmp/qmp-shell
+++ b/scripts/qmp/qmp-shell
@@ -4,7 +4,7 @@ import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python'))
-from qemu.qmp import qmp_shell
+from qemu.aqmp import qmp_shell
if __name__ == '__main__':