mirror of
https://github.com/openSUSE/osc.git
synced 2024-11-14 16:26:13 +01:00
150 lines
5.6 KiB
Python
150 lines
5.6 KiB
Python
import os
|
||
import re
|
||
import sys
|
||
from typing import Dict
|
||
from typing import Optional
|
||
from typing import TextIO
|
||
from typing import Union
|
||
|
||
from . import tty
|
||
|
||
|
||
def print_msg(*args, print_to: Optional[str] = "debug"):
|
||
"""
|
||
Print ``*args`` to the ``print_to`` target:
|
||
- None: print nothing
|
||
- debug: print() to stderr with "DEBUG:" prefix if config["debug"] is set
|
||
- verbose: print() to stdout if config["verbose"] or config["debug"] is set
|
||
- error: print() to stderr with red "ERROR:" prefix
|
||
- warning: print() to stderr with yellow "WARNING:" prefix
|
||
- stdout: print() to stdout
|
||
- stderr: print() to stderr
|
||
"""
|
||
from .. import conf
|
||
|
||
if print_to is None:
|
||
return
|
||
elif print_to == "debug":
|
||
# print a debug message to stderr if config["debug"] is set
|
||
if conf.config["debug"]:
|
||
print("DEBUG:", *args, file=sys.stderr)
|
||
elif print_to == "verbose":
|
||
# print a verbose message to stdout if config["verbose"] or config["debug"] is set
|
||
if conf.config["verbose"] or conf.config["debug"]:
|
||
print(*args)
|
||
elif print_to == "error":
|
||
print(tty.colorize("ERROR:", "red,bold"), *args, file=sys.stderr)
|
||
elif print_to == "warning":
|
||
print(tty.colorize("WARNING:", "yellow,bold"), *args, file=sys.stderr)
|
||
elif print_to == "stdout":
|
||
# print the message to stdout
|
||
print(*args)
|
||
elif print_to == "stderr":
|
||
# print the message to stderr
|
||
print(*args, file=sys.stderr)
|
||
else:
|
||
raise ValueError(f"Invalid value of the 'print_to' option: {print_to}")
|
||
|
||
|
||
# cached compiled regular expressions; they are created on the first use
|
||
SANITIZE_TEXT_RE: Optional[Dict] = None
|
||
|
||
|
||
def sanitize_text(text: Union[bytes, str]) -> Union[bytes, str]:
|
||
"""
|
||
Remove forbidden characters and escape sequences from ``text``.
|
||
|
||
This must be run on lines or the whole text to work correctly.
|
||
Processing blocks of constant size might lead to splitting escape sequences
|
||
and leaving garbage characters after sanitizing.
|
||
"""
|
||
global SANITIZE_TEXT_RE
|
||
|
||
if not SANITIZE_TEXT_RE:
|
||
SANITIZE_TEXT_RE = {}
|
||
|
||
# CONTROL CHARACTERS
|
||
# remove all control characters with the exception of:
|
||
# 0x09 - horizontal tab (\t)
|
||
# 0x0A - line feed (\n)
|
||
# 0x0D - carriage return (\r)
|
||
# 0x1B - escape - is selectively handled later as part of sanitizing escape sequences
|
||
|
||
regex = r"[\x00-\x08\x0B\x0C\x0E-\x1A\x1C-\x1F]"
|
||
SANITIZE_TEXT_RE["str_control"] = re.compile(regex)
|
||
SANITIZE_TEXT_RE["bytes_control"] = re.compile(regex.encode("ascii"))
|
||
|
||
# CSI ESCAPE SEQUENCES
|
||
# https://en.wikipedia.org/wiki/ANSI_escape_code#CSI_codes
|
||
# remove all but allowed CSI escape sequences
|
||
|
||
# negative lookahead assertion that allows safe color escape sequences
|
||
neg_allowed_csi_sequences = r"(?!\[([0-5]|[34][0-7]|;)+m)"
|
||
|
||
# range 0x30–0x3F (OCT \040-\077) (ASCII 0–9:;<=>?); zero or more characters
|
||
csi_parameter_bytes = r"[\x30-\x3F]*"
|
||
|
||
# range 0x20–0x2F (OCT \040-\057) (ASCII space and !"#$%&'()*+,-./); zero or more characters
|
||
csi_itermediate_bytes = r"[\x20-\x2F]*"
|
||
|
||
# range 0x40–0x7E (OCT \100-\176) (ASCII @A–Z[\]^_`a–z{|}~); 1 character
|
||
csi_final_byte = r"[\x40-\x7E]"
|
||
|
||
regex = rf"\033{neg_allowed_csi_sequences}\[{csi_parameter_bytes}{csi_itermediate_bytes}{csi_final_byte}"
|
||
SANITIZE_TEXT_RE["str_csi_sequences"] = re.compile(regex)
|
||
SANITIZE_TEXT_RE["bytes_csi_sequences"] = re.compile(regex.encode("ascii"))
|
||
|
||
# FE ESCAPE SEQUENCES
|
||
# https://en.wikipedia.org/wiki/ANSI_escape_code#Fe_Escape_sequences
|
||
# remove all Fe escape sequences
|
||
|
||
# range 0x40 to 0x5F (ASCII @A–Z[\]^_); 1 character
|
||
fe = r"[\x40-x5F]"
|
||
regex = rf"\033{neg_allowed_csi_sequences}{fe}"
|
||
SANITIZE_TEXT_RE["str_fe_sequences"] = re.compile(regex)
|
||
SANITIZE_TEXT_RE["bytes_fe_sequences"] = re.compile(regex.encode("ascii"))
|
||
|
||
# REMAINING ESCAPE CHARACTERS
|
||
# remove all remaining escape characters that are not followed with the allowed CSI escape sequences
|
||
|
||
regex = rf"\033{neg_allowed_csi_sequences}"
|
||
SANITIZE_TEXT_RE["str_esc"] = re.compile(regex)
|
||
SANITIZE_TEXT_RE["bytes_esc"] = re.compile(regex.encode("ascii"))
|
||
|
||
if isinstance(text, bytes):
|
||
text = SANITIZE_TEXT_RE["bytes_control"].sub(b"", text)
|
||
text = SANITIZE_TEXT_RE["bytes_csi_sequences"].sub(b"", text)
|
||
text = SANITIZE_TEXT_RE["bytes_fe_sequences"].sub(b"", text)
|
||
text = SANITIZE_TEXT_RE["bytes_esc"].sub(b"", text)
|
||
else:
|
||
text = SANITIZE_TEXT_RE["str_control"].sub("", text)
|
||
text = SANITIZE_TEXT_RE["str_csi_sequences"].sub("", text)
|
||
text = SANITIZE_TEXT_RE["str_fe_sequences"].sub("", text)
|
||
text = SANITIZE_TEXT_RE["str_esc"].sub("", text)
|
||
return text
|
||
|
||
|
||
def safe_print(*args, **kwargs):
|
||
"""
|
||
A wrapper to print() that runs sanitize_text() on all arguments.
|
||
"""
|
||
args = [sanitize_text(i) for i in args]
|
||
print(*args, **kwargs)
|
||
|
||
|
||
def safe_write(file: TextIO, text: Union[str, bytes], *, add_newline: bool = False):
|
||
"""
|
||
Run sanitize_text() on ``text`` and write it to ``file``.
|
||
|
||
:param add_newline: Write a newline after writing the ``text``.
|
||
"""
|
||
text = sanitize_text(text)
|
||
if isinstance(text, bytes):
|
||
file.buffer.write(text)
|
||
if add_newline:
|
||
file.buffer.write(os.linesep.encode("utf-8"))
|
||
else:
|
||
file.write(text)
|
||
if add_newline:
|
||
file.write(os.linesep)
|