1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-01-07 15:06:22 +01:00
github.com_openSUSE_osc/osc/output/output.py

150 lines
5.6 KiB
Python
Raw Normal View History

import os
import re
import sys
from typing import Dict
from typing import Optional
from typing import TextIO
from typing import Union
from . import tty
def print_msg(*args, print_to: Optional[str] = "debug"):
"""
Print ``*args`` to the ``print_to`` target:
- None: print nothing
- debug: print() to stderr with "DEBUG:" prefix if config["debug"] is set
- verbose: print() to stdout if config["verbose"] or config["debug"] is set
- error: print() to stderr with red "ERROR:" prefix
- warning: print() to stderr with yellow "WARNING:" prefix
- stdout: print() to stdout
- stderr: print() to stderr
"""
from .. import conf
if print_to is None:
return
elif print_to == "debug":
# print a debug message to stderr if config["debug"] is set
if conf.config["debug"]:
print("DEBUG:", *args, file=sys.stderr)
elif print_to == "verbose":
# print a verbose message to stdout if config["verbose"] or config["debug"] is set
if conf.config["verbose"] or conf.config["debug"]:
print(*args)
elif print_to == "error":
print(tty.colorize("ERROR:", "red,bold"), *args, file=sys.stderr)
elif print_to == "warning":
print(tty.colorize("WARNING:", "yellow,bold"), *args, file=sys.stderr)
elif print_to == "stdout":
# print the message to stdout
print(*args)
elif print_to == "stderr":
# print the message to stderr
print(*args, file=sys.stderr)
else:
raise ValueError(f"Invalid value of the 'print_to' option: {print_to}")
# cached compiled regular expressions; they are created on the first use
SANITIZE_TEXT_RE: Optional[Dict] = None
def sanitize_text(text: Union[bytes, str]) -> Union[bytes, str]:
"""
Remove forbidden characters and escape sequences from ``text``.
This must be run on lines or the whole text to work correctly.
Processing blocks of constant size might lead to splitting escape sequences
and leaving garbage characters after sanitizing.
"""
global SANITIZE_TEXT_RE
if not SANITIZE_TEXT_RE:
SANITIZE_TEXT_RE = {}
# CONTROL CHARACTERS
# remove all control characters with the exception of:
# 0x09 - horizontal tab (\t)
# 0x0A - line feed (\n)
# 0x0D - carriage return (\r)
# 0x1B - escape - is selectively handled later as part of sanitizing escape sequences
regex = r"[\x00-\x08\x0B\x0C\x0E-\x1A\x1C-\x1F]"
SANITIZE_TEXT_RE["str_control"] = re.compile(regex)
SANITIZE_TEXT_RE["bytes_control"] = re.compile(regex.encode("ascii"))
# CSI ESCAPE SEQUENCES
# https://en.wikipedia.org/wiki/ANSI_escape_code#CSI_codes
# remove all but allowed CSI escape sequences
# negative lookahead assertion that allows safe color escape sequences
neg_allowed_csi_sequences = r"(?!\[([0-5]|[34][0-7]|;)+m)"
# range 0x300x3F (OCT \040-\077) (ASCII 09:;<=>?); zero or more characters
csi_parameter_bytes = r"[\x30-\x3F]*"
# range 0x200x2F (OCT \040-\057) (ASCII space and !"#$%&'()*+,-./); zero or more characters
csi_itermediate_bytes = r"[\x20-\x2F]*"
# range 0x400x7E (OCT \100-\176) (ASCII @AZ[\]^_`az{|}~); 1 character
csi_final_byte = r"[\x40-\x7E]"
regex = rf"\033{neg_allowed_csi_sequences}\[{csi_parameter_bytes}{csi_itermediate_bytes}{csi_final_byte}"
SANITIZE_TEXT_RE["str_csi_sequences"] = re.compile(regex)
SANITIZE_TEXT_RE["bytes_csi_sequences"] = re.compile(regex.encode("ascii"))
# FE ESCAPE SEQUENCES
# https://en.wikipedia.org/wiki/ANSI_escape_code#Fe_Escape_sequences
# remove all Fe escape sequences
# range 0x40 to 0x5F (ASCII @AZ[\]^_); 1 character
fe = r"[\x40-x5F]"
regex = rf"\033{neg_allowed_csi_sequences}{fe}"
SANITIZE_TEXT_RE["str_fe_sequences"] = re.compile(regex)
SANITIZE_TEXT_RE["bytes_fe_sequences"] = re.compile(regex.encode("ascii"))
# REMAINING ESCAPE CHARACTERS
# remove all remaining escape characters that are not followed with the allowed CSI escape sequences
regex = rf"\033{neg_allowed_csi_sequences}"
SANITIZE_TEXT_RE["str_esc"] = re.compile(regex)
SANITIZE_TEXT_RE["bytes_esc"] = re.compile(regex.encode("ascii"))
if isinstance(text, bytes):
text = SANITIZE_TEXT_RE["bytes_control"].sub(b"", text)
text = SANITIZE_TEXT_RE["bytes_csi_sequences"].sub(b"", text)
text = SANITIZE_TEXT_RE["bytes_fe_sequences"].sub(b"", text)
text = SANITIZE_TEXT_RE["bytes_esc"].sub(b"", text)
else:
text = SANITIZE_TEXT_RE["str_control"].sub("", text)
text = SANITIZE_TEXT_RE["str_csi_sequences"].sub("", text)
text = SANITIZE_TEXT_RE["str_fe_sequences"].sub("", text)
text = SANITIZE_TEXT_RE["str_esc"].sub("", text)
return text
def safe_print(*args, **kwargs):
"""
A wrapper to print() that runs sanitize_text() on all arguments.
"""
args = [sanitize_text(i) for i in args]
print(*args, **kwargs)
def safe_write(file: TextIO, text: Union[str, bytes], *, add_newline: bool = False):
"""
Run sanitize_text() on ``text`` and write it to ``file``.
:param add_newline: Write a newline after writing the ``text``.
"""
text = sanitize_text(text)
if isinstance(text, bytes):
file.buffer.write(text)
if add_newline:
file.buffer.write(os.linesep.encode("utf-8"))
else:
file.write(text)
if add_newline:
file.write(os.linesep)