2024-02-16 14:09:22 +01:00
|
|
|
|
import os
|
2024-04-16 16:18:50 +02:00
|
|
|
|
import platform
|
2024-03-07 11:59:51 +01:00
|
|
|
|
import re
|
2024-04-16 16:18:50 +02:00
|
|
|
|
import shlex
|
2024-04-16 16:56:00 +02:00
|
|
|
|
import subprocess
|
2024-03-05 15:40:37 +01:00
|
|
|
|
import sys
|
2024-04-16 16:18:50 +02:00
|
|
|
|
import tempfile
|
2024-03-07 11:59:51 +01:00
|
|
|
|
from typing import Dict
|
2024-04-16 16:56:00 +02:00
|
|
|
|
from typing import List
|
2024-03-05 15:58:43 +01:00
|
|
|
|
from typing import Optional
|
2024-02-16 14:09:22 +01:00
|
|
|
|
from typing import TextIO
|
|
|
|
|
from typing import Union
|
2024-03-05 15:40:37 +01:00
|
|
|
|
|
2024-03-05 15:58:43 +01:00
|
|
|
|
from . import tty
|
2024-03-05 15:40:37 +01:00
|
|
|
|
|
2024-03-05 15:58:43 +01:00
|
|
|
|
|
|
|
|
|
def print_msg(*args, print_to: Optional[str] = "debug"):
|
|
|
|
|
"""
|
|
|
|
|
Print ``*args`` to the ``print_to`` target:
|
|
|
|
|
- None: print nothing
|
|
|
|
|
- debug: print() to stderr with "DEBUG:" prefix if config["debug"] is set
|
|
|
|
|
- verbose: print() to stdout if config["verbose"] or config["debug"] is set
|
|
|
|
|
- error: print() to stderr with red "ERROR:" prefix
|
|
|
|
|
- warning: print() to stderr with yellow "WARNING:" prefix
|
|
|
|
|
- stdout: print() to stdout
|
|
|
|
|
- stderr: print() to stderr
|
|
|
|
|
"""
|
2024-03-05 15:40:37 +01:00
|
|
|
|
from .. import conf
|
|
|
|
|
|
|
|
|
|
if print_to is None:
|
|
|
|
|
return
|
|
|
|
|
elif print_to == "debug":
|
|
|
|
|
# print a debug message to stderr if config["debug"] is set
|
|
|
|
|
if conf.config["debug"]:
|
|
|
|
|
print("DEBUG:", *args, file=sys.stderr)
|
|
|
|
|
elif print_to == "verbose":
|
|
|
|
|
# print a verbose message to stdout if config["verbose"] or config["debug"] is set
|
|
|
|
|
if conf.config["verbose"] or conf.config["debug"]:
|
|
|
|
|
print(*args)
|
2024-03-05 15:58:43 +01:00
|
|
|
|
elif print_to == "error":
|
|
|
|
|
print(tty.colorize("ERROR:", "red,bold"), *args, file=sys.stderr)
|
|
|
|
|
elif print_to == "warning":
|
|
|
|
|
print(tty.colorize("WARNING:", "yellow,bold"), *args, file=sys.stderr)
|
2024-03-05 15:40:37 +01:00
|
|
|
|
elif print_to == "stdout":
|
|
|
|
|
# print the message to stdout
|
|
|
|
|
print(*args)
|
|
|
|
|
elif print_to == "stderr":
|
|
|
|
|
# print the message to stderr
|
|
|
|
|
print(*args, file=sys.stderr)
|
|
|
|
|
else:
|
|
|
|
|
raise ValueError(f"Invalid value of the 'print_to' option: {print_to}")
|
2024-02-16 14:09:22 +01:00
|
|
|
|
|
|
|
|
|
|
2024-03-07 11:59:51 +01:00
|
|
|
|
# cached compiled regular expressions; they are created on the first use
|
|
|
|
|
SANITIZE_TEXT_RE: Optional[Dict] = None
|
2024-02-16 14:09:22 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def sanitize_text(text: Union[bytes, str]) -> Union[bytes, str]:
|
|
|
|
|
"""
|
2024-03-07 11:59:51 +01:00
|
|
|
|
Remove forbidden characters and escape sequences from ``text``.
|
|
|
|
|
|
|
|
|
|
This must be run on lines or the whole text to work correctly.
|
|
|
|
|
Processing blocks of constant size might lead to splitting escape sequences
|
|
|
|
|
and leaving garbage characters after sanitizing.
|
2024-02-16 14:09:22 +01:00
|
|
|
|
"""
|
2024-03-07 11:59:51 +01:00
|
|
|
|
global SANITIZE_TEXT_RE
|
|
|
|
|
|
|
|
|
|
if not SANITIZE_TEXT_RE:
|
|
|
|
|
SANITIZE_TEXT_RE = {}
|
|
|
|
|
|
|
|
|
|
# CONTROL CHARACTERS
|
|
|
|
|
# remove all control characters with the exception of:
|
|
|
|
|
# 0x09 - horizontal tab (\t)
|
|
|
|
|
# 0x0A - line feed (\n)
|
|
|
|
|
# 0x0D - carriage return (\r)
|
|
|
|
|
# 0x1B - escape - is selectively handled later as part of sanitizing escape sequences
|
|
|
|
|
|
|
|
|
|
regex = r"[\x00-\x08\x0B\x0C\x0E-\x1A\x1C-\x1F]"
|
|
|
|
|
SANITIZE_TEXT_RE["str_control"] = re.compile(regex)
|
|
|
|
|
SANITIZE_TEXT_RE["bytes_control"] = re.compile(regex.encode("ascii"))
|
|
|
|
|
|
|
|
|
|
# CSI ESCAPE SEQUENCES
|
|
|
|
|
# https://en.wikipedia.org/wiki/ANSI_escape_code#CSI_codes
|
|
|
|
|
# remove all but allowed CSI escape sequences
|
|
|
|
|
|
|
|
|
|
# negative lookahead assertion that allows safe color escape sequences
|
|
|
|
|
neg_allowed_csi_sequences = r"(?!\[([0-5]|[34][0-7]|;)+m)"
|
|
|
|
|
|
|
|
|
|
# range 0x30–0x3F (OCT \040-\077) (ASCII 0–9:;<=>?); zero or more characters
|
|
|
|
|
csi_parameter_bytes = r"[\x30-\x3F]*"
|
|
|
|
|
|
|
|
|
|
# range 0x20–0x2F (OCT \040-\057) (ASCII space and !"#$%&'()*+,-./); zero or more characters
|
|
|
|
|
csi_itermediate_bytes = r"[\x20-\x2F]*"
|
|
|
|
|
|
|
|
|
|
# range 0x40–0x7E (OCT \100-\176) (ASCII @A–Z[\]^_`a–z{|}~); 1 character
|
|
|
|
|
csi_final_byte = r"[\x40-\x7E]"
|
|
|
|
|
|
|
|
|
|
regex = rf"\033{neg_allowed_csi_sequences}\[{csi_parameter_bytes}{csi_itermediate_bytes}{csi_final_byte}"
|
|
|
|
|
SANITIZE_TEXT_RE["str_csi_sequences"] = re.compile(regex)
|
|
|
|
|
SANITIZE_TEXT_RE["bytes_csi_sequences"] = re.compile(regex.encode("ascii"))
|
|
|
|
|
|
|
|
|
|
# FE ESCAPE SEQUENCES
|
|
|
|
|
# https://en.wikipedia.org/wiki/ANSI_escape_code#Fe_Escape_sequences
|
|
|
|
|
# remove all Fe escape sequences
|
|
|
|
|
|
|
|
|
|
# range 0x40 to 0x5F (ASCII @A–Z[\]^_); 1 character
|
|
|
|
|
fe = r"[\x40-x5F]"
|
|
|
|
|
regex = rf"\033{neg_allowed_csi_sequences}{fe}"
|
|
|
|
|
SANITIZE_TEXT_RE["str_fe_sequences"] = re.compile(regex)
|
|
|
|
|
SANITIZE_TEXT_RE["bytes_fe_sequences"] = re.compile(regex.encode("ascii"))
|
|
|
|
|
|
|
|
|
|
# REMAINING ESCAPE CHARACTERS
|
|
|
|
|
# remove all remaining escape characters that are not followed with the allowed CSI escape sequences
|
|
|
|
|
|
|
|
|
|
regex = rf"\033{neg_allowed_csi_sequences}"
|
|
|
|
|
SANITIZE_TEXT_RE["str_esc"] = re.compile(regex)
|
|
|
|
|
SANITIZE_TEXT_RE["bytes_esc"] = re.compile(regex.encode("ascii"))
|
|
|
|
|
|
2024-02-16 14:09:22 +01:00
|
|
|
|
if isinstance(text, bytes):
|
2024-03-07 11:59:51 +01:00
|
|
|
|
text = SANITIZE_TEXT_RE["bytes_control"].sub(b"", text)
|
|
|
|
|
text = SANITIZE_TEXT_RE["bytes_csi_sequences"].sub(b"", text)
|
|
|
|
|
text = SANITIZE_TEXT_RE["bytes_fe_sequences"].sub(b"", text)
|
|
|
|
|
text = SANITIZE_TEXT_RE["bytes_esc"].sub(b"", text)
|
|
|
|
|
else:
|
|
|
|
|
text = SANITIZE_TEXT_RE["str_control"].sub("", text)
|
|
|
|
|
text = SANITIZE_TEXT_RE["str_csi_sequences"].sub("", text)
|
|
|
|
|
text = SANITIZE_TEXT_RE["str_fe_sequences"].sub("", text)
|
|
|
|
|
text = SANITIZE_TEXT_RE["str_esc"].sub("", text)
|
|
|
|
|
return text
|
2024-02-16 14:09:22 +01:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def safe_print(*args, **kwargs):
|
|
|
|
|
"""
|
|
|
|
|
A wrapper to print() that runs sanitize_text() on all arguments.
|
|
|
|
|
"""
|
|
|
|
|
args = [sanitize_text(i) for i in args]
|
|
|
|
|
print(*args, **kwargs)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def safe_write(file: TextIO, text: Union[str, bytes], *, add_newline: bool = False):
|
|
|
|
|
"""
|
|
|
|
|
Run sanitize_text() on ``text`` and write it to ``file``.
|
|
|
|
|
|
|
|
|
|
:param add_newline: Write a newline after writing the ``text``.
|
|
|
|
|
"""
|
|
|
|
|
text = sanitize_text(text)
|
|
|
|
|
if isinstance(text, bytes):
|
2024-04-16 16:50:10 +02:00
|
|
|
|
if hasattr(file, "buffer"):
|
|
|
|
|
file.buffer.write(text)
|
|
|
|
|
if add_newline:
|
|
|
|
|
file.buffer.write(os.linesep.encode("utf-8"))
|
|
|
|
|
else:
|
|
|
|
|
# file has no "buffer" attribute, let's try to write the bytes directly
|
|
|
|
|
file.write(text)
|
|
|
|
|
if add_newline:
|
|
|
|
|
file.write(os.linesep.encode("utf-8"))
|
2024-02-16 14:09:22 +01:00
|
|
|
|
else:
|
|
|
|
|
file.write(text)
|
|
|
|
|
if add_newline:
|
|
|
|
|
file.write(os.linesep)
|
2024-04-16 16:18:50 +02:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_default_pager():
|
|
|
|
|
from ..core import _get_linux_distro
|
|
|
|
|
|
|
|
|
|
system = platform.system()
|
|
|
|
|
if system == 'Linux':
|
|
|
|
|
dist = _get_linux_distro()
|
|
|
|
|
if dist == 'debian':
|
|
|
|
|
return 'pager'
|
|
|
|
|
return 'less'
|
|
|
|
|
return 'more'
|
|
|
|
|
|
|
|
|
|
|
2024-04-16 16:56:00 +02:00
|
|
|
|
def get_pager():
|
|
|
|
|
"""
|
|
|
|
|
Return (pager, env) where
|
|
|
|
|
``pager`` is a list with parsed pager command
|
|
|
|
|
``env`` is copy of os.environ() with added variables specific to the pager
|
|
|
|
|
"""
|
|
|
|
|
env = os.environ.copy()
|
|
|
|
|
pager = os.getenv("PAGER", default="").strip()
|
|
|
|
|
pager = pager or get_default_pager()
|
|
|
|
|
|
|
|
|
|
# LESS env is not always set and we need -R to display escape sequences properly
|
|
|
|
|
less_opts = os.getenv("LESS", default="")
|
|
|
|
|
if "-R" not in less_opts:
|
|
|
|
|
less_opts += " -R"
|
|
|
|
|
env["LESS"] = less_opts
|
|
|
|
|
|
|
|
|
|
return shlex.split(pager), env
|
|
|
|
|
|
|
|
|
|
|
2024-04-16 16:18:50 +02:00
|
|
|
|
def run_pager(message: Union[bytes, str], tmp_suffix: str = ""):
|
|
|
|
|
from ..core import run_external
|
|
|
|
|
|
|
|
|
|
if not message:
|
|
|
|
|
return
|
|
|
|
|
|
2024-04-16 16:31:27 +02:00
|
|
|
|
if not tty.IS_INTERACTIVE:
|
|
|
|
|
safe_write(sys.stdout, message)
|
|
|
|
|
return
|
|
|
|
|
|
2024-04-16 16:50:10 +02:00
|
|
|
|
mode = "w+b" if isinstance(message, bytes) else "w+"
|
|
|
|
|
with tempfile.NamedTemporaryFile(mode=mode, suffix=tmp_suffix) as tmpfile:
|
2024-04-16 16:31:27 +02:00
|
|
|
|
safe_write(tmpfile, message)
|
2024-04-16 16:18:50 +02:00
|
|
|
|
tmpfile.flush()
|
|
|
|
|
|
2024-04-16 16:56:00 +02:00
|
|
|
|
pager, env = get_pager()
|
|
|
|
|
cmd = pager + [tmpfile.name]
|
|
|
|
|
run_external(*cmd, env=env)
|
2024-04-16 16:18:50 +02:00
|
|
|
|
|
|
|
|
|
|
2024-04-16 16:56:00 +02:00
|
|
|
|
def pipe_to_pager(lines: Union[List[bytes], List[str]], *, add_newlines=False):
|
|
|
|
|
"""
|
|
|
|
|
Pipe ``lines`` to the pager.
|
|
|
|
|
If running in a non-interactive terminal, print the data instead.
|
|
|
|
|
Add a newline after each line if ``add_newlines`` is ``True``.
|
|
|
|
|
"""
|
|
|
|
|
if not tty.IS_INTERACTIVE:
|
|
|
|
|
for line in lines:
|
|
|
|
|
safe_write(sys.stdout, line, add_newline=add_newlines)
|
|
|
|
|
return
|
2024-04-16 16:18:50 +02:00
|
|
|
|
|
2024-04-16 16:56:00 +02:00
|
|
|
|
pager, env = get_pager()
|
|
|
|
|
with subprocess.Popen(pager, stdin=subprocess.PIPE, encoding="utf-8", env=env) as proc:
|
|
|
|
|
try:
|
|
|
|
|
for line in lines:
|
|
|
|
|
safe_write(proc.stdin, line, add_newline=add_newlines)
|
|
|
|
|
proc.stdin.flush()
|
|
|
|
|
proc.stdin.close()
|
|
|
|
|
except BrokenPipeError:
|
|
|
|
|
pass
|
|
|
|
|
proc.wait()
|