1
0
mirror of https://github.com/openSUSE/osc.git synced 2024-11-11 07:06:16 +01:00

Merge pull request #1181 from dmach/store

Introduce Store class, migrate some functionality to it
This commit is contained in:
Daniel Mach 2022-10-25 13:36:18 +02:00 committed by GitHub
commit 77d6f0b65b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 604 additions and 79 deletions

View File

@ -72,3 +72,19 @@ def find_node(root, root_name, node_name=None):
if node_name:
return root.find(node_name)
return root
def write_xml_node_to_file(node, path, indent=True):
"""
Write a XML node to a file.
:param node: Node to write.
:type node: xml.etree.ElementTree.Element
:param path: Path to a file that will be written to.
:type path: str
:param indent: Whether to indent (pretty-print) the written XML.
:type indent: bool
"""
if indent:
osc_core.xmlindent(node)
osc_core.ET.ElementTree(node).write(path)

View File

@ -1,6 +1,7 @@
import functools
from .. import core as osc_core
from .. import store as osc_store
from . import api
@ -78,10 +79,8 @@ class ApiPackage(PackageBase):
class LocalPackage(PackageBase):
def __init__(self, path):
self.dir = path
apiurl = osc_core.store_read_apiurl(self.dir)
project = osc_core.store_read_project(self.dir)
package = osc_core.store_read_package(self.dir)
super().__init__(apiurl, project, package)
self.store = osc_store.Store(self.dir)
super().__init__(self.store.apiurl, self.store.project, self.store.package)
def _get_directory_node(self):
return osc_core.read_filemeta(self.dir).getroot()
return self.store.read_xml_node("_files", "directory").getroot()

View File

@ -27,6 +27,7 @@ from . import build as osc_build
from . import cmdln
from . import conf
from . import oscerr
from . import store as osc_store
from .core import *
from .grabber import OscFileGrabber
from .meter import create_text_meter
@ -219,7 +220,7 @@ class Osc(cmdln.Cmdln):
sys.exit(1)
if (is_package_dir(localdir) or is_project_dir(localdir)) and not self.options.apiurl:
return store_read_apiurl(Path.cwd())
return osc_store.Store(Path.cwd()).apiurl
else:
return conf.config['apiurl']
@ -994,14 +995,14 @@ class Osc(cmdln.Cmdln):
attributepath = []
if cmd in ['prj', 'prjconf']:
if len(args) < 1:
apiurl = store_read_apiurl(Path.cwd())
apiurl = osc_store.Store(Path.cwd()).apiurl
project = store_read_project(Path.cwd())
else:
project = self._process_project_name(args[0])
elif cmd == 'pkg':
if len(args) < 2:
apiurl = store_read_apiurl(Path.cwd())
apiurl = osc_store.Store(Path.cwd()).apiurl
project = store_read_project(Path.cwd())
if len(args) < 1:
package = store_read_package(Path.cwd())
@ -4903,7 +4904,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
for pac in no_pacs:
if os.path.exists(pac):
# fail with an appropriate error message
store_read_apiurl(pac, defaulturl=False)
Package(pac)
path = os.path.normpath(os.path.join(pac, os.pardir))
if is_project_dir(path):
pac_name = os.path.basename(os.path.normpath(os.path.abspath(pac)))
@ -4914,7 +4915,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
files.setdefault(path, {})[pac_name] = []
else:
# fail with an appropriate error message
store_read_apiurl(pac, defaulturl=False)
Package(pac)
for prj_path, packages in prj_paths.items():
prj = Project(prj_path)
if not msg and not opts.no_message:
@ -4939,7 +4940,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
store_unlink_file(p.absdir, '_commit_msg')
elif no_pacs:
# fail with an appropriate error message
store_read_apiurl(no_pacs[0], defaulturl=False)
Package.from_paths(no_pacs)
else:
for p in pacs:
if not p.todo:
@ -6990,10 +6991,10 @@ Please submit there instead, or use --nodevelproject to force direct submission.
if is_package_dir(Path.cwd()):
project = store_read_project(Path.cwd())
package = store_read_package(Path.cwd())
apiurl = store_read_apiurl(Path.cwd())
apiurl = osc_store.Store(Path.cwd()).apiurl
elif is_project_dir(Path.cwd()):
project = store_read_project(Path.cwd())
apiurl = store_read_apiurl(Path.cwd())
apiurl = osc_store.Store(Path.cwd()).apiurl
else:
raise oscerr.WrongArgs('Too few arguments.')
else:
@ -7058,7 +7059,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
if is_package_dir(Path.cwd()):
project = store_read_project(Path.cwd())
package = store_read_package(Path.cwd())
apiurl = store_read_apiurl(Path.cwd())
apiurl = osc_store.Store(Path.cwd()).apiurl
repo = args[0]
arch = args[1]
sysrq = args[2]
@ -7113,10 +7114,10 @@ Please submit there instead, or use --nodevelproject to force direct submission.
if is_package_dir(Path.cwd()):
project = store_read_project(Path.cwd())
package = store_read_package(Path.cwd())
apiurl = store_read_apiurl(Path.cwd())
apiurl = osc_store.Store(Path.cwd()).apiurl
elif is_project_dir(Path.cwd()):
project = store_read_project(Path.cwd())
apiurl = store_read_apiurl(Path.cwd())
apiurl = osc_store.Store(Path.cwd()).apiurl
else:
raise oscerr.WrongArgs('Too few arguments.')
else:
@ -7931,7 +7932,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
createPackageDir(os.path.join(project.dir, pac), project)
else:
if not os.path.exists(os.path.join(project_dir, pac)):
apiurl = store_read_apiurl(project_dir)
apiurl = osc_store.Store(project_dir).apiurl
user = conf.get_apiurl_usr(apiurl)
data = meta_exists(metatype='pkg',
path_args=(quote_plus(project), quote_plus(pac)),
@ -8956,7 +8957,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
return 1
if args and is_package_dir(args[0]):
apiurl = store_read_apiurl(args[0])
apiurl = osc_store.Store(args[0]).apiurl
else:
apiurl = self.get_api_url()

View File

@ -46,6 +46,7 @@ from . import conf
from . import meter
from . import oscerr
from .connection import http_request, http_GET, http_POST, http_PUT, http_DELETE
from .store import Store
from .util.helper import decode_list, decode_it, raw_input, _html_escape
@ -248,6 +249,7 @@ def os_path_samefile(path1, path2):
return os.path.realpath(path1) == os.path.realpath(path2)
@total_ordering
class File:
"""represent a file, including its metadata"""
@ -264,6 +266,18 @@ class File:
def __str__(self):
return self.name
def __eq__(self, other):
if isinstance(other, str):
return self.name == other
self_data = (self.name, self.md5, self.size, self.mtime, self.skipped)
other_data = (other.name, other.md5, other.size, other.mtime, other.skipped)
return self_data == other_data
def __lt__(self, other):
self_data = (self.name, self.md5, self.size, self.mtime, self.skipped)
other_data = (other.name, other.md5, other.size, other.mtime, other.skipped)
return self_data < other_data
@classmethod
def from_xml_node(cls, node):
assert node.tag == "entry"
@ -272,10 +286,22 @@ class File:
"md5": node.get("md5"),
"size": int(node.get("size")),
"mtime": int(node.get("mtime")),
"skipped": "skipped" in node,
"skipped": "skipped" in node.attrib,
}
return cls(**kwargs)
def to_xml_node(self, parent_node):
attributes = {
"name": self.name,
"md5": self.md5,
"size": str(int(self.size)),
"mtime": str(int(self.mtime)),
}
if self.skipped:
attributes["skipped"] = "true"
new_node = ET.SubElement(parent_node, "entry", attributes)
return new_node
class Serviceinfo:
"""Source service content
@ -682,11 +708,12 @@ class Project:
"""
self.dir = dir
self.absdir = os.path.abspath(dir)
self.store = Store(dir)
self.progress_obj = progress_obj
self.name = store_read_project(self.dir)
self.scm_url = store_read_scmurl(self.dir)
self.apiurl = store_read_apiurl(self.dir, defaulturl=not wc_check)
self.scm_url = self.store.scmurl
self.apiurl = self.store.apiurl
dirty_files = []
if wc_check:
@ -734,8 +761,9 @@ class Project:
return dirty_files
def wc_repair(self, apiurl=None):
global store
if not os.path.exists(os.path.join(self.dir, store, '_apiurl')) or apiurl:
store = Store(self.dir)
store.assert_is_project()
if not store.exists("_apiurl") or apiurl:
if apiurl is None:
msg = 'cannot repair wc: the \'_apiurl\' file is missing but ' \
'no \'apiurl\' was passed to wc_repair'
@ -743,8 +771,8 @@ class Project:
raise oscerr.WorkingCopyInconsistent(self.name, None, [], msg)
# sanity check
conf.parse_apisrv_url(None, apiurl)
store_write_apiurl(self.dir, apiurl)
self.apiurl = store_read_apiurl(self.dir, defaulturl=False)
store.apiurl = apiurl
self.apiurl = apiurl
def checkout_missing_pacs(self, sinfos, expand_link=False, unexpand_link=False):
for pac in self.pacs_missing:
@ -1120,9 +1148,10 @@ class Project:
else:
pac_path = os.path.join(self.dir, pac)
store = Store(pac_path)
project = store_read_project(pac_path)
package = store_read_package(pac_path)
apiurl = store_read_apiurl(pac_path, defaulturl=False)
apiurl = store.apiurl
if not meta_exists(metatype='pkg',
path_args=(quote_plus(project), quote_plus(package)),
template_args=None, create_new=False, apiurl=apiurl):
@ -1158,9 +1187,9 @@ class Project:
os.mkdir(os.path.join(dir, store))
store_write_project(dir, project)
store_write_apiurl(dir, apiurl)
Store(dir).apiurl = apiurl
if scm_url:
store_write_string(dir, '_scm', scm_url + '\n')
Store(dir).scmurl = scm_url
package_tracking = None
if package_tracking:
store_write_initial_packages(dir, project, [])
@ -1182,10 +1211,11 @@ class Package:
self.dir = workingdir
self.absdir = os.path.abspath(self.dir)
self.store = Store(self.dir)
self.storedir = os.path.join(self.absdir, store)
self.progress_obj = progress_obj
self.size_limit = size_limit
self.scm_url = store_read_scmurl(self.dir)
self.scm_url = self.store.scmurl
if size_limit and size_limit == 0:
self.size_limit = None
@ -1193,7 +1223,7 @@ class Package:
self.prjname = store_read_project(self.dir)
self.name = store_read_package(self.dir)
self.apiurl = store_read_apiurl(self.dir, defaulturl=not wc_check)
self.apiurl = self.store.apiurl
self.update_datastructs()
dirty_files = []
@ -1294,7 +1324,9 @@ class Package:
return dirty_files
def wc_repair(self, apiurl=None):
if not os.path.exists(os.path.join(self.storedir, '_apiurl')) or apiurl:
store = Store(self.dir)
store.assert_is_package()
if not store.exists("_apiurl") or apiurl:
if apiurl is None:
msg = 'cannot repair wc: the \'_apiurl\' file is missing but ' \
'no \'apiurl\' was passed to wc_repair'
@ -1302,8 +1334,9 @@ class Package:
raise oscerr.WorkingCopyInconsistent(self.prjname, self.name, [], msg)
# sanity check
conf.parse_apisrv_url(None, apiurl)
store_write_apiurl(self.dir, apiurl)
self.apiurl = store_read_apiurl(self.dir, defaulturl=False)
store.apiurl = apiurl
self.apiurl = apiurl
# all files which are present in the filelist have to exist in the storedir
for f in self.filelist:
# XXX: should we also check the md5?
@ -1312,17 +1345,20 @@ class Package:
get_source_file(self.apiurl, self.prjname, self.name, f.name,
targetfilename=os.path.join(self.storedir, f.name), revision=self.rev,
mtime=f.mtime)
for fname in os.listdir(self.storedir):
for fname in store:
if fname in Package.REQ_STOREFILES or fname in Package.OPT_STOREFILES or \
fname.startswith('_build'):
continue
elif fname not in self.filenamelist or fname in self.skipped:
# this file does not belong to the storedir so remove it
os.unlink(os.path.join(self.storedir, fname))
store.unlink(fname)
for fname in self.to_be_deleted[:]:
if fname not in self.filenamelist:
self.to_be_deleted.remove(fname)
self.write_deletelist()
for fname in self.in_conflict[:]:
if fname not in self.filenamelist:
self.in_conflict.remove(fname)
@ -2593,13 +2629,13 @@ rev: %s
os.mkdir(os.path.join(dir, store))
store_write_project(dir, project)
store_write_string(dir, '_package', package + '\n')
store_write_apiurl(dir, apiurl)
Store(dir).apiurl = apiurl
if meta:
store_write_string(dir, '_meta_mode', '')
if size_limit:
store_write_string(dir, '_size_limit', str(size_limit) + '\n')
if scm_url:
store_write_string(dir, '_scm', scm_url + '\n')
Store(dir).scmurl = scm_url
else:
store_write_string(dir, '_files', '<directory />' + '\n')
store_write_string(dir, '_osclib_version', __store_version__ + '\n')
@ -6708,49 +6744,23 @@ def store_read_package(dir):
def store_read_scmurl(dir):
global store
url_file = os.path.join(dir, store, '_scm')
if not os.path.exists(url_file):
return
try:
p = open(url_file).readlines()[0].strip()
except OSError:
msg = 'Error: \'%s\' is not an osc package working copy' % os.path.abspath(dir)
if os.path.exists(os.path.join(dir, '.svn')):
msg += '\nTry svn instead of osc.'
raise oscerr.NoWorkingCopy(msg)
return p
import warnings
warnings.warn(
"osc.core.store_read_scmurl() is deprecated. "
"You should be using high-level classes such as Store, Project or Package instead.",
DeprecationWarning
)
return Store(dir).scmurl
def store_read_apiurl(dir, defaulturl=True):
global store
fname = os.path.join(dir, store, '_apiurl')
try:
with open(fname) as f:
url = f.readlines()[0].strip()
# this is needed to get a proper apiurl
# (former osc versions may stored an apiurl with a trailing slash etc.)
apiurl = conf.urljoin(*conf.parse_apisrv_url(None, url))
except:
if not defaulturl:
if is_project_dir(dir):
project = store_read_project(dir)
package = None
elif is_package_dir(dir):
project = store_read_project(dir)
package = None
else:
msg = 'Error: \'%s\' is not an osc package working copy' % os.path.abspath(dir)
raise oscerr.NoWorkingCopy(msg)
msg = 'Your working copy \'%s\' is in an inconsistent state.\n' \
'Please run \'osc repairwc %s\' (Note this might _remove_\n' \
'files from the .osc/ dir). Please check the state\n' \
'of the working copy afterwards (via \'osc status %s\')' % (dir, dir, dir)
raise oscerr.WorkingCopyInconsistent(project, package, ['_apiurl'], msg)
apiurl = conf.config['apiurl']
return apiurl
import warnings
warnings.warn(
"osc.core.store_read_apiurl() is deprecated. "
"You should be using high-level classes such as Store, Project or Package instead.",
DeprecationWarning
)
return Store(dir).apiurl
def store_read_last_buildroot(dir):
@ -6789,7 +6799,13 @@ def store_write_project(dir, project):
def store_write_apiurl(dir, apiurl):
store_write_string(dir, '_apiurl', apiurl + '\n')
import warnings
warnings.warn(
"osc.core.store_write_apiurl() is deprecated. "
"You should be using high-level classes such as Store, Project or Package instead.",
DeprecationWarning
)
Store(dir).apiurl = apiurl
def store_write_last_buildroot(dir, repo, arch, vm_type):
@ -7458,8 +7474,9 @@ def addFiles(filenames, prj_obj=None, force=False):
prj_dir, pac_dir = getPrjPacPaths(filename)
if not is_package_dir(filename) and os.path.isdir(filename) and is_project_dir(prj_dir) \
and conf.config['do_package_tracking']:
store = Store(prj_dir)
prj_name = store_read_project(prj_dir)
prj_apiurl = store_read_apiurl(prj_dir, defaulturl=False)
prj_apiurl = store.apiurl
Package.init_package(prj_apiurl, prj_name, pac_dir, filename)
elif is_package_dir(filename) and conf.config['do_package_tracking']:
print('osc: warning: \'%s\' is already under version control' % filename)

292
osc/store.py Normal file
View File

@ -0,0 +1,292 @@
"""
Store class wraps access to files in the '.osc' directory.
It is meant to be used as an implementation detail of Project and Package classes
and shouldn't be used in any code outside osc.
"""
import os
from xml.etree import ElementTree as ET
from . import oscerr
from ._private import api
class Store:
STORE_DIR = ".osc"
STORE_VERSION = "1.0"
@classmethod
def is_project_dir(cls, path):
try:
store = cls(path)
except oscerr.NoWorkingCopy:
return False
return store.is_project
@classmethod
def is_package_dir(cls, path):
try:
store = cls(path)
except oscerr.NoWorkingCopy:
return False
return store.is_package
def __init__(self, path, check=True):
self.path = path
self.abspath = os.path.abspath(self.path)
self.is_project = self.exists("_project") and not self.exists("_package")
self.is_package = self.exists("_project") and self.exists("_package")
if check and not any([self.is_project, self.is_package]):
msg = f"Directory '{self.path}' is not a working copy"
raise oscerr.NoWorkingCopy(msg)
def __contains__(self, fn):
return self.exists(fn)
def __iter__(self):
path = os.path.join(self.abspath, self.STORE_DIR)
yield from os.listdir(path)
def assert_is_project(self):
if not self.is_project:
msg = f"Directory '{self.path}' is not a working copy of a project"
raise oscerr.NoWorkingCopy(msg)
def assert_is_package(self):
if not self.is_package:
msg = f"Directory '{self.path}' is not a working copy of a package"
raise oscerr.NoWorkingCopy(msg)
def get_path(self, fn, subdir=None):
# sanitize input to ensure that joining path works as expected
fn = fn.lstrip("/")
if subdir:
subdir = subdir.lstrip("/")
return os.path.join(self.abspath, self.STORE_DIR, subdir, fn)
return os.path.join(self.abspath, self.STORE_DIR, fn)
def exists(self, fn, subdir=None):
return os.path.exists(self.get_path(fn, subdir=subdir))
def unlink(self, fn, subdir=None):
try:
os.unlink(self.get_path(fn, subdir=subdir))
except FileNotFoundError:
pass
def read_file(self, fn, subdir=None):
if not self.exists(fn, subdir=subdir):
return None
with open(self.get_path(fn, subdir=subdir), encoding="utf-8") as f:
return f.read()
def write_file(self, fn, value, subdir=None):
if value is None:
self.unlink(fn, subdir=subdir)
return
try:
if subdir:
os.makedirs(self.get_path(subdir))
else:
os.makedirs(self.get_path(""))
except FileExistsError:
pass
old = self.get_path(fn, subdir=subdir)
new = self.get_path(f"{fn}.new", subdir=subdir)
try:
with open(new, "w", encoding="utf-8") as f:
f.write(value)
os.rename(new, old)
except:
if os.path.exists(new):
os.unlink(new)
raise
def read_list(self, fn, subdir=None):
if not self.exists(fn, subdir=subdir):
return None
with open(self.get_path(fn, subdir=subdir), encoding="utf-8") as f:
return [line.rstrip("\n") for line in f]
def write_list(self, fn, value, subdir=None):
if value is None:
self.unlink(fn, subdir=subdir)
return
if not isinstance(value, (list, tuple)):
msg = f"The argument `value` should be list, not {type(value).__name__}"
raise TypeError(msg)
value = "".join((f"{line}\n" for line in value))
self.write_file(fn, value, subdir=subdir)
def read_string(self, fn, subdir=None):
if not self.exists(fn, subdir=subdir):
return None
with open(self.get_path(fn, subdir=subdir), encoding="utf-8") as f:
return f.readline().strip()
def write_string(self, fn, value, subdir=None):
if value is None:
self.unlink(fn, subdir=subdir)
return
if isinstance(value, bytes):
value = value.decode("utf-8")
if not isinstance(value, str):
msg = f"The argument `value` should be str, not {type(value).__name__}"
raise TypeError(msg)
self.write_file(fn, value + "\n", subdir=subdir)
def read_int(self, fn):
if not self.exists(fn):
return None
result = self.read_string(fn)
if not result.isdigit():
return None
return int(result)
def write_int(self, fn, value, subdir=None):
if value is None:
self.unlink(fn, subdir=subdir)
return
if not isinstance(value, int):
msg = f"The argument `value` should be int, not {type(value).__name__}"
raise TypeError(msg)
value = str(value)
self.write_string(fn, value, subdir=subdir)
def read_xml_node(self, fn, node_name, subdir=None):
path = self.get_path(fn, subdir=subdir)
try:
tree = ET.parse(path)
except SyntaxError as e:
msg = f"Unable to parse '{path}': {e}"
raise oscerr.NoWorkingCopy(msg)
root = tree.getroot()
assert root.tag == node_name
# TODO: return root?
return tree
def write_xml_node(self, fn, node_name, node, subdir=None):
path = self.get_path(fn, subdir=subdir)
assert node.tag == node_name
api.write_xml_node_to_file(node, path)
@property
def apiurl(self):
return self.read_string("_apiurl")
@apiurl.setter
def apiurl(self, value):
self.write_string("_apiurl", value)
@property
def project(self):
return self.read_string("_project")
@project.setter
def project(self, value):
self.write_string("_project", value)
@property
def package(self):
return self.read_string("_package")
@package.setter
def package(self, value):
self.write_string("_package", value)
@property
def scmurl(self):
return self.read_string("_scm")
@scmurl.setter
def scmurl(self, value):
return self.write_string("_scm", value)
@property
def size_limit(self):
return self.read_int("_size_limit")
@size_limit.setter
def size_limit(self, value):
return self.write_int("_size_limit", value)
@property
def to_be_added(self):
self.assert_is_package()
return self.read_list("_to_be_added") or []
@to_be_added.setter
def to_be_added(self, value):
self.assert_is_package()
return self.write_list("_to_be_added", value)
@property
def to_be_deleted(self):
self.assert_is_package()
return self.read_list("_to_be_deleted") or []
@to_be_deleted.setter
def to_be_deleted(self, value):
self.assert_is_package()
return self.write_list("_to_be_deleted", value)
@property
def in_conflict(self):
self.assert_is_package()
return self.read_list("_in_conflict") or []
@in_conflict.setter
def in_conflict(self, value):
self.assert_is_package()
return self.write_list("_in_conflict", value)
@property
def osclib_version(self):
return self.read_string("_osclib_version")
@property
def files(self):
self.assert_is_package()
if self.exists("_scm"):
msg = "Package '{self.path}' is managed via SCM"
raise oscerr.NoWorkingCopy(msg)
if not self.exists("_files"):
msg = "Package '{self.path}' doesn't contain _files metadata"
raise oscerr.NoWorkingCopy(msg)
result = []
directory_node = self.read_xml_node("_files", "directory").getroot()
from . import core as osc_core
for entry_node in api.find_nodes(directory_node, "directory", "entry"):
result.append(osc_core.File.from_xml_node(entry_node))
return result
@files.setter
def files(self, value):
if not isinstance(value, (list, tuple)):
msg = f"The argument `value` should be list, not {type(value).__name__}"
raise TypeError(msg)
root = ET.Element("directory")
for file_obj in sorted(value):
file_obj.to_xml_node(root)
self.write_xml_node("_files", "directory", root)
@property
def last_buildroot(self):
self.assert_is_package()
items = self.read_list("_last_buildroot")
if items is not None and len(items) != 3:
msg = f"Package '{self.path}' contains _last_buildroot metadata that doesn't contain 3 lines: [repo, arch, vm_type]"
raise oscerr.NoWorkingCopy(msg)
return items
@last_buildroot.setter
def last_buildroot(self, value):
self.assert_is_package()
if len(value) != 3:
raise ValueError("A list with exactly 3 items is expected: [repo, arch, vm_type]")
self.write_list("_last_buildroot", value)

200
tests/test_store.py Normal file
View File

@ -0,0 +1,200 @@
import os
import sys
import tempfile
import unittest
import osc.core as osc_core
from osc.store import Store
class TestStore(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp(prefix='osc_test')
self.store = Store(self.tmpdir, check=False)
self.store.is_package = True
self.store.project = "project name"
self.store.package = "package name"
def tearDown(self):
try:
shutil.rmtree(self.tmpdir)
except:
pass
def fileEquals(self, fn, expected_value):
path = os.path.join(self.tmpdir, ".osc", fn)
with open(path) as f:
actual_value = f.read()
self.assertEqual(actual_value, expected_value, f"File: {fn}")
def test_read_write_file(self):
self.store.write_file("_file", "\n\nline1\nline2")
self.fileEquals("_file", "\n\nline1\nline2")
self.assertEqual(self.store.read_file("_file"), "\n\nline1\nline2")
# writing None removes the file
self.store.write_file("_file", None)
self.assertFalse(self.store.exists("_file"))
self.assertRaises(TypeError, self.store.write_string, "_file", 123)
self.assertRaises(TypeError, self.store.write_string, "_file", ["123"])
def test_read_write_int(self):
self.store.write_int("_int", 123)
self.fileEquals("_int", "123\n")
self.assertEqual(self.store.read_int("_int"), 123)
# writing None removes the file
self.store.write_int("_int", None)
self.assertFalse(self.store.exists("_int"))
self.assertRaises(TypeError, self.store.write_int, "_int", "123")
self.assertRaises(TypeError, self.store.write_int, "_int", b"123")
self.assertRaises(TypeError, self.store.write_int, "_int", ["123"])
def test_read_write_list(self):
self.store.write_list("_list", ["one", "two", "three"])
self.fileEquals("_list", "one\ntwo\nthree\n")
self.assertEqual(self.store.read_list("_list"), ["one", "two", "three"])
# writing None removes the file
self.store.write_list("_list", None)
self.assertFalse(self.store.exists("_list"))
self.assertRaises(TypeError, self.store.write_list, "_list", "123")
self.assertRaises(TypeError, self.store.write_list, "_list", b"123")
self.assertRaises(TypeError, self.store.write_list, "_list", 123)
def test_read_write_string(self):
self.store.write_string("_string", "string")
self.fileEquals("_string", "string\n")
self.assertEqual(self.store.read_string("_string"), "string")
self.store.write_string("_bytes", b"bytes")
self.fileEquals("_bytes", "bytes\n")
self.assertEqual(self.store.read_string("_bytes"), "bytes")
# writing None removes the file
self.store.write_string("_string", None)
self.assertFalse(self.store.exists("_string"))
self.assertRaises(TypeError, self.store.write_string, "_string", 123)
self.assertRaises(TypeError, self.store.write_string, "_string", ["123"])
def test_contains(self):
self.assertTrue("_project" in self.store)
self.assertTrue("_package" in self.store)
self.assertFalse("_foo" in self.store)
def test_iter(self):
self.assertEqual(len(list(self.store)), 2)
for fn in self.store:
self.assertIn(fn, ["_project", "_package"])
def test_apiurl(self):
self.store.apiurl = "https://example.com"
self.fileEquals("_apiurl", "https://example.com\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.apiurl, "https://example.com")
def test_package(self):
self.fileEquals("_package", "package name\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.package, "package name")
def test_project(self):
self.fileEquals("_project", "project name\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.project, "project name")
def test_scmurl(self):
self.store.scmurl = "https://example.com/project.git"
self.fileEquals("_scm", "https://example.com/project.git\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.scmurl, "https://example.com/project.git")
def test_size_limit(self):
self.store.size_limit = 123
self.fileEquals("_size_limit", "123\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.size_limit, 123)
def test_to_be_added(self):
self.store.to_be_added = ["foo", "bar", "baz"]
self.fileEquals("_to_be_added", "foo\nbar\nbaz\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.to_be_added, ["foo", "bar", "baz"])
def test_to_be_deleted(self):
self.store.to_be_deleted = ["foo", "bar", "baz"]
self.fileEquals("_to_be_deleted", "foo\nbar\nbaz\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.to_be_deleted, ["foo", "bar", "baz"])
def test_in_conflict(self):
self.store.in_conflict = ["foo", "bar", "baz"]
self.fileEquals("_in_conflict", "foo\nbar\nbaz\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.in_conflict, ["foo", "bar", "baz"])
def test_osclib_version(self):
# no setter, users are not supposed to set the version
self.assertRaises(AttributeError, setattr, self.store, "osclib_version", "123")
self.store.write_string("_osclib_version", "123")
self.fileEquals("_osclib_version", "123\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.osclib_version, "123")
def test_files(self):
files = [
osc_core.File(name="foo", md5="aabbcc", size=1, mtime=2),
osc_core.File(name="bar", md5="ddeeff", size=3, mtime=4, skipped=True),
]
self.store.files = files
expected = """
<directory>
<entry name="bar" md5="ddeeff" size="3" mtime="4" skipped="true" />
<entry name="foo" md5="aabbcc" size="1" mtime="2" />
</directory>""".lstrip()
if sys.version_info[:2] <= (3, 7):
# ElementTree doesn't preserve attribute order on py <= 3.7; https://bugs.python.org/issue34160
expected = """
<directory>
<entry md5="ddeeff" mtime="4" name="bar" size="3" skipped="true" />
<entry md5="aabbcc" mtime="2" name="foo" size="1" />
</directory>""".lstrip()
self.fileEquals("_files", expected)
store2 = Store(self.tmpdir)
files2 = store2.files
# files got ordered
self.assertTrue(files2[0] == files[1])
self.assertTrue(files2[1] == files[0])
def test_last_buildroot(self):
self.store.last_buildroot = "repo", "arch", "vm_type"
self.fileEquals("_last_buildroot", "repo\narch\nvm_type\n")
self.assertRaises(ValueError, setattr, self.store, "last_buildroot", ["one"])
self.assertRaises(ValueError, setattr, self.store, "last_buildroot", ["one", "two"])
self.assertRaises(ValueError, setattr, self.store, "last_buildroot", ["one", "two", "three", "four"])
store2 = Store(self.tmpdir)
self.assertEqual(store2.last_buildroot, ["repo", "arch", "vm_type"])
if __name__ == "__main__":
unittest.main()