diff --git a/osc/core.py b/osc/core.py index 1a3f5ab3..27d5535a 100644 --- a/osc/core.py +++ b/osc/core.py @@ -5181,8 +5181,6 @@ def unpack_srcrpm(srpm, dir, *files): with open(os.devnull, 'w') as devnull: rpm2cpio_proc = subprocess.Popen(['rpm2cpio'], stdin=fsrpm, stdout=subprocess.PIPE) - # XXX: shell injection is possible via the files parameter, but the - # current osc code does not use the files parameter. cpio_proc = subprocess.Popen(['cpio', '-i'] + list(files), stdin=rpm2cpio_proc.stdout, stderr=devnull) diff --git a/osc/util/ar.py b/osc/util/ar.py index 9fa53c51..66c02918 100644 --- a/osc/util/ar.py +++ b/osc/util/ar.py @@ -19,36 +19,32 @@ import re import stat import sys from io import BytesIO - - -# workaround for python24 -if not hasattr(os, 'SEEK_SET'): - os.SEEK_SET = 0 +from typing import Union class ArError(Exception): """Base class for all ar related errors""" - def __init__(self, fn, msg): + def __init__(self, fn: bytes, msg: str): super().__init__() self.file = fn self.msg = msg def __str__(self): - return 'ar error: %s' % self.msg + return f"{self.msg}: {self.file.decode('utf-8')}" class ArHdr: """Represents an ar header entry""" - def __init__(self, fn, date, uid, gid, mode, size, fmag, off): + def __init__(self, fn: bytes, date: bytes, uid: bytes, gid: bytes, mode: bytes, size: bytes, fmag: bytes, off: bytes): self.file = fn.strip() self.date = date.strip() self.uid = uid.strip() self.gid = gid.strip() if not mode.strip(): # provide a dummy mode for the ext_fn hdr - mode = '0' + mode = b"0" self.mode = stat.S_IMODE(int(mode, 8)) self.size = int(size) self.fmag = fmag @@ -75,9 +71,17 @@ class ArFile(BytesIO): working dir is used. Additionally it tries to set the owner/group and permissions. """ + if self.name.startswith(b"/"): + raise ArError(self.name, "Extracting files with absolute paths is not supported for security reasons") + if not dir: dir = os.getcwdb() - fn = os.path.join(dir, self.name) + fn = os.path.join(dir, self.name.decode("utf-8")) + + dir_path, _ = os.path.split(fn) + if dir_path: + os.makedirs(dir_path, exist_ok=True) + with open(fn, 'wb') as f: f.write(self.getvalue()) os.chmod(fn, self.mode) @@ -88,6 +92,7 @@ class ArFile(BytesIO): if gid not in os.getgroups() or os.getegid() != 0: gid = -1 os.chown(fn, uid, gid) + return fn def __str__(self): return '%s %s %s %s' % (self.name, self.uid, @@ -140,26 +145,30 @@ class Ar: data section. The end of such a filename is indicated with a trailing '/'. Another special file is the '/' which contains the symbol lookup table. """ + + # read extended header with long file names and then only seek into the right offsets + self.__file.seek(self.ext_fnhdr.dataoff, os.SEEK_SET) + ext_fnhdr_data = self.__file.read(self.ext_fnhdr.size) + for h in self.hdrs: if h.file == b'/': continue - # remove slashes which are appended by ar - h.file = h.file.rstrip(b'/') - if not h.file.startswith(b'/'): + + if h.file.endswith(b"/"): + # regular file name + h.file = h.file[:-1] continue - # handle long filename - off = int(h.file[1:len(h.file)]) - start = self.ext_fnhdr.dataoff + off - self.__file.seek(start, os.SEEK_SET) - # XXX: is it safe to read all the data in one chunk? I assume the '//' data section - # won't be too large - data = self.__file.read(self.ext_fnhdr.size) - end = data.find(b'/') - if end != -1: - h.file = data[0:end] - else: + + # long file name + assert h.file[0:1] == b"/" + start = int(h.file[1:]) + end = ext_fnhdr_data.find(b'/', start) + + if end == -1: raise ArError(b'//', 'invalid data section - trailing slash (off: %d)' % start) + h.file = ext_fnhdr_data[start:end] + def _get_file(self, hdr): self.__file.seek(hdr.dataoff, os.SEEK_SET) return ArFile(hdr.file, hdr.uid, hdr.gid, hdr.mode, @@ -193,7 +202,10 @@ class Ar: pos += hdr.size + (hdr.size & 1) self._fixupFilenames() - def get_file(self, fn): + def get_file(self, fn: Union[str, bytes]): + # accept str for better user experience + if isinstance(fn, str): + fn = fn.encode("utf-8") for h in self.hdrs: if h.file == fn: return self._get_file(h) diff --git a/osc/util/cpio.py b/osc/util/cpio.py index 09324277..bbfb278a 100644 --- a/osc/util/cpio.py +++ b/osc/util/cpio.py @@ -20,10 +20,6 @@ import struct import sys -# workaround for python24 -if not hasattr(os, 'SEEK_SET'): - os.SEEK_SET = 0 - # format implementation is based on src/copyin.c and src/util.c (see cpio sources) @@ -129,7 +125,16 @@ class CpioRead: msg = '\'%s\' is no regular file - only regular files are supported atm' % hdr.filename raise NotImplementedError(msg) self.__file.seek(hdr.dataoff, os.SEEK_SET) + + if fn.startswith(b"/"): + raise CpioError(fn, "Extracting files with absolute paths is not supported for security reasons") + fn = os.path.join(dest, fn) + + dir_path, _ = os.path.split(fn) + if dir_path: + os.makedirs(dir_path, exist_ok=True) + with open(fn, 'wb') as f: f.write(self.__file.read(hdr.filesize)) os.chmod(fn, hdr.mode) @@ -183,12 +188,22 @@ class CpioRead: to a dir the file will be stored in dest/filename. In case new_fn is specified the file will be stored as new_fn. """ + + # accept str for better user experience + if isinstance(filename, str): + filename = filename.encode("utf-8") + if isinstance(dest, str): + dest = dest.encode("utf-8") + if isinstance(new_fn, str): + new_fn = new_fn.encode("utf-8") + hdr = self._get_hdr(filename) if not hdr: raise CpioError(filename, '\'%s\' does not exist in archive' % filename) dest = dest or os.getcwdb() fn = new_fn or filename self._copyin_file(hdr, dest, fn) + return os.path.join(dest, fn).decode("utf-8") def copyin(self, dest=None): """ diff --git a/tests/fixtures/README b/tests/fixtures/README new file mode 100644 index 00000000..2b7fa007 --- /dev/null +++ b/tests/fixtures/README @@ -0,0 +1,35 @@ +Generate data for creating test archives +---------------------------------------- + +echo 'foobar' > /tmp/foo + +# root perms required for the next command +echo 'numbers' > /123 + +echo 'qwerty' > very-long-long-long-long-name + +echo 'asdfgh' > very-long-long-long-long-name2 + +echo 'newline' > 'very-long-name +-with-newline' + +echo 'newline' > 'a +b' + +mkdir 'dir' +echo 'file-in-a-dir' > dir/file + + +Create archive.ar +----------------- + +ar qP archive.ar /tmp/foo /123 very-long-long-long-long-name very-long-long-long-long-name2 'very-long-name +-with-newline' 'a +b' dir/file + + +Create archive.cpio +------------------- + +printf "/tmp/foo\0/123\0very-long-long-long-long-name\0very-long-long-long-long-name2\0very-long-name +-with-newline\0a\nb\0dir/file\0" | cpio -ocv0 --owner=root:root > archive.cpio diff --git a/tests/fixtures/archive.ar b/tests/fixtures/archive.ar new file mode 100644 index 00000000..afbde3dd --- /dev/null +++ b/tests/fixtures/archive.ar @@ -0,0 +1,25 @@ +! +// 94 ` +very-long-long-long-long-name/ +very-long-long-long-long-name2/ +very-long-name +-with-newline/ + +/tmp/foo/ 1716888536 1000 1000 100644 7 ` +foobar + +/123/ 1716883776 0 0 100644 8 ` +numbers +/0 1716882802 1000 1000 100644 7 ` +qwerty + +/31 1716882988 1000 1000 100644 7 ` +asdfgh + +/63 1716884767 1000 1000 100644 8 ` +newline +a +b/ 1716884876 1000 1000 100644 8 ` +newline +dir/file/ 1716992150 1000 1000 100644 14 ` +file-in-a-dir diff --git a/tests/fixtures/archive.cpio b/tests/fixtures/archive.cpio new file mode 100644 index 00000000..d92e3210 Binary files /dev/null and b/tests/fixtures/archive.cpio differ diff --git a/tests/test_util_ar.py b/tests/test_util_ar.py new file mode 100644 index 00000000..227fd473 --- /dev/null +++ b/tests/test_util_ar.py @@ -0,0 +1,86 @@ +import os +import shutil +import tempfile +import unittest + +from osc.util.ar import Ar +from osc.util.ar import ArError + + +FIXTURES_DIR = os.path.join(os.path.dirname(__file__), "fixtures") + + +class TestAr(unittest.TestCase): + def setUp(self): + self.tmpdir = tempfile.mkdtemp(prefix="osc_test_") + try: + self.old_cwd = os.getcwd() + except FileNotFoundError: + self.old_cwd = os.path.expanduser("~") + os.chdir(self.tmpdir) + self.archive = os.path.join(FIXTURES_DIR, "archive.ar") + self.ar = Ar(self.archive) + self.ar.read() + + def tearDown(self): + os.chdir(self.old_cwd) + shutil.rmtree(self.tmpdir) + + def test_file_list(self): + actual = [i.name for i in self.ar] + expected = [ + # absolute path + b"/tmp/foo", + # this is a filename, not a long filename reference + b"/123", + b"very-long-long-long-long-name", + b"very-long-long-long-long-name2", + # long file name with a newline + b"very-long-name\n-with-newline", + # short file name with a newline + b"a\nb", + b"dir/file", + ] + self.assertEqual(actual, expected) + + def test_get_file(self): + f = self.ar.get_file(b"/tmp/foo") + self.assertIsNotNone(f) + + f = self.ar.get_file("/tmp/foo") + self.assertIsNotNone(f) + + f = self.ar.get_file("does-not-exist") + self.assertIsNone(f) + + def test_saveTo(self): + f = self.ar.get_file("a\nb") + path = f.saveTo(self.tmpdir) + + # check that we've got the expected path + self.assertEqual(path, os.path.join(self.tmpdir, "a\nb")) + + # ... and that the contents also match + with open(path, "r", encoding="utf-8") as f: + self.assertEqual(f.read(), "newline\n") + + def test_saveTo_subdir(self): + f = self.ar.get_file("dir/file") + path = f.saveTo(self.tmpdir) + + # check that we've got the expected path + self.assertEqual(path, os.path.join(self.tmpdir, "dir/file")) + + # ... and that the contents also match + with open(path, "r", encoding="utf-8") as f: + self.assertEqual(f.read(), "file-in-a-dir\n") + + def test_saveTo_abspath(self): + f = self.ar.get_file("/tmp/foo") + assert f is not None + # this is supposed to throw an error, extracting files with absolute paths might overwrite system files + self.assertRaises(ArError, f.saveTo, self.tmpdir) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_util_cpio.py b/tests/test_util_cpio.py new file mode 100644 index 00000000..363b6373 --- /dev/null +++ b/tests/test_util_cpio.py @@ -0,0 +1,71 @@ +import os +import shutil +import tempfile +import unittest + +from osc.util.cpio import CpioRead +from osc.util.cpio import CpioError + + +FIXTURES_DIR = os.path.join(os.path.dirname(__file__), "fixtures") + + +class TestCpio(unittest.TestCase): + def setUp(self): + self.tmpdir = tempfile.mkdtemp(prefix="osc_test_") + try: + self.old_cwd = os.getcwd() + except FileNotFoundError: + self.old_cwd = os.path.expanduser("~") + os.chdir(self.tmpdir) + self.archive = os.path.join(FIXTURES_DIR, "archive.cpio") + self.cpio = CpioRead(self.archive) + self.cpio.read() + + def tearDown(self): + os.chdir(self.old_cwd) + shutil.rmtree(self.tmpdir) + + def test_file_list(self): + actual = [i.filename for i in self.cpio] + expected = [ + # absolute path + b"/tmp/foo", + # this is a filename, not a long filename reference + b"/123", + b"very-long-long-long-long-name", + b"very-long-long-long-long-name2", + # long file name with a newline + b"very-long-name\n-with-newline", + # short file name with a newline + b"a\nb", + b"dir/file", + ] + self.assertEqual(actual, expected) + + def test_copyin_file(self): + path = self.cpio.copyin_file("a\nb", dest=self.tmpdir) + + # check that we've got the expected path + self.assertEqual(path, os.path.join(self.tmpdir, "a\nb")) + + # ... and that the contents also match + with open(path, "r", encoding="utf-8") as f: + self.assertEqual(f.read(), "newline\n") + + def test_copyin_file_abspath(self): + self.assertRaises(CpioError, self.cpio.copyin_file, "/tmp/foo") + + def test_copyin_file_subdir(self): + path = self.cpio.copyin_file("dir/file", dest=self.tmpdir) + + # check that we've got the expected path + self.assertEqual(path, os.path.join(self.tmpdir, "dir/file")) + + # ... and that the contents also match + with open(path, "r", encoding="utf-8") as f: + self.assertEqual(f.read(), "file-in-a-dir\n") + + +if __name__ == "__main__": + unittest.main()