mirror of
https://github.com/openSUSE/osc.git
synced 2025-02-28 21:22:14 +01:00
Merge pull request #1508 from dmach/sanitize_text-escape-sequences
Sanitize text escape sequences
This commit is contained in:
commit
ce1855fbb8
@ -53,6 +53,7 @@ from . import oscerr
|
||||
from . import output
|
||||
from . import store as osc_store
|
||||
from .connection import http_request, http_GET, http_POST, http_PUT, http_DELETE
|
||||
from .output import sanitize_text
|
||||
from .store import Store
|
||||
from .util import xdg
|
||||
from .util.helper import decode_list, decode_it, raw_input, _html_escape
|
||||
@ -6998,11 +6999,9 @@ def print_buildlog(
|
||||
def print_data(data, strip_time=False):
|
||||
if strip_time:
|
||||
data = buildlog_strip_time(data)
|
||||
output_buffer.write(data.translate(all_bytes, remove_bytes))
|
||||
|
||||
# to protect us against control characters (CVE-2012-1095)
|
||||
all_bytes = bytes.maketrans(b'', b'')
|
||||
remove_bytes = all_bytes[:8] + all_bytes[14:32] # accept tabs and newlines
|
||||
output_buffer.write(sanitize_text(data))
|
||||
|
||||
query = {'nostream': '1', 'start': f'{offset}'}
|
||||
if last:
|
||||
query['last'] = 1
|
||||
|
@ -1,6 +1,9 @@
|
||||
from .key_value_table import KeyValueTable
|
||||
from .input import get_user_input
|
||||
from .output import print_msg
|
||||
from .output import sanitize_text
|
||||
from .output import safe_print
|
||||
from .output import safe_write
|
||||
from .tty import colorize
|
||||
from .widechar import wc_ljust
|
||||
from .widechar import wc_width
|
||||
|
@ -1,5 +1,10 @@
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from typing import Dict
|
||||
from typing import Optional
|
||||
from typing import TextIO
|
||||
from typing import Union
|
||||
|
||||
from . import tty
|
||||
|
||||
@ -39,3 +44,106 @@ def print_msg(*args, print_to: Optional[str] = "debug"):
|
||||
print(*args, file=sys.stderr)
|
||||
else:
|
||||
raise ValueError(f"Invalid value of the 'print_to' option: {print_to}")
|
||||
|
||||
|
||||
# cached compiled regular expressions; they are created on the first use
SANITIZE_TEXT_RE: Optional[Dict] = None


def sanitize_text(text: Union[bytes, str]) -> Union[bytes, str]:
    """
    Remove forbidden characters and escape sequences from ``text``.

    This must be run on lines or the whole text to work correctly.
    Processing blocks of constant size might lead to splitting escape sequences
    and leaving garbage characters after sanitizing.

    The substitutions are applied in this order:

    1. control characters except ``\\t``, ``\\n``, ``\\r`` and escape,
    2. CSI escape sequences except the allowed color/style sequences,
    3. Fe escape sequences (only the 2-character introducer is removed),
    4. any remaining escape character that doesn't start an allowed sequence.

    :param text: Text to sanitize, either ``str`` or ``bytes``.
    :returns: Sanitized text of the same type as ``text``.
    """
    global SANITIZE_TEXT_RE

    if not SANITIZE_TEXT_RE:
        # build into a local dict and publish it only when complete,
        # so a failure half-way never leaves a partially filled cache behind
        patterns = {}

        # CONTROL CHARACTERS
        # remove all control characters with the exception of:
        # 0x09 - horizontal tab (\t)
        # 0x0A - line feed (\n)
        # 0x0D - carriage return (\r)
        # 0x1B - escape - is selectively handled later as part of sanitizing escape sequences

        regex = r"[\x00-\x08\x0B\x0C\x0E-\x1A\x1C-\x1F]"
        patterns["str_control"] = re.compile(regex)
        patterns["bytes_control"] = re.compile(regex.encode("ascii"))

        # CSI ESCAPE SEQUENCES
        # https://en.wikipedia.org/wiki/ANSI_escape_code#CSI_codes
        # remove all but allowed CSI escape sequences

        # negative lookahead assertion that allows safe color escape sequences
        neg_allowed_csi_sequences = r"(?!\[([0-5]|[34][0-7]|;)+m)"

        # range 0x30–0x3F (OCT \060-\077) (ASCII 0–9:;<=>?); zero or more characters
        csi_parameter_bytes = r"[\x30-\x3F]*"

        # range 0x20–0x2F (OCT \040-\057) (ASCII space and !"#$%&'()*+,-./); zero or more characters
        csi_intermediate_bytes = r"[\x20-\x2F]*"

        # range 0x40–0x7E (OCT \100-\176) (ASCII @A–Z[\]^_`a–z{|}~); 1 character
        csi_final_byte = r"[\x40-\x7E]"

        regex = rf"\033{neg_allowed_csi_sequences}\[{csi_parameter_bytes}{csi_intermediate_bytes}{csi_final_byte}"
        patterns["str_csi_sequences"] = re.compile(regex)
        patterns["bytes_csi_sequences"] = re.compile(regex.encode("ascii"))

        # FE ESCAPE SEQUENCES
        # https://en.wikipedia.org/wiki/ANSI_escape_code#Fe_Escape_sequences
        # remove all Fe escape sequences

        # range 0x40 to 0x5F (ASCII @A–Z[\]^_); 1 character
        # NOTE: both ends of the range must be written as \xNN escapes;
        # a bare "x5F" would extend the range up to the letter "x" and the
        # character following the escape would be swallowed as well
        fe = r"[\x40-\x5F]"
        regex = rf"\033{neg_allowed_csi_sequences}{fe}"
        patterns["str_fe_sequences"] = re.compile(regex)
        patterns["bytes_fe_sequences"] = re.compile(regex.encode("ascii"))

        # REMAINING ESCAPE CHARACTERS
        # remove all remaining escape characters that are not followed with the allowed CSI escape sequences

        regex = rf"\033{neg_allowed_csi_sequences}"
        patterns["str_esc"] = re.compile(regex)
        patterns["bytes_esc"] = re.compile(regex.encode("ascii"))

        SANITIZE_TEXT_RE = patterns

    # pick the pattern family and the replacement matching the input type
    if isinstance(text, bytes):
        prefix = "bytes"
        replacement = b""
    else:
        prefix = "str"
        replacement = ""

    # the order matters: the generic escape removal must run last
    for name in ("control", "csi_sequences", "fe_sequences", "esc"):
        text = SANITIZE_TEXT_RE[f"{prefix}_{name}"].sub(replacement, text)

    return text
|
||||
|
||||
|
||||
def safe_print(*args, **kwargs):
    """
    A wrapper to print() that runs sanitize_text() on all arguments.

    ``str`` and ``bytes`` arguments are sanitized as they are.
    Any other object is converted with ``str()`` first - the same conversion
    print() performs - so that its textual form gets sanitized too instead of
    failing inside sanitize_text() with a TypeError.

    :param args: Objects to print.
    :param kwargs: Keyword arguments passed to print() unchanged.
    """
    sanitized = [
        sanitize_text(arg if isinstance(arg, (bytes, str)) else str(arg))
        for arg in args
    ]
    print(*sanitized, **kwargs)
|
||||
|
||||
|
||||
def safe_write(file: TextIO, text: Union[str, bytes], *, add_newline: bool = False):
    """
    Run sanitize_text() on ``text`` and write it to ``file``.

    ``bytes`` are written to the underlying binary buffer (``file.buffer``),
    ``str`` is written to ``file`` directly.

    :param file: Text stream to write to.
    :param text: Text to sanitize and write.
    :param add_newline: Write a newline after writing the ``text``.
    """
    text = sanitize_text(text)

    if isinstance(text, bytes):
        # flush text that is still pending in the text layer,
        # otherwise it could end up in the output after the bytes written below
        file.flush()
        file.buffer.write(text)
        if add_newline:
            # the binary layer does no newline translation, use the platform line separator
            file.buffer.write(os.linesep.encode("utf-8"))
    else:
        file.write(text)
        if add_newline:
            # a text stream translates "\n" on its own;
            # writing os.linesep would produce "\r\r\n" on Windows
            file.write("\n")
|
||||
|
@ -2,13 +2,19 @@ import os
|
||||
import sys
|
||||
|
||||
|
||||
IS_INTERACTIVE = os.isatty(sys.stdout.fileno())
|
||||
# Detect whether stdout is attached to a terminal.
# The detection must never break importing this module, fall back to
# non-interactive mode when stdout cannot provide a file descriptor:
#   AttributeError - sys.stdout is None
#   OSError - stdout has no underlying file descriptor (e.g. it was replaced with an in-memory stream)
#   ValueError - stdout is closed
try:
    IS_INTERACTIVE = os.isatty(sys.stdout.fileno())
except (AttributeError, OSError, ValueError):
    IS_INTERACTIVE = False
|
||||
|
||||
|
||||
ESCAPE_CODES = {
|
||||
"reset": "\033[0m",
|
||||
"bold": "\033[1m",
|
||||
"dim": "\033[2m",
|
||||
"italic": "\033[3m",
|
||||
"underline": "\033[4m",
|
||||
"blink": "\033[5m",
|
||||
"black": "\033[30m",
|
||||
"red": "\033[31m",
|
||||
"green": "\033[32m",
|
||||
@ -17,6 +23,14 @@ ESCAPE_CODES = {
|
||||
"magenta": "\033[35m",
|
||||
"cyan": "\033[36m",
|
||||
"white": "\033[37m",
|
||||
"bg_black": "\033[40m",
|
||||
"bg_red": "\033[41m",
|
||||
"bg_green": "\033[42m",
|
||||
"bg_yellow": "\033[43m",
|
||||
"bg_blue": "\033[44m",
|
||||
"bg_magenta": "\033[45m",
|
||||
"bg_cyan": "\033[46m",
|
||||
"bg_white": "\033[47m",
|
||||
}
|
||||
|
||||
|
||||
|
@ -5,6 +5,7 @@ import unittest
|
||||
import osc.conf
|
||||
from osc.output import KeyValueTable
|
||||
from osc.output import print_msg
|
||||
from osc.output import sanitize_text
|
||||
from osc.output import tty
|
||||
|
||||
|
||||
@ -160,5 +161,82 @@ class TestPrintMsg(unittest.TestCase):
|
||||
self.assertEqual("foo bar\n", stderr.getvalue())
|
||||
|
||||
|
||||
class TestSanitization(unittest.TestCase):
    """Checks of sanitize_text() for both ``str`` and ``bytes`` inputs."""

    def test_control_chars_bytes(self):
        # all byte values below 0x20; only tab, line feed and carriage return survive
        every_low_byte = bytes(range(32))
        self.assertEqual(sanitize_text(every_low_byte), b"\t\n\r")

    def test_control_chars_str(self):
        # all code points below 0x20; only tab, line feed and carriage return survive
        every_low_char = "".join(map(chr, range(32)))
        self.assertEqual(sanitize_text(every_low_char), "\t\n\r")

    def test_csi_escape_sequences_str(self):
        # allowed CSI escape sequences are kept untouched
        for allowed in (">\033[0m<", ">\033[1;31;47m]<"):
            self.assertEqual(sanitize_text(allowed), allowed)

        # not allowed CSI escape sequences are removed as a whole
        for forbidden in (">\033[8m<",):
            self.assertEqual(sanitize_text(forbidden), "><")

    def test_csi_escape_sequences_bytes(self):
        # allowed CSI escape sequences are kept untouched
        for allowed in (b">\033[0m<", b">\033[1;31;47m]<"):
            self.assertEqual(sanitize_text(allowed), allowed)

        # not allowed CSI escape sequences are removed as a whole
        for forbidden in (b">\033[8m<",):
            self.assertEqual(sanitize_text(forbidden), b"><")

    def test_standalone_escape_str(self):
        # an escape character that starts no sequence
        self.assertEqual(sanitize_text(">\033<"), "><")

    def test_standalone_escape_bytes(self):
        # an escape character that starts no sequence
        self.assertEqual(sanitize_text(b">\033<"), b"><")

    def test_fe_escape_sequences_str(self):
        # escape followed by any character in the 0x40-0x5F range
        for code in range(0x40, 0x5F + 1):
            text = ">\033" + chr(code) + "<"
            self.assertEqual(sanitize_text(text), "><")

    def test_fe_escape_sequences_bytes(self):
        # escape followed by any byte in the 0x40-0x5F range
        for code in range(0x40, 0x5F + 1):
            data = b">\033" + bytes([code]) + b"<"
            self.assertEqual(sanitize_text(data), b"><")

    def test_osc_escape_sequences_str(self):
        # OSC (Operating System Command) sequences
        window_title = "\033]0;this is the window title\007"
        # \033] is removed with the Fe sequences
        self.assertEqual(sanitize_text(window_title), "0;this is the window title")

    def test_osc_escape_sequences_bytes(self):
        # OSC (Operating System Command) sequences
        window_title = b"\033]0;this is the window title\007"
        # \033] is removed with the Fe sequences
        self.assertEqual(sanitize_text(window_title), b"0;this is the window title")


if __name__ == "__main__":
    unittest.main()
|
||||
|
Loading…
x
Reference in New Issue
Block a user