# Mirror of https://github.com/openSUSE/osc.git (synced 2025-01-12 08:56:13 +01:00)
# File: github.com_openSUSE_osc/osc/output/output.py (235 lines, 8.0 KiB, Python)

import os
import platform
import re
import shlex
import subprocess
import sys
import tempfile
from typing import Dict
from typing import List
from typing import Optional
from typing import TextIO
from typing import Union
from . import tty
def print_msg(*args, print_to: Optional[str] = "debug"):
    """
    Print ``*args`` to the target selected by ``print_to``.

    Supported targets:

    - ``None``: print nothing
    - ``"debug"``: stderr, prefixed with "DEBUG:", only if config["debug"] is set
    - ``"verbose"``: stdout, only if config["verbose"] or config["debug"] is set
    - ``"error"``: stderr, prefixed with a red "ERROR:"
    - ``"warning"``: stderr, prefixed with a yellow "WARNING:"
    - ``"stdout"``: stdout
    - ``"stderr"``: stderr

    :raises ValueError: if ``print_to`` is none of the values above.
    """
    from .. import conf

    if print_to is None:
        return

    if print_to == "debug":
        # debug messages go to stderr and only when debugging is enabled
        if conf.config["debug"]:
            print("DEBUG:", *args, file=sys.stderr)
        return

    if print_to == "verbose":
        # verbose messages go to stdout; debug mode implies verbose
        if conf.config["verbose"] or conf.config["debug"]:
            print(*args)
        return

    if print_to == "error":
        prefix = tty.colorize("ERROR:", "red,bold")
        print(prefix, *args, file=sys.stderr)
        return

    if print_to == "warning":
        prefix = tty.colorize("WARNING:", "yellow,bold")
        print(prefix, *args, file=sys.stderr)
        return

    if print_to == "stdout":
        print(*args)
        return

    if print_to == "stderr":
        print(*args, file=sys.stderr)
        return

    raise ValueError(f"Invalid value of the 'print_to' option: {print_to}")
# cached compiled regular expressions; they are created on the first use
SANITIZE_TEXT_RE: Optional[Dict] = None


def sanitize_text(text: Union[bytes, str]) -> Union[bytes, str]:
    """
    Remove forbidden characters and escape sequences from ``text``.

    This must be run on lines or the whole text to work correctly.
    Processing blocks of constant size might lead to splitting escape sequences
    and leaving garbage characters after sanitizing.

    :param text: The text to sanitize; ``bytes`` and ``str`` are both supported.
    :return: The sanitized text, of the same type as the input.
    """
    global SANITIZE_TEXT_RE

    if not SANITIZE_TEXT_RE:
        SANITIZE_TEXT_RE = {}

        # CONTROL CHARACTERS
        # remove all control characters with the exception of:
        #   0x09 - horizontal tab (\t)
        #   0x0A - line feed (\n)
        #   0x0D - carriage return (\r)
        #   0x1B - escape - is selectively handled later as part of sanitizing escape sequences
        regex = r"[\x00-\x08\x0B\x0C\x0E-\x1A\x1C-\x1F]"
        SANITIZE_TEXT_RE["str_control"] = re.compile(regex)
        SANITIZE_TEXT_RE["bytes_control"] = re.compile(regex.encode("ascii"))

        # CSI ESCAPE SEQUENCES
        # https://en.wikipedia.org/wiki/ANSI_escape_code#CSI_codes
        # remove all but allowed CSI escape sequences

        # negative lookahead assertion that allows safe color escape sequences
        neg_allowed_csi_sequences = r"(?!\[([0-5]|[34][0-7]|;)+m)"

        # range 0x30-0x3F (OCT \060-\077) (ASCII 0-9:;<=>?); zero or more characters
        csi_parameter_bytes = r"[\x30-\x3F]*"

        # range 0x20-0x2F (OCT \040-\057) (ASCII space and !"#$%&'()*+,-./); zero or more characters
        csi_intermediate_bytes = r"[\x20-\x2F]*"

        # range 0x40-0x7E (OCT \100-\176) (ASCII @A-Z[\]^_`a-z{|}~); 1 character
        csi_final_byte = r"[\x40-\x7E]"

        regex = rf"\033{neg_allowed_csi_sequences}\[{csi_parameter_bytes}{csi_intermediate_bytes}{csi_final_byte}"
        SANITIZE_TEXT_RE["str_csi_sequences"] = re.compile(regex)
        SANITIZE_TEXT_RE["bytes_csi_sequences"] = re.compile(regex.encode("ascii"))

        # FE ESCAPE SEQUENCES
        # https://en.wikipedia.org/wiki/ANSI_escape_code#Fe_Escape_sequences
        # remove all Fe escape sequences

        # range 0x40 to 0x5F (ASCII @A-Z[\]^_); 1 character
        # NOTE: both range ends must be written as \xNN escapes; a missing backslash
        # on the upper bound turns the class into "@..x" plus "5" and "F" and makes
        # the regex swallow ordinary lowercase letters that follow an escape character
        fe = r"[\x40-\x5F]"
        regex = rf"\033{neg_allowed_csi_sequences}{fe}"
        SANITIZE_TEXT_RE["str_fe_sequences"] = re.compile(regex)
        SANITIZE_TEXT_RE["bytes_fe_sequences"] = re.compile(regex.encode("ascii"))

        # REMAINING ESCAPE CHARACTERS
        # remove all remaining escape characters that are not followed with the allowed CSI escape sequences
        regex = rf"\033{neg_allowed_csi_sequences}"
        SANITIZE_TEXT_RE["str_esc"] = re.compile(regex)
        SANITIZE_TEXT_RE["bytes_esc"] = re.compile(regex.encode("ascii"))

    # the order matters: whole sequences are removed first, lone escape characters last
    if isinstance(text, bytes):
        text = SANITIZE_TEXT_RE["bytes_control"].sub(b"", text)
        text = SANITIZE_TEXT_RE["bytes_csi_sequences"].sub(b"", text)
        text = SANITIZE_TEXT_RE["bytes_fe_sequences"].sub(b"", text)
        text = SANITIZE_TEXT_RE["bytes_esc"].sub(b"", text)
    else:
        text = SANITIZE_TEXT_RE["str_control"].sub("", text)
        text = SANITIZE_TEXT_RE["str_csi_sequences"].sub("", text)
        text = SANITIZE_TEXT_RE["str_fe_sequences"].sub("", text)
        text = SANITIZE_TEXT_RE["str_esc"].sub("", text)

    return text
def safe_print(*args, **kwargs):
    """
    Call print() with every positional argument passed through sanitize_text().

    Keyword arguments are forwarded to print() unchanged.
    """
    sanitized = []
    for value in args:
        sanitized.append(sanitize_text(value))
    print(*sanitized, **kwargs)
def safe_write(file: TextIO, text: Union[str, bytes], *, add_newline: bool = False):
    """
    Sanitize ``text`` with sanitize_text() and write the result to ``file``.

    :param file: The file object to write to.
    :param text: The text to sanitize and write.
    :param add_newline: Write a newline after writing the ``text``.
    """
    sanitized = sanitize_text(text)

    if not isinstance(sanitized, bytes):
        # str goes directly to the (text) file object
        file.write(sanitized)
        if add_newline:
            file.write(os.linesep)
        return

    # bytes: prefer the underlying "buffer" attribute when the file has one,
    # otherwise try to write the bytes to the file object directly
    target = file.buffer if hasattr(file, "buffer") else file
    target.write(sanitized)
    if add_newline:
        target.write(os.linesep.encode("utf-8"))
def get_default_pager():
    """
    Return the name of the default pager command for the current platform:
    'pager' on Debian, 'less' on any other Linux, and 'more' everywhere else.
    """
    from ..core import _get_linux_distro

    if platform.system() != 'Linux':
        return 'more'

    distro = _get_linux_distro()
    return 'pager' if distro == 'debian' else 'less'
def get_pager():
    """
    Return (pager, env) where
    ``pager`` is a list with the parsed pager command
    ``env`` is a copy of os.environ with added variables specific to the pager

    The pager command comes from the PAGER environment variable; when it is
    unset or blank, get_default_pager() is used instead.
    """
    env = os.environ.copy()

    command = env.get("PAGER", "").strip()
    if not command:
        command = get_default_pager()

    # LESS env is not always set and we need -R to display escape sequences properly
    less_opts = env.get("LESS", "")
    if "-R" not in less_opts:
        less_opts = less_opts + " -R"
    env["LESS"] = less_opts

    return shlex.split(command), env
def run_pager(message: Union[bytes, str], tmp_suffix: str = ""):
    """
    Display ``message`` in the pager.

    Nothing happens if ``message`` is empty.
    If running in a non-interactive terminal, the sanitized message is written
    to stdout instead of starting the pager.

    :param message: The text to display; ``bytes`` and ``str`` are both supported.
    :param tmp_suffix: Suffix of the temporary file the message is written to
                       before it is handed over to the pager.
    """
    from ..core import run_external

    if not message:
        return

    if not tty.IS_INTERACTIVE:
        safe_write(sys.stdout, message)
        return

    # open the temporary file in a mode matching the type of the message
    mode = "w+b" if isinstance(message, bytes) else "w+"
    with tempfile.NamedTemporaryFile(mode=mode, suffix=tmp_suffix) as tmpfile:
        safe_write(tmpfile, message)
        # make sure the data is on disk before the pager opens the file by name
        tmpfile.flush()

        pager, env = get_pager()
        cmd = pager + [tmpfile.name]
        run_external(*cmd, env=env)
def pipe_to_pager(lines: Union[List[bytes], List[str]], *, add_newlines=False):
    """
    Feed ``lines`` to the standard input of the pager.

    If running in a non-interactive terminal, print the data instead.

    :param lines: The lines to display; each one is sanitized before writing.
    :param add_newlines: Add a newline after each line if ``True``.
    """
    if tty.IS_INTERACTIVE:
        pager_cmd, pager_env = get_pager()
        with subprocess.Popen(pager_cmd, stdin=subprocess.PIPE, encoding="utf-8", env=pager_env) as pager_proc:
            try:
                for item in lines:
                    safe_write(pager_proc.stdin, item, add_newline=add_newlines)
                pager_proc.stdin.flush()
                pager_proc.stdin.close()
            except BrokenPipeError:
                # deliberately ignored; presumably the pager stopped reading
                # before all the data was written
                pass
            pager_proc.wait()
        return

    # non-interactive: write everything straight to stdout
    for item in lines:
        safe_write(sys.stdout, item, add_newline=add_newlines)