From c0c05319d093e96c5c7bac406ae73979391ccbab Mon Sep 17 00:00:00 2001 From: Daniel Mach Date: Fri, 16 Feb 2024 14:09:22 +0100 Subject: [PATCH 1/4] Move removing control characters to output.sanitize_text() --- osc/core.py | 7 +++--- osc/output/__init__.py | 3 +++ osc/output/output.py | 48 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 54 insertions(+), 4 deletions(-) diff --git a/osc/core.py b/osc/core.py index 0965089d..6ace1666 100644 --- a/osc/core.py +++ b/osc/core.py @@ -53,6 +53,7 @@ from . import oscerr from . import output from . import store as osc_store from .connection import http_request, http_GET, http_POST, http_PUT, http_DELETE +from .output import sanitize_text from .store import Store from .util import xdg from .util.helper import decode_list, decode_it, raw_input, _html_escape @@ -6998,11 +6999,9 @@ def print_buildlog( def print_data(data, strip_time=False): if strip_time: data = buildlog_strip_time(data) - output_buffer.write(data.translate(all_bytes, remove_bytes)) + # to protect us against control characters (CVE-2012-1095) + output_buffer.write(sanitize_text(data)) - # to protect us against control characters (CVE-2012-1095) - all_bytes = bytes.maketrans(b'', b'') - remove_bytes = all_bytes[:8] + all_bytes[14:32] # accept tabs and newlines query = {'nostream': '1', 'start': f'{offset}'} if last: query['last'] = 1 diff --git a/osc/output/__init__.py b/osc/output/__init__.py index 410727df..254feb86 100644 --- a/osc/output/__init__.py +++ b/osc/output/__init__.py @@ -1,6 +1,9 @@ from .key_value_table import KeyValueTable from .input import get_user_input from .output import print_msg +from .output import sanitize_text +from .output import safe_print +from .output import safe_write from .tty import colorize from .widechar import wc_ljust from .widechar import wc_width diff --git a/osc/output/output.py b/osc/output/output.py index 879d238f..2919ab77 100644 --- a/osc/output/output.py +++ b/osc/output/output.py @@ -1,5 +1,8 @@ +import os import sys from typing import Optional +from typing import TextIO +from typing import Union from . import tty @@ -39,3 +42,48 @@ def print_msg(*args, print_to: Optional[str] = "debug"): print(*args, file=sys.stderr) else: raise ValueError(f"Invalid value of the 'print_to' option: {print_to}") + + +# Forbidden characters are nearly all control characters 0-31 with the exception of: +# 0x09 - horizontal tab (\t) +# 0x0A - line feed (\n) +# 0x0D - carriage return (\r) +# (related to CVE-2012-1095) +# +# It would be good to selectively allow 0x1B with safe & trusted escape sequences. +FORBIDDEN_BYTES = b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x0b\x0c\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f" +FORBIDDEN_CHARS = dict.fromkeys(FORBIDDEN_BYTES) + + +def sanitize_text(text: Union[bytes, str]) -> Union[bytes, str]: + """ + Remove forbidden characters from ``text``. + """ + if isinstance(text, bytes): + return text.translate(None, FORBIDDEN_BYTES) + return text.translate(FORBIDDEN_CHARS) + + +def safe_print(*args, **kwargs): + """ + A wrapper to print() that runs sanitize_text() on all arguments. + """ + args = [sanitize_text(i) for i in args] + print(*args, **kwargs) + + +def safe_write(file: TextIO, text: Union[str, bytes], *, add_newline: bool = False): + """ + Run sanitize_text() on ``text`` and write it to ``file``. + + :param add_newline: Write a newline after writing the ``text``. + """ + text = sanitize_text(text) + if isinstance(text, bytes): + file.buffer.write(text) + if add_newline: + file.buffer.write(os.linesep.encode("utf-8")) + else: + file.write(text) + if add_newline: + file.write(os.linesep) From e332099544bb36c8c3f1f77243d9f226a7d453b4 Mon Sep 17 00:00:00 2001 From: Daniel Mach Date: Thu, 7 Mar 2024 08:23:56 +0100 Subject: [PATCH 2/4] Update list of color codes in 'output.tty' module --- osc/output/tty.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/osc/output/tty.py b/osc/output/tty.py index 3880417d..e2642602 100644 --- a/osc/output/tty.py +++ b/osc/output/tty.py @@ -8,7 +8,10 @@ IS_INTERACTIVE = os.isatty(sys.stdout.fileno()) ESCAPE_CODES = { "reset": "\033[0m", "bold": "\033[1m", + "dim": "\033[2m", + "italic": "\033[3m", "underline": "\033[4m", + "blink": "\033[5m", "black": "\033[30m", "red": "\033[31m", "green": "\033[32m", @@ -17,6 +20,14 @@ ESCAPE_CODES = { "magenta": "\033[35m", "cyan": "\033[36m", "white": "\033[37m", + "bg_black": "\033[40m", + "bg_red": "\033[41m", + "bg_green": "\033[42m", + "bg_yellow": "\033[43m", + "bg_blue": "\033[44m", + "bg_magenta": "\033[45m", + "bg_cyan": "\033[46m", + "bg_white": "\033[47m", } From 2d5399442d32e9944621b4d1b09b5c10db7df0b5 Mon Sep 17 00:00:00 2001 From: Daniel Mach Date: Thu, 7 Mar 2024 08:37:13 +0100 Subject: [PATCH 3/4] Fix output.tty.IS_INTERACTIVE when os.isatty() throws OSError --- osc/output/tty.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/osc/output/tty.py b/osc/output/tty.py index e2642602..c99a6e35 100644 --- a/osc/output/tty.py +++ b/osc/output/tty.py @@ -2,7 +2,10 @@ import os import sys -IS_INTERACTIVE = os.isatty(sys.stdout.fileno()) +try: + IS_INTERACTIVE = os.isatty(sys.stdout.fileno()) +except OSError: + IS_INTERACTIVE = False ESCAPE_CODES = { From f9b17347da9f0eee16b3f7890c1091ce62f15ce9 Mon Sep 17 00:00:00 2001 From: Daniel Mach Date: Thu, 7 Mar 2024 11:59:51 +0100 Subject: [PATCH 4/4] Improve sanitize_text() to keep selected CSI escape sequences --- osc/output/output.py | 84 +++++++++++++++++++++++++++++++++++++------- tests/test_output.py | 78 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 150 insertions(+), 12 deletions(-) diff --git a/osc/output/output.py b/osc/output/output.py index 2919ab77..63b46a46 100644 --- a/osc/output/output.py +++ b/osc/output/output.py @@ -1,5 +1,7 @@ import os +import re import sys +from typing import Dict from typing import Optional from typing import TextIO from typing import Union @@ -44,24 +46,82 @@ def print_msg(*args, print_to: Optional[str] = "debug"): raise ValueError(f"Invalid value of the 'print_to' option: {print_to}") -# Forbidden characters are nearly all control characters 0-31 with the exception of: -# 0x09 - horizontal tab (\t) -# 0x0A - line feed (\n) -# 0x0D - carriage return (\r) -# (related to CVE-2012-1095) -# -# It would be good to selectively allow 0x1B with safe & trusted escape sequences. -FORBIDDEN_BYTES = b"\x00\x01\x02\x03\x04\x05\x06\x07\x08\x0b\x0c\x0e\x0f\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f" -FORBIDDEN_CHARS = dict.fromkeys(FORBIDDEN_BYTES) +# cached compiled regular expressions; they are created on the first use +SANITIZE_TEXT_RE: Optional[Dict] = None def sanitize_text(text: Union[bytes, str]) -> Union[bytes, str]: """ - Remove forbidden characters from ``text``. + Remove forbidden characters and escape sequences from ``text``. + + This must be run on lines or the whole text to work correctly. + Processing blocks of constant size might lead to splitting escape sequences + and leaving garbage characters after sanitizing. """ + global SANITIZE_TEXT_RE + + if not SANITIZE_TEXT_RE: + SANITIZE_TEXT_RE = {} + + # CONTROL CHARACTERS + # remove all control characters with the exception of: + # 0x09 - horizontal tab (\t) + # 0x0A - line feed (\n) + # 0x0D - carriage return (\r) + # 0x1B - escape - is selectively handled later as part of sanitizing escape sequences + + regex = r"[\x00-\x08\x0B\x0C\x0E-\x1A\x1C-\x1F]" + SANITIZE_TEXT_RE["str_control"] = re.compile(regex) + SANITIZE_TEXT_RE["bytes_control"] = re.compile(regex.encode("ascii")) + + # CSI ESCAPE SEQUENCES + # https://en.wikipedia.org/wiki/ANSI_escape_code#CSI_codes + # remove all but allowed CSI escape sequences + + # negative lookahead assertion that allows safe color escape sequences + neg_allowed_csi_sequences = r"(?!\[([0-5]|[34][0-7]|;)+m)" + + # range 0x30–0x3F (OCT \040-\077) (ASCII 0–9:;<=>?); zero or more characters + csi_parameter_bytes = r"[\x30-\x3F]*" + + # range 0x20–0x2F (OCT \040-\057) (ASCII space and !"#$%&'()*+,-./); zero or more characters + csi_itermediate_bytes = r"[\x20-\x2F]*" + + # range 0x40–0x7E (OCT \100-\176) (ASCII @A–Z[\]^_`a–z{|}~); 1 character + csi_final_byte = r"[\x40-\x7E]" + + regex = rf"\033{neg_allowed_csi_sequences}\[{csi_parameter_bytes}{csi_itermediate_bytes}{csi_final_byte}" + SANITIZE_TEXT_RE["str_csi_sequences"] = re.compile(regex) + SANITIZE_TEXT_RE["bytes_csi_sequences"] = re.compile(regex.encode("ascii")) + + # FE ESCAPE SEQUENCES + # https://en.wikipedia.org/wiki/ANSI_escape_code#Fe_Escape_sequences + # remove all Fe escape sequences + + # range 0x40 to 0x5F (ASCII @A–Z[\]^_); 1 character + fe = r"[\x40-x5F]" + regex = rf"\033{neg_allowed_csi_sequences}{fe}" + SANITIZE_TEXT_RE["str_fe_sequences"] = re.compile(regex) + SANITIZE_TEXT_RE["bytes_fe_sequences"] = re.compile(regex.encode("ascii")) + + # REMAINING ESCAPE CHARACTERS + # remove all remaining escape characters that are not followed with the allowed CSI escape sequences + + regex = rf"\033{neg_allowed_csi_sequences}" + SANITIZE_TEXT_RE["str_esc"] = re.compile(regex) + SANITIZE_TEXT_RE["bytes_esc"] = re.compile(regex.encode("ascii")) + if isinstance(text, bytes): - return text.translate(None, FORBIDDEN_BYTES) - return text.translate(FORBIDDEN_CHARS) + text = SANITIZE_TEXT_RE["bytes_control"].sub(b"", text) + text = SANITIZE_TEXT_RE["bytes_csi_sequences"].sub(b"", text) + text = SANITIZE_TEXT_RE["bytes_fe_sequences"].sub(b"", text) + text = SANITIZE_TEXT_RE["bytes_esc"].sub(b"", text) + else: + text = SANITIZE_TEXT_RE["str_control"].sub("", text) + text = SANITIZE_TEXT_RE["str_csi_sequences"].sub("", text) + text = SANITIZE_TEXT_RE["str_fe_sequences"].sub("", text) + text = SANITIZE_TEXT_RE["str_esc"].sub("", text) + return text def safe_print(*args, **kwargs): diff --git a/tests/test_output.py b/tests/test_output.py index 49166837..33971392 100644 --- a/tests/test_output.py +++ b/tests/test_output.py @@ -5,6 +5,7 @@ import unittest import osc.conf from osc.output import KeyValueTable from osc.output import print_msg +from osc.output import sanitize_text from osc.output import tty @@ -160,5 +161,82 @@ class TestPrintMsg(unittest.TestCase): self.assertEqual("foo bar\n", stderr.getvalue()) +class TestSanitization(unittest.TestCase): + def test_control_chars_bytes(self): + original = b"".join([i.to_bytes(1, byteorder="big") for i in range(32)]) + sanitized = sanitize_text(original) + self.assertEqual(sanitized, b"\t\n\r") + + def test_control_chars_str(self): + original = "".join([chr(i) for i in range(32)]) + sanitized = sanitize_text(original) + self.assertEqual(sanitized, "\t\n\r") + + def test_csi_escape_sequences_str(self): + # allowed CSI escape sequences + originals = [">\033[0m<", ">\033[1;31;47m]<"] + for original in originals: + sanitized = sanitize_text(original) + self.assertEqual(sanitized, original) + + # not allowed CSI escape sequences + originals = [">\033[8m<"] + for original in originals: + sanitized = sanitize_text(original) + self.assertEqual(sanitized, "><") + + def test_csi_escape_sequences_bytes(self): + # allowed CSI escape sequences + originals = [b">\033[0m<", b">\033[1;31;47m]<"] + for original in originals: + sanitized = sanitize_text(original) + self.assertEqual(sanitized, original) + + # not allowed CSI escape sequences + originals = [b">\033[8m<"] + for original in originals: + sanitized = sanitize_text(original) + self.assertEqual(sanitized, b"><") + + def test_standalone_escape_str(self): + original = ">\033<" + sanitized = sanitize_text(original) + self.assertEqual(sanitized, "><") + + def test_standalone_escape_bytes(self): + # standalone escape + original = b">\033<" + sanitized = sanitize_text(original) + self.assertEqual(sanitized, b"><") + + def test_fe_escape_sequences_str(self): + for i in range(0x40, 0x5F + 1): + char = chr(i) + original = f">\033{char}<" + sanitized = sanitize_text(original) + self.assertEqual(sanitized, "><") + + def test_fe_escape_sequences_bytes(self): + for i in range(0x40, 0x5F + 1): + byte = i.to_bytes(1, byteorder="big") + original = b">\033" + byte + b"<" + sanitized = sanitize_text(original) + self.assertEqual(sanitized, b"><") + + def test_osc_escape_sequences_str(self): + # OSC (Operating System Command) sequences + original = "\033]0;this is the window title\007" + sanitized = sanitize_text(original) + # \033] is removed with the Fe sequences + self.assertEqual(sanitized, "0;this is the window title") + + def test_osc_escape_sequences_bytes(self): + # OSC (Operating System Command) sequences + original = b"\033]0;this is the window title\007" + sanitized = sanitize_text(original) + # \033] is removed with the Fe sequences + self.assertEqual(sanitized, b"0;this is the window title") + + if __name__ == "__main__": unittest.main()