1
0
mirror of https://github.com/openSUSE/osc.git synced 2024-09-20 01:06:17 +02:00

Fix possibility to overwrite special files in .osc (CVE-2024-22034 boo#1225911)

Source files are now stored in the 'sources' subdirectory which prevents
name collisons. This requires changing version of '.osc' store to 2.0.
This commit is contained in:
Daniel Mach 2024-06-25 20:58:53 +02:00
parent d8bfd4521e
commit a887ade78f
11 changed files with 166 additions and 98 deletions

View File

@ -9535,14 +9535,12 @@ Please submit there instead, or use --nodevelproject to force direct submission.
store_write_string(destdir, '_linkrepair', '')
pac = Package(destdir)
storedir = os.path.join(destdir, store)
for name in sorted(entries.keys()):
md5_old = entries_old.get(name, '')
md5_new = entries_new.get(name, '')
md5_oldpatched = entries_oldpatched.get(name, '')
if md5_new != '':
self.download(name, md5_new, dir_new, os.path.join(storedir, name))
self.download(name, md5_new, dir_new, pac.store.sources_get_path(name))
if md5_old == md5_new:
if md5_oldpatched == '':
pac.put_on_deletelist(name)
@ -9554,17 +9552,17 @@ Please submit there instead, or use --nodevelproject to force direct submission.
if md5_new == '':
continue
print(statfrmt('U', name))
shutil.copy2(os.path.join(storedir, name), os.path.join(destdir, name))
shutil.copy2(pac.store.sources_get_path(name), os.path.join(destdir, name))
continue
if md5_new == md5_oldpatched:
if md5_new == '':
continue
print(statfrmt('G', name))
shutil.copy2(os.path.join(storedir, name), os.path.join(destdir, name))
shutil.copy2(pac.store.sources_get_path(name), os.path.join(destdir, name))
continue
self.download(name, md5_oldpatched, dir_oldpatched, os.path.join(destdir, name + '.mine'))
if md5_new != '':
shutil.copy2(os.path.join(storedir, name), os.path.join(destdir, name + '.new'))
shutil.copy2(pac.store.sources_get_path(name), os.path.join(destdir, name + '.new'))
else:
self.download(name, md5_new, dir_new, os.path.join(destdir, name + '.new'))
self.download(name, md5_old, dir_old, os.path.join(destdir, name + '.old'))

View File

@ -2820,7 +2820,7 @@ def get_source_file_diff(dir, filename, rev, oldfilename=None, olddir=None, orig
oldfilename = filename
if not olddir:
olddir = os.path.join(dir, store)
olddir = os.path.join(dir, store, "sources")
if not origfilename:
origfilename = filename

View File

@ -16,6 +16,7 @@ from .linkinfo import Linkinfo
from .serviceinfo import Serviceinfo
from .store import __store_version__
from .store import Store
from .store import check_store_version
from .store import read_inconflict
from .store import read_filemeta
from .store import read_sizelimit
@ -154,16 +155,13 @@ class Package:
if self.scm_url:
return dirty_files
for fname in self.filenamelist:
if not os.path.exists(os.path.join(self.storedir, fname)) and fname not in self.skipped:
if not self.store.sources_is_file(fname) and fname not in self.skipped:
dirty_files.append(fname)
for fname in Package.REQ_STOREFILES:
if not os.path.isfile(os.path.join(self.storedir, fname)):
dirty_files.append(fname)
for fname in os.listdir(self.storedir):
if fname in Package.REQ_STOREFILES or fname in Package.OPT_STOREFILES or \
fname.startswith('_build'):
continue
elif fname in self.filenamelist and fname in self.skipped:
for fname in self.store.sources_list_files():
if fname in self.filenamelist and fname in self.skipped:
dirty_files.append(fname)
elif fname not in self.filenamelist:
dirty_files.append(fname)
@ -182,6 +180,8 @@ class Package:
store = Store(self.dir, check=False)
store.assert_is_package()
# check_store_version() does the metadata migration that was disabled due to Store(..., check=False)
check_store_version(self.absdir)
# there was a time when osc did not write _osclib_version file; let's assume these checkouts have version 1.0
if not store.exists("_osclib_version"):
@ -202,10 +202,10 @@ class Package:
# all files which are present in the filelist have to exist in the storedir
for f in self.filelist:
# XXX: should we also check the md5?
if not os.path.exists(os.path.join(self.storedir, f.name)) and f.name not in self.skipped:
if not self.store.sources_is_file(f.name) and f.name not in self.skipped:
# if get_source_file fails we're screwed up...
get_source_file(self.apiurl, self.prjname, self.name, f.name,
targetfilename=os.path.join(self.storedir, f.name), revision=self.rev,
targetfilename=self.store.sources_get_path(f.name), revision=self.rev,
mtime=f.mtime)
repaired = True
@ -213,9 +213,11 @@ class Package:
if fname in Package.REQ_STOREFILES or fname in Package.OPT_STOREFILES or \
fname.startswith('_build'):
continue
elif fname not in self.filenamelist or fname in self.skipped:
for fname in self.store.sources_list_files():
if fname not in self.filenamelist or fname in self.skipped:
# this file does not belong to the storedir so remove it
store.unlink(fname)
os.unlink(self.store.sources_get_path(fname))
repaired = True
for fname in self.to_be_deleted[:]:
@ -247,11 +249,9 @@ class Package:
raise oscerr.OscIOError(None, f'error: file \'{n}\' does not exist')
if n in self.to_be_deleted:
self.to_be_deleted.remove(n)
# self.delete_storefile(n)
self.write_deletelist()
elif n in self.filenamelist or n in self.to_be_added:
raise oscerr.PackageFileConflict(self.prjname, self.name, n, f'osc: warning: \'{n}\' is already under version control')
# shutil.copyfile(os.path.join(self.dir, n), os.path.join(self.storedir, n))
if self.dir != '.':
pathname = os.path.join(self.dir, n)
else:
@ -291,12 +291,6 @@ class Package:
self.write_deletelist()
return (True, state)
def delete_storefile(self, n):
try:
os.unlink(os.path.join(self.storedir, n))
except:
pass
def delete_localfile(self, n):
try:
os.unlink(os.path.join(self.dir, n))
@ -320,7 +314,7 @@ class Package:
if n in self.in_conflict:
filename = os.path.join(self.dir, n)
storefilename = os.path.join(self.storedir, n)
storefilename = self.store.sources_get_path(n)
myfilename = os.path.join(self.dir, n + '.mine')
upfilename = os.path.join(self.dir, n + '.new')
@ -364,7 +358,7 @@ class Package:
def delete_source_file(self, n):
"""delete local a source file"""
self.delete_localfile(n)
self.delete_storefile(n)
self.store.sources_delete_file(n)
def delete_remote_source_file(self, n):
"""delete a remote source file (e.g. from the server)"""
@ -393,7 +387,7 @@ class Package:
def __commit_update_store(self, tdir):
"""move files from transaction directory into the store"""
for filename in os.listdir(tdir):
os.rename(os.path.join(tdir, filename), os.path.join(self.storedir, filename))
os.rename(os.path.join(tdir, filename), self.store.sources_get_path(filename))
def __generate_commitlist(self, todo_send):
root = ET.Element('directory')
@ -543,7 +537,7 @@ class Package:
# in sha256sums.
# The storefile is guaranteed to exist (since we have a
# pulled/linkrepair wc, the file cannot have state 'S')
storefile = os.path.join(self.storedir, filename)
storefile = self.store.sources_get_path(filename)
sha256sums[filename] = sha256_dgst(storefile)
if not force and not real_send and not todo_delete and not self.islinkrepair() and not self.ispulled():
@ -623,7 +617,7 @@ class Package:
store_write_string(self.absdir, '_files', ET.tostring(sfilelist, encoding=ET_ENCODING) + '\n')
for filename in todo_delete:
self.to_be_deleted.remove(filename)
self.delete_storefile(filename)
self.store.sources_delete_file(filename)
self.write_deletelist()
self.write_addlist()
self.update_datastructs()
@ -667,7 +661,7 @@ class Package:
from ..core import utime
filename = os.path.join(self.dir, n)
storefilename = os.path.join(self.storedir, n)
storefilename = self.store.sources_get_path(n)
origfile_tmp = os.path.join(self.storedir, '_in_update', f'{n}.copy')
origfile = os.path.join(self.storedir, '_in_update', n)
if os.path.isfile(filename):
@ -691,7 +685,7 @@ class Package:
from ..core import run_external
filename = os.path.join(self.dir, n)
storefilename = os.path.join(self.storedir, n)
storefilename = self.store.sources_get_path(n)
myfilename = os.path.join(self.dir, n + '.mine')
upfilename = os.path.join(self.dir, n + '.new')
origfile_tmp = os.path.join(self.storedir, '_in_update', f'{n}.copy')
@ -996,7 +990,7 @@ class Package:
known_by_meta = True
if os.path.exists(localfile):
exists = True
if os.path.exists(os.path.join(self.storedir, n)):
if self.store.sources_is_file(n):
exists_in_store = True
if n in self.to_be_deleted:
@ -1074,7 +1068,7 @@ class Package:
b_revision = self.rev.encode()
diff.append(b'--- %s\t(revision %s)\n' % (fname.encode(), b_revision))
diff.append(b'+++ %s\t(working copy)\n' % fname.encode())
fname = os.path.join(self.storedir, fname)
fname = self.store.sources_get_path(fname)
fd = None
tmpfile = None
@ -1427,8 +1421,8 @@ rev: %s
raise oscerr.PackageInternalError(self.prjname, self.name, 'too many files in \'_in_update\' dir')
tmp = rfiles[:]
for f in tmp:
if os.path.exists(os.path.join(self.storedir, f.name)):
if dgst(os.path.join(self.storedir, f.name)) == f.md5:
if self.store.sources_is_file(f.name):
if dgst(self.store.sources_get_path(f.name)) == f.md5:
if f in kept:
kept.remove(f)
elif f in added:
@ -1474,10 +1468,10 @@ rev: %s
# if the storefile doesn't exist we're resuming an aborted update:
# the file was already deleted but we cannot know this
# OR we're processing a _service: file (simply keep the file)
if os.path.isfile(os.path.join(self.storedir, f.name)) and self.status(f.name) not in ('M', 'C'):
if self.store.sources_is_file(f.name) and self.status(f.name) not in ('M', 'C'):
# if self.status(f.name) != 'M':
self.delete_localfile(f.name)
self.delete_storefile(f.name)
self.store.sources_delete_file(f.name)
print(statfrmt('D', os.path.join(pathn, f.name)))
if f.name in self.to_be_deleted:
self.to_be_deleted.remove(f.name)
@ -1501,7 +1495,7 @@ rev: %s
print(f'Restored \'{os.path.join(pathn, f.name)}\'')
elif state == 'C':
get_source_file(self.apiurl, self.prjname, self.name, f.name,
targetfilename=os.path.join(self.storedir, f.name), revision=rev,
targetfilename=self.store.sources_get_path(f.name), revision=rev,
progress_obj=self.progress_obj, mtime=f.mtime, meta=self.meta)
print(f'skipping \'{f.name}\' (this is due to conflicts)')
elif state == 'D' and self.findfilebyname(f.name).md5 != f.md5:
@ -1560,12 +1554,12 @@ rev: %s
raise oscerr.OscIOError(None, f'file \'{filename}\' is not under version control')
elif filename in self.skipped:
raise oscerr.OscIOError(None, f'file \'{filename}\' is marked as skipped and cannot be reverted')
if filename in self.filenamelist and not os.path.exists(os.path.join(self.storedir, filename)):
if filename in self.filenamelist and not self.store.sources_is_file(filename):
msg = f"file '{filename}' is listed in filenamelist but no storefile exists"
raise oscerr.PackageInternalError(self.prjname, self.name, msg)
state = self.status(filename)
if not (state == 'A' or state == '!' and filename in self.to_be_added):
shutil.copyfile(os.path.join(self.storedir, filename), os.path.join(self.absdir, filename))
shutil.copyfile(self.store.sources_get_path(filename), os.path.join(self.absdir, filename))
if state == 'D':
self.to_be_deleted.remove(filename)
self.write_deletelist()

View File

@ -11,16 +11,17 @@ from .. import oscerr
from .._private import api
from ..util.xml import ET
from typing import List
# __store_version__ is to be incremented when the format of the working copy
# "store" changes in an incompatible way. Please add any needed migration
# functionality to check_store_version().
__store_version__ = '1.0'
__store_version__ = '2.0'
class Store:
STORE_DIR = ".osc"
STORE_VERSION = "1.0"
STORE_VERSION = "2.0"
@classmethod
def is_project_dir(cls, path):
@ -57,7 +58,11 @@ class Store:
def __iter__(self):
path = os.path.join(self.abspath, self.STORE_DIR)
yield from os.listdir(path)
for fn in os.listdir(path):
full_path = os.path.join(path, fn)
if os.path.isdir(full_path):
continue
yield fn
def assert_is_project(self):
if not self.is_project:
@ -327,6 +332,43 @@ class Store:
root = self.read_xml_node("_meta", "project").getroot()
return root
def sources_get_path(self, file_name: str) -> str:
if "/" in file_name:
raise ValueError(f"Plain file name expected: {file_name}")
result = os.path.join(self.abspath, self.STORE_DIR, "sources", file_name)
os.makedirs(os.path.dirname(result), exist_ok=True)
return result
def sources_list_files(self) -> List[str]:
result = []
invalid = []
topdir = os.path.join(self.abspath, self.STORE_DIR, "sources")
if not os.path.isdir(topdir):
return []
for fn in os.listdir(topdir):
if self.sources_is_file(fn):
result.append(fn)
else:
invalid.append(fn)
if invalid:
msg = ".osc/sources contains entries other than regular files"
raise oscerr.WorkingCopyInconsistent(self.project, self.package, invalid, msg)
return result
def sources_is_file(self, file_name: str) -> bool:
return os.path.isfile(self.sources_get_path(file_name))
def sources_delete_file(self, file_name: str):
try:
os.unlink(self.sources_get_path(file_name))
except:
pass
store = '.osc'
@ -351,12 +393,46 @@ def check_store_version(dir):
raise oscerr.NoWorkingCopy(msg)
if v != __store_version__:
migrated = False
if v in ['0.2', '0.3', '0.4', '0.5', '0.6', '0.7', '0.8', '0.9', '0.95', '0.96', '0.97', '0.98', '0.99']:
# version is fine, no migration needed
f = open(versionfile, 'w')
f.write(__store_version__ + '\n')
f.close()
# no migration needed, only change metadata version to 1.0
s = Store(dir, check=False)
v = "1.0"
s.write_string("_osclib_version", v)
migrated = True
if v == "1.0":
store_dir = os.path.join(dir, store)
sources_dir = os.path.join(dir, store, "sources")
os.makedirs(sources_dir, exist_ok=True)
s = Store(dir, check=False)
if s.is_package and not s.scmurl:
from .package import Package
from .project import Project
scm_files = [i.name for i in s.files]
for fn in os.listdir(store_dir):
old_path = os.path.join(store_dir, fn)
new_path = os.path.join(sources_dir, fn)
if not os.path.isfile(old_path):
continue
if fn in Package.REQ_STOREFILES or fn in Package.OPT_STOREFILES:
continue
if fn.startswith("_") and fn not in scm_files:
continue
if os.path.isfile(old_path):
os.rename(old_path, new_path)
v = "2.0"
s.write_string("_osclib_version", v)
migrated = True
if migrated:
return
msg = f'The osc metadata of your working copy "{dir}"'
msg += f'\nhas __store_version__ = {v}, but it should be {__store_version__}'
msg += '\nPlease do a fresh checkout or update your client. Sorry about the inconvenience.'

View File

@ -73,7 +73,7 @@ class RequestWrongOrder(Exception):
self.exp_method = exp_method
def __str__(self):
return f'{self.url}, {self.exp_url}, {self.method}, {self.exp_method}'
return '%s, %s, %s, %s' % (self.url, self.exp_url, self.method, self.exp_method)
class RequestDataMismatch(Exception):
@ -85,7 +85,7 @@ class RequestDataMismatch(Exception):
self.exp = exp
def __str__(self):
return f'{self.url}, {self.got}, {self.exp}'
return '%s, %s, %s' % (self.url, self.got, self.exp)
EXPECTED_REQUESTS = []
@ -261,8 +261,8 @@ class OscTestCase(unittest.TestCase):
for i in root.findall('entry'):
if i.get('name') in skipfiles:
continue
self.assertTrue(os.path.exists(os.path.join('.osc', i.get('name'))))
self.assertEqual(osc.core.dgst(os.path.join('.osc', i.get('name'))), i.get('md5'))
self.assertTrue(os.path.exists(os.path.join('.osc', 'sources', i.get('name'))))
self.assertEqual(osc.core.dgst(os.path.join('.osc', 'sources', i.get('name'))), i.get('md5'))
def assertFilesEqual(self, first, second):
self.assertTrue(os.path.exists(first))

View File

@ -26,7 +26,7 @@ class TestAddFiles(OscTestCase):
p.addfile('toadd1')
exp = 'A toadd1\n'
self.assertEqual(sys.stdout.getvalue(), exp)
self.assertFalse(os.path.exists(os.path.join('.osc', 'toadd1')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'toadd1')))
self._check_status(p, 'toadd1', 'A')
self._check_addlist('toadd1\n')
@ -38,8 +38,8 @@ class TestAddFiles(OscTestCase):
p.addfile('toadd2')
exp = 'A toadd1\nA toadd2\n'
self.assertEqual(sys.stdout.getvalue(), exp)
self.assertFalse(os.path.exists(os.path.join('.osc', 'toadd1')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'toadd2')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'toadd1')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'toadd2')))
self._check_status(p, 'toadd1', 'A')
self._check_status(p, 'toadd2', 'A')
self._check_addlist('toadd1\ntoadd2\n')
@ -60,7 +60,7 @@ class TestAddFiles(OscTestCase):
self.assertRaises(osc.oscerr.PackageFileConflict, p.addfile, 'toadd1')
exp = 'A toadd1\n'
self.assertEqual(sys.stdout.getvalue(), exp)
self.assertFalse(os.path.exists(os.path.join('.osc', 'toadd1')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'toadd1')))
self._check_status(p, 'toadd1', 'A')
self._check_addlist('toadd1\n')
@ -73,7 +73,7 @@ class TestAddFiles(OscTestCase):
p.addfile('foo')
exp = 'A foo\n'
self.assertEqual(sys.stdout.getvalue(), exp)
self.assertFileContentNotEqual(os.path.join('.osc', 'foo'), 'replaced file\n')
self.assertFileContentNotEqual(os.path.join('.osc', 'sources', 'foo'), 'replaced file\n')
self.assertFalse(os.path.exists(os.path.join('.osc', '_to_be_deleted')))
self._check_status(p, 'foo', 'R')
self._check_addlist('foo\n')

View File

@ -43,7 +43,7 @@ class TestCommit(OscTestCase):
self.assertEqual(sys.stdout.getvalue(), exp)
self._check_digests('testSimple_cfilesremote')
self.assertTrue(os.path.exists('nochange'))
self.assertFilesEqual('nochange', os.path.join('.osc', 'nochange'))
self.assertFilesEqual('nochange', os.path.join('.osc', 'sources', 'nochange'))
self._check_status(p, 'nochange', ' ')
self._check_status(p, 'foo', ' ')
self._check_status(p, 'merge', ' ')
@ -67,7 +67,7 @@ class TestCommit(OscTestCase):
self.assertEqual(sys.stdout.getvalue(), exp)
self._check_digests('testAddfile_cfilesremote')
self.assertTrue(os.path.exists('add'))
self.assertFilesEqual('add', os.path.join('.osc', 'add'))
self.assertFilesEqual('add', os.path.join('.osc', 'sources', 'add'))
self.assertFalse(os.path.exists(os.path.join('.osc', '_to_be_added')))
self._check_status(p, 'add', ' ')
self._check_status(p, 'foo', ' ')
@ -89,7 +89,7 @@ class TestCommit(OscTestCase):
self.assertEqual(sys.stdout.getvalue(), exp)
self._check_digests('testDeletefile_cfilesremote')
self.assertFalse(os.path.exists('nochange'))
self.assertFalse(os.path.exists(os.path.join('.osc', 'nochange')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'nochange')))
self.assertFalse(os.path.exists(os.path.join('.osc', '_to_be_deleted')))
self._check_status(p, 'foo', ' ')
self._check_status(p, 'merge', ' ')
@ -144,8 +144,8 @@ class TestCommit(OscTestCase):
self._check_digests('testMultiple_cfilesremote')
self.assertFalse(os.path.exists(os.path.join('.osc', '_to_be_added')))
self.assertFalse(os.path.exists(os.path.join('.osc', '_to_be_deleted')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'foo')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'merge')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'foo')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'merge')))
self.assertRaises(osc.oscerr.OscIOError, p.status, 'foo')
self.assertRaises(osc.oscerr.OscIOError, p.status, 'merge')
self._check_status(p, 'add', ' ')
@ -221,7 +221,7 @@ class TestCommit(OscTestCase):
self.assertFalse(os.path.exists(os.path.join('.osc', '_to_be_added')))
self.assertFalse(os.path.exists(os.path.join('.osc', '_to_be_deleted')))
self.assertFalse(os.path.exists('foo'))
self.assertFalse(os.path.exists(os.path.join('.osc', 'foo')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'foo')))
self._check_status(p, 'add', ' ')
self._check_status(p, 'nochange', ' ')
self._check_status(p, 'merge', '!')
@ -244,7 +244,7 @@ class TestCommit(OscTestCase):
self.assertEqual(sys.stdout.getvalue(), exp)
self._check_digests('testAddfile_cfilesremote')
self.assertTrue(os.path.exists('add'))
self.assertFilesEqual('add', os.path.join('.osc', 'add'))
self.assertFilesEqual('add', os.path.join('.osc', 'sources', 'add'))
self.assertFalse(os.path.exists(os.path.join('.osc', '_to_be_added')))
self._check_status(p, 'add', ' ')
self._check_status(p, 'foo', ' ')
@ -344,7 +344,7 @@ class TestCommit(OscTestCase):
self.assertEqual(sys.stdout.getvalue(), exp)
self._check_digests('testSimple_cfilesremote')
self.assertTrue(os.path.exists('nochange'))
self.assertFilesEqual('nochange', os.path.join('.osc', 'nochange'))
self.assertFilesEqual('nochange', os.path.join('.osc', 'sources', 'nochange'))
self._check_status(p, 'nochange', ' ')
self._check_status(p, 'foo', ' ')
self._check_status(p, 'merge', ' ')

View File

@ -25,7 +25,7 @@ class TestDeleteFiles(OscTestCase):
ret = p.delete_file('foo')
self.__check_ret(ret, True, ' ')
self.assertFalse(os.path.exists('foo'))
self.assertTrue(os.path.exists(os.path.join('.osc', 'foo')))
self.assertTrue(os.path.exists(os.path.join('.osc', 'sources', 'foo')))
self._check_deletelist('foo\n')
self._check_status(p, 'foo', 'D')
@ -36,7 +36,7 @@ class TestDeleteFiles(OscTestCase):
ret = p.delete_file('nochange')
self.__check_ret(ret, False, 'M')
self.assertTrue(os.path.exists('nochange'))
self.assertTrue(os.path.exists(os.path.join('.osc', 'nochange')))
self.assertTrue(os.path.exists(os.path.join('.osc', 'sources', 'nochange')))
self.assertFalse(os.path.exists(os.path.join('.osc', '_to_be_deleted')))
self._check_status(p, 'nochange', 'M')
@ -78,7 +78,7 @@ class TestDeleteFiles(OscTestCase):
ret = p.delete_file('foo')
self.__check_ret(ret, False, 'C')
self.assertTrue(os.path.exists('foo'))
self.assertTrue(os.path.exists(os.path.join('.osc', 'foo')))
self.assertTrue(os.path.exists(os.path.join('.osc', 'sources', 'foo')))
self.assertFalse(os.path.exists(os.path.join('.osc', '_to_be_deleted')))
self._check_conflictlist('foo\n')
self._check_status(p, 'foo', 'C')
@ -90,7 +90,7 @@ class TestDeleteFiles(OscTestCase):
ret = p.delete_file('nochange', force=True)
self.__check_ret(ret, True, 'M')
self.assertFalse(os.path.exists('nochange'))
self.assertTrue(os.path.exists(os.path.join('.osc', 'nochange')))
self.assertTrue(os.path.exists(os.path.join('.osc', 'sources', 'nochange')))
self._check_deletelist('nochange\n')
self._check_status(p, 'nochange', 'D')
@ -122,7 +122,7 @@ class TestDeleteFiles(OscTestCase):
ret = p.delete_file('merge', force=True)
self.__check_ret(ret, True, 'R')
self.assertFalse(os.path.exists('merge'))
self.assertTrue(os.path.exists(os.path.join('.osc', 'merge')))
self.assertTrue(os.path.exists(os.path.join('.osc', 'sources', 'merge')))
self._check_deletelist('merge\n')
self._check_addlist('toadd1\n')
self._check_status(p, 'merge', 'D')
@ -136,7 +136,7 @@ class TestDeleteFiles(OscTestCase):
self.assertFalse(os.path.exists('foo'))
self.assertTrue(os.path.exists('foo.r2'))
self.assertTrue(os.path.exists('foo.mine'))
self.assertTrue(os.path.exists(os.path.join('.osc', 'foo')))
self.assertTrue(os.path.exists(os.path.join('.osc', 'sources', 'foo')))
self._check_deletelist('foo\n')
self.assertFalse(os.path.exists(os.path.join('.osc', '_in_conflict')))
self._check_status(p, 'foo', 'D')
@ -151,8 +151,8 @@ class TestDeleteFiles(OscTestCase):
self.__check_ret(ret, True, ' ')
self.assertFalse(os.path.exists('foo'))
self.assertFalse(os.path.exists('merge'))
self.assertTrue(os.path.exists(os.path.join('.osc', 'foo')))
self.assertTrue(os.path.exists(os.path.join('.osc', 'merge')))
self.assertTrue(os.path.exists(os.path.join('.osc', 'sources', 'foo')))
self.assertTrue(os.path.exists(os.path.join('.osc', 'sources', 'merge')))
self._check_deletelist('foo\nmerge\n')
def testDeleteAlreadyDeleted(self):
@ -162,7 +162,7 @@ class TestDeleteFiles(OscTestCase):
ret = p.delete_file('foo')
self.__check_ret(ret, True, 'D')
self.assertFalse(os.path.exists('foo'))
self.assertTrue(os.path.exists(os.path.join('.osc', 'foo')))
self.assertTrue(os.path.exists(os.path.join('.osc', 'sources', 'foo')))
self._check_deletelist('foo\n')
self._check_status(p, 'foo', 'D')
@ -176,7 +176,7 @@ class TestDeleteFiles(OscTestCase):
ret = p.delete_file('toadd1')
self.__check_ret(ret, True, '!')
self.assertFalse(os.path.exists('toadd1'))
self.assertFalse(os.path.exists(os.path.join('.osc', 'toadd1')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'toadd1')))
self._check_deletelist('foo\n')
self.assertFalse(os.path.exists(os.path.join('.osc', '_to_be_added')))

View File

@ -25,7 +25,7 @@ class TestRepairWC(OscTestCase):
try:
meth(*args, **kwargs)
except exception:
self.fail(f'{exception.__name__} raised')
self.fail('%s raised' % exception.__name__)
def test_working_empty(self):
"""consistent, empty working copy"""
@ -54,7 +54,7 @@ class TestRepairWC(OscTestCase):
self.assertRaises(osc.oscerr.WorkingCopyInconsistent, osc.core.Package, '.')
p = osc.core.Package('.', wc_check=False)
p.wc_repair()
self.assertTrue(os.path.exists(os.path.join('.osc', 'foo')))
self.assertTrue(os.path.exists(os.path.join('.osc', 'sources', 'foo')))
self._check_deletelist('foo\n')
self._check_status(p, 'foo', 'D')
self._check_status(p, 'nochange', 'M')
@ -69,7 +69,7 @@ class TestRepairWC(OscTestCase):
self.assertRaises(osc.oscerr.WorkingCopyInconsistent, osc.core.Package, '.')
p = osc.core.Package('.', wc_check=False)
p.wc_repair()
self.assertFalse(os.path.exists(os.path.join('.osc', 'somefile')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'somefile')))
self._check_deletelist('foo\n')
self._check_status(p, 'foo', 'D')
self._check_status(p, 'nochange', 'M')
@ -79,12 +79,12 @@ class TestRepairWC(OscTestCase):
self.__assertNotRaises(osc.oscerr.WorkingCopyInconsistent, osc.core.Package, '.')
def test_simple3(self):
"""toadd1 has state 'A' and a file .osc/toadd1 exists"""
"""toadd1 has state 'A' and a file .osc/sources/toadd1 exists"""
self._change_to_pkg('simple3')
self.assertRaises(osc.oscerr.WorkingCopyInconsistent, osc.core.Package, '.')
p = osc.core.Package('.', wc_check=False)
p.wc_repair()
self.assertFalse(os.path.exists(os.path.join('.osc', 'toadd1')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'toadd1')))
self._check_deletelist('foo\n')
self._check_status(p, 'foo', 'D')
self._check_status(p, 'nochange', 'M')
@ -133,7 +133,7 @@ class TestRepairWC(OscTestCase):
self.assertRaises(osc.oscerr.WorkingCopyInconsistent, osc.core.Package, '.')
p = osc.core.Package('.', wc_check=False)
p.wc_repair()
self.assertTrue(os.path.exists(os.path.join('.osc', 'foo')))
self.assertTrue(os.path.exists(os.path.join('.osc', 'sources', 'foo')))
self._check_deletelist('foo\n')
self._check_status(p, 'foo', 'D')
self._check_status(p, 'nochange', 'M')
@ -155,7 +155,7 @@ class TestRepairWC(OscTestCase):
self.assertRaises(osc.oscerr.WorkingCopyInconsistent, osc.core.Package, '.')
p = osc.core.Package('.', wc_check=False)
p.wc_repair()
self.assertFalse(os.path.exists(os.path.join('.osc', 'skipped')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'skipped')))
self._check_deletelist('foo\n')
self._check_status(p, 'foo', 'D')
self._check_status(p, 'nochange', 'M')
@ -178,8 +178,8 @@ class TestRepairWC(OscTestCase):
self.assertRaises(osc.oscerr.WorkingCopyInconsistent, osc.core.Package, '.')
p = osc.core.Package('.', wc_check=False)
p.wc_repair()
self.assertTrue(os.path.exists(os.path.join('.osc', 'foo')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'unknown_file')))
self.assertTrue(os.path.exists(os.path.join('.osc', 'sources', 'foo')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'unknown_file')))
self._check_deletelist('foo\n')
self._check_status(p, 'foo', 'D')
self._check_status(p, 'nochange', 'C')

View File

@ -92,7 +92,7 @@ class TestRevertFiles(OscTestCase):
self.assertRaises(osc.oscerr.OscIOError, p.revert, 'skipped')
def __check_file(self, fname):
storefile = os.path.join('.osc', fname)
storefile = os.path.join('.osc', 'sources', fname)
self.assertTrue(os.path.exists(fname))
self.assertTrue(os.path.exists(storefile))
self.assertFilesEqual(fname, storefile)

View File

@ -57,7 +57,7 @@ class TestUpdate(OscTestCase):
self.assertEqual(sys.stdout.getvalue(), exp)
self._check_digests('testUpdateDeletedFile_files')
self.assertFalse(os.path.exists('foo'))
self.assertFalse(os.path.exists(os.path.join('.osc', 'foo')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'foo')))
@GET('http://localhost/source/osctest/simple?rev=2', file='testUpdateUpstreamModifiedFile_files')
@GET('http://localhost/source/osctest/simple/foo?rev=2', file='testUpdateUpstreamModifiedFile_foo')
@ -117,7 +117,7 @@ class TestUpdate(OscTestCase):
self.assertEqual(sys.stdout.getvalue(), exp)
self._check_deletelist('foo\n')
self._check_conflictlist('merge\n')
self.assertFilesEqual('foo', os.path.join('.osc', 'foo'))
self.assertFilesEqual('foo', os.path.join('.osc', 'sources', 'foo'))
self._check_digests('testUpdateLocalDeletions_files')
@GET('http://localhost/source/osctest/restore?rev=latest', file='testUpdateRestore_files')
@ -142,7 +142,7 @@ class TestUpdate(OscTestCase):
osc.core.Package('.').update(size_limit=50)
exp = 'D bigfile\nAt revision 2.\n'
self.assertEqual(sys.stdout.getvalue(), exp)
self.assertFalse(os.path.exists(os.path.join('.osc', 'bigfile')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'bigfile')))
self.assertFalse(os.path.exists('bigfile'))
self._check_digests('testUpdateLimitSizeNoChange_files', 'bigfile')
@ -158,8 +158,8 @@ class TestUpdate(OscTestCase):
p.update()
exp = 'D bigfile\nD merge\nAt revision 2.\n'
self.assertEqual(sys.stdout.getvalue(), exp)
self.assertFalse(os.path.exists(os.path.join('.osc', 'bigfile')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'merge')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'bigfile')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'merge')))
self.assertFalse(os.path.exists('bigfile'))
self._check_digests('testUpdateLocalLimitSizeNoChange_files', 'bigfile', 'merge')
self._check_status(p, 'bigfile', 'S')
@ -180,11 +180,11 @@ class TestUpdate(OscTestCase):
osc.core.Package('.').update(size_limit=10)
exp = 'A exists\nD bigfile\nD foo\nD merge\nD nochange\nAt revision 2.\n'
self.assertEqual(sys.stdout.getvalue(), exp)
self.assertFalse(os.path.exists(os.path.join('.osc', 'bigfile')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'bigfile')))
self.assertFalse(os.path.exists('bigfile'))
self.assertFalse(os.path.exists(os.path.join('.osc', 'foo')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'foo')))
self.assertFalse(os.path.exists('foo'))
self.assertFalse(os.path.exists(os.path.join('.osc', 'merge')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'merge')))
self.assertFalse(os.path.exists('merge'))
# exists because local version is modified
self.assertTrue(os.path.exists('nochange'))
@ -202,9 +202,9 @@ class TestUpdate(OscTestCase):
osc.core.Package('.').update(service_files=True)
exp = 'A bigfile\nD _service:exists\nA _service:bar\nA _service:foo\nAt revision 2.\n'
self.assertEqual(sys.stdout.getvalue(), exp)
self.assertFalse(os.path.exists(os.path.join('.osc', '_service:bar')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', '_service:bar')))
self.assertFileContentEqual('_service:bar', 'another service\n')
self.assertFalse(os.path.exists(os.path.join('.osc', '_service:foo')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', '_service:foo')))
self.assertFileContentEqual('_service:foo', 'small\n')
self.assertTrue(os.path.exists('_service:exists'))
self._check_digests('testUpdateServiceFilesAddDelete_files', '_service:foo', '_service:bar')
@ -218,9 +218,9 @@ class TestUpdate(OscTestCase):
osc.core.Package('.').update()
exp = 'A bigfile\nD _service:exists\nAt revision 2.\n'
self.assertEqual(sys.stdout.getvalue(), exp)
self.assertFalse(os.path.exists(os.path.join('.osc', '_service:bar')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', '_service:bar')))
self.assertFalse(os.path.exists('_service:bar'))
self.assertFalse(os.path.exists(os.path.join('.osc', '_service:foo')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', '_service:foo')))
self.assertFalse(os.path.exists('_service:foo'))
self.assertTrue(os.path.exists('_service:exists'))
self._check_digests('testUpdateServiceFilesAddDelete_files', '_service:foo', '_service:bar')
@ -284,7 +284,7 @@ class TestUpdate(OscTestCase):
self.assertEqual(sys.stdout.getvalue(), exp)
self.assertFalse(os.path.exists(os.path.join('.osc', '_in_update')))
self.assertFalse(os.path.exists('added'))
self.assertFalse(os.path.exists(os.path.join('.osc', 'added')))
self.assertFalse(os.path.exists(os.path.join('.osc', 'sources', 'added')))
self._check_digests('testUpdateResumeDeletedFile_files')