diff --git a/osc/_private/api.py b/osc/_private/api.py
index f9f8c6c3..9fe29cc3 100644
--- a/osc/_private/api.py
+++ b/osc/_private/api.py
@@ -72,3 +72,19 @@ def find_node(root, root_name, node_name=None):
if node_name:
return root.find(node_name)
return root
+
+
+def write_xml_node_to_file(node, path, indent=True):
+ """
+ Write a XML node to a file.
+
+ :param node: Node to write.
+ :type node: xml.etree.ElementTree.Element
+ :param path: Path to a file that will be written to.
+ :type path: str
+ :param indent: Whether to indent (pretty-print) the written XML.
+ :type indent: bool
+ """
+ if indent:
+ osc_core.xmlindent(node)
+ osc_core.ET.ElementTree(node).write(path)
diff --git a/osc/_private/package.py b/osc/_private/package.py
index 03c3aaf7..80647c4a 100644
--- a/osc/_private/package.py
+++ b/osc/_private/package.py
@@ -1,6 +1,7 @@
import functools
from .. import core as osc_core
+from .. import store as osc_store
from . import api
@@ -78,10 +79,8 @@ class ApiPackage(PackageBase):
class LocalPackage(PackageBase):
def __init__(self, path):
self.dir = path
- apiurl = osc_core.store_read_apiurl(self.dir)
- project = osc_core.store_read_project(self.dir)
- package = osc_core.store_read_package(self.dir)
- super().__init__(apiurl, project, package)
+ self.store = osc_store.Store(self.dir)
+ super().__init__(self.store.apiurl, self.store.project, self.store.package)
def _get_directory_node(self):
- return osc_core.read_filemeta(self.dir).getroot()
+ return self.store.read_xml_node("_files", "directory").getroot()
diff --git a/osc/commandline.py b/osc/commandline.py
index 1be51da9..45babe61 100644
--- a/osc/commandline.py
+++ b/osc/commandline.py
@@ -27,6 +27,7 @@ from . import build as osc_build
from . import cmdln
from . import conf
from . import oscerr
+from . import store as osc_store
from .core import *
from .grabber import OscFileGrabber
from .meter import create_text_meter
@@ -219,7 +220,7 @@ class Osc(cmdln.Cmdln):
sys.exit(1)
if (is_package_dir(localdir) or is_project_dir(localdir)) and not self.options.apiurl:
- return store_read_apiurl(Path.cwd())
+ return osc_store.Store(Path.cwd()).apiurl
else:
return conf.config['apiurl']
@@ -994,14 +995,14 @@ class Osc(cmdln.Cmdln):
attributepath = []
if cmd in ['prj', 'prjconf']:
if len(args) < 1:
- apiurl = store_read_apiurl(Path.cwd())
+ apiurl = osc_store.Store(Path.cwd()).apiurl
project = store_read_project(Path.cwd())
else:
project = self._process_project_name(args[0])
elif cmd == 'pkg':
if len(args) < 2:
- apiurl = store_read_apiurl(Path.cwd())
+ apiurl = osc_store.Store(Path.cwd()).apiurl
project = store_read_project(Path.cwd())
if len(args) < 1:
package = store_read_package(Path.cwd())
@@ -4903,7 +4904,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
for pac in no_pacs:
if os.path.exists(pac):
# fail with an appropriate error message
- store_read_apiurl(pac, defaulturl=False)
+ Package(pac)
path = os.path.normpath(os.path.join(pac, os.pardir))
if is_project_dir(path):
pac_name = os.path.basename(os.path.normpath(os.path.abspath(pac)))
@@ -4914,7 +4915,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
files.setdefault(path, {})[pac_name] = []
else:
# fail with an appropriate error message
- store_read_apiurl(pac, defaulturl=False)
+ Package(pac)
for prj_path, packages in prj_paths.items():
prj = Project(prj_path)
if not msg and not opts.no_message:
@@ -4939,7 +4940,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
store_unlink_file(p.absdir, '_commit_msg')
elif no_pacs:
# fail with an appropriate error message
- store_read_apiurl(no_pacs[0], defaulturl=False)
+ Package.from_paths(no_pacs)
else:
for p in pacs:
if not p.todo:
@@ -6990,10 +6991,10 @@ Please submit there instead, or use --nodevelproject to force direct submission.
if is_package_dir(Path.cwd()):
project = store_read_project(Path.cwd())
package = store_read_package(Path.cwd())
- apiurl = store_read_apiurl(Path.cwd())
+ apiurl = osc_store.Store(Path.cwd()).apiurl
elif is_project_dir(Path.cwd()):
project = store_read_project(Path.cwd())
- apiurl = store_read_apiurl(Path.cwd())
+ apiurl = osc_store.Store(Path.cwd()).apiurl
else:
raise oscerr.WrongArgs('Too few arguments.')
else:
@@ -7058,7 +7059,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
if is_package_dir(Path.cwd()):
project = store_read_project(Path.cwd())
package = store_read_package(Path.cwd())
- apiurl = store_read_apiurl(Path.cwd())
+ apiurl = osc_store.Store(Path.cwd()).apiurl
repo = args[0]
arch = args[1]
sysrq = args[2]
@@ -7113,10 +7114,10 @@ Please submit there instead, or use --nodevelproject to force direct submission.
if is_package_dir(Path.cwd()):
project = store_read_project(Path.cwd())
package = store_read_package(Path.cwd())
- apiurl = store_read_apiurl(Path.cwd())
+ apiurl = osc_store.Store(Path.cwd()).apiurl
elif is_project_dir(Path.cwd()):
project = store_read_project(Path.cwd())
- apiurl = store_read_apiurl(Path.cwd())
+ apiurl = osc_store.Store(Path.cwd()).apiurl
else:
raise oscerr.WrongArgs('Too few arguments.')
else:
@@ -7931,7 +7932,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
createPackageDir(os.path.join(project.dir, pac), project)
else:
if not os.path.exists(os.path.join(project_dir, pac)):
- apiurl = store_read_apiurl(project_dir)
+ apiurl = osc_store.Store(project_dir).apiurl
user = conf.get_apiurl_usr(apiurl)
data = meta_exists(metatype='pkg',
path_args=(quote_plus(project), quote_plus(pac)),
@@ -8956,7 +8957,7 @@ Please submit there instead, or use --nodevelproject to force direct submission.
return 1
if args and is_package_dir(args[0]):
- apiurl = store_read_apiurl(args[0])
+ apiurl = osc_store.Store(args[0]).apiurl
else:
apiurl = self.get_api_url()
diff --git a/osc/core.py b/osc/core.py
index d140b74b..dc3eafe6 100644
--- a/osc/core.py
+++ b/osc/core.py
@@ -46,6 +46,7 @@ from . import conf
from . import meter
from . import oscerr
from .connection import http_request, http_GET, http_POST, http_PUT, http_DELETE
+from .store import Store
from .util.helper import decode_list, decode_it, raw_input, _html_escape
@@ -248,6 +249,7 @@ def os_path_samefile(path1, path2):
return os.path.realpath(path1) == os.path.realpath(path2)
+@total_ordering
class File:
"""represent a file, including its metadata"""
@@ -264,6 +266,18 @@ class File:
def __str__(self):
return self.name
+ def __eq__(self, other):
+ if isinstance(other, str):
+ return self.name == other
+ self_data = (self.name, self.md5, self.size, self.mtime, self.skipped)
+ other_data = (other.name, other.md5, other.size, other.mtime, other.skipped)
+ return self_data == other_data
+
+ def __lt__(self, other):
+ self_data = (self.name, self.md5, self.size, self.mtime, self.skipped)
+ other_data = (other.name, other.md5, other.size, other.mtime, other.skipped)
+ return self_data < other_data
+
@classmethod
def from_xml_node(cls, node):
assert node.tag == "entry"
@@ -272,10 +286,22 @@ class File:
"md5": node.get("md5"),
"size": int(node.get("size")),
"mtime": int(node.get("mtime")),
- "skipped": "skipped" in node,
+ "skipped": "skipped" in node.attrib,
}
return cls(**kwargs)
+ def to_xml_node(self, parent_node):
+ attributes = {
+ "name": self.name,
+ "md5": self.md5,
+ "size": str(int(self.size)),
+ "mtime": str(int(self.mtime)),
+ }
+ if self.skipped:
+ attributes["skipped"] = "true"
+ new_node = ET.SubElement(parent_node, "entry", attributes)
+ return new_node
+
class Serviceinfo:
"""Source service content
@@ -682,11 +708,12 @@ class Project:
"""
self.dir = dir
self.absdir = os.path.abspath(dir)
+ self.store = Store(dir)
self.progress_obj = progress_obj
self.name = store_read_project(self.dir)
- self.scm_url = store_read_scmurl(self.dir)
- self.apiurl = store_read_apiurl(self.dir, defaulturl=not wc_check)
+ self.scm_url = self.store.scmurl
+ self.apiurl = self.store.apiurl
dirty_files = []
if wc_check:
@@ -734,8 +761,9 @@ class Project:
return dirty_files
def wc_repair(self, apiurl=None):
- global store
- if not os.path.exists(os.path.join(self.dir, store, '_apiurl')) or apiurl:
+ store = Store(self.dir)
+ store.assert_is_project()
+ if not store.exists("_apiurl") or apiurl:
if apiurl is None:
msg = 'cannot repair wc: the \'_apiurl\' file is missing but ' \
'no \'apiurl\' was passed to wc_repair'
@@ -743,8 +771,8 @@ class Project:
raise oscerr.WorkingCopyInconsistent(self.name, None, [], msg)
# sanity check
conf.parse_apisrv_url(None, apiurl)
- store_write_apiurl(self.dir, apiurl)
- self.apiurl = store_read_apiurl(self.dir, defaulturl=False)
+ store.apiurl = apiurl
+ self.apiurl = apiurl
def checkout_missing_pacs(self, sinfos, expand_link=False, unexpand_link=False):
for pac in self.pacs_missing:
@@ -1120,9 +1148,10 @@ class Project:
else:
pac_path = os.path.join(self.dir, pac)
+ store = Store(pac_path)
project = store_read_project(pac_path)
package = store_read_package(pac_path)
- apiurl = store_read_apiurl(pac_path, defaulturl=False)
+ apiurl = store.apiurl
if not meta_exists(metatype='pkg',
path_args=(quote_plus(project), quote_plus(package)),
template_args=None, create_new=False, apiurl=apiurl):
@@ -1158,9 +1187,9 @@ class Project:
os.mkdir(os.path.join(dir, store))
store_write_project(dir, project)
- store_write_apiurl(dir, apiurl)
+ Store(dir).apiurl = apiurl
if scm_url:
- store_write_string(dir, '_scm', scm_url + '\n')
+ Store(dir).scmurl = scm_url
package_tracking = None
if package_tracking:
store_write_initial_packages(dir, project, [])
@@ -1182,10 +1211,11 @@ class Package:
self.dir = workingdir
self.absdir = os.path.abspath(self.dir)
+ self.store = Store(self.dir)
self.storedir = os.path.join(self.absdir, store)
self.progress_obj = progress_obj
self.size_limit = size_limit
- self.scm_url = store_read_scmurl(self.dir)
+ self.scm_url = self.store.scmurl
if size_limit and size_limit == 0:
self.size_limit = None
@@ -1193,7 +1223,7 @@ class Package:
self.prjname = store_read_project(self.dir)
self.name = store_read_package(self.dir)
- self.apiurl = store_read_apiurl(self.dir, defaulturl=not wc_check)
+ self.apiurl = self.store.apiurl
self.update_datastructs()
dirty_files = []
@@ -1294,7 +1324,9 @@ class Package:
return dirty_files
def wc_repair(self, apiurl=None):
- if not os.path.exists(os.path.join(self.storedir, '_apiurl')) or apiurl:
+ store = Store(self.dir)
+ store.assert_is_package()
+ if not store.exists("_apiurl") or apiurl:
if apiurl is None:
msg = 'cannot repair wc: the \'_apiurl\' file is missing but ' \
'no \'apiurl\' was passed to wc_repair'
@@ -1302,8 +1334,9 @@ class Package:
raise oscerr.WorkingCopyInconsistent(self.prjname, self.name, [], msg)
# sanity check
conf.parse_apisrv_url(None, apiurl)
- store_write_apiurl(self.dir, apiurl)
- self.apiurl = store_read_apiurl(self.dir, defaulturl=False)
+ store.apiurl = apiurl
+ self.apiurl = apiurl
+
# all files which are present in the filelist have to exist in the storedir
for f in self.filelist:
# XXX: should we also check the md5?
@@ -1312,17 +1345,20 @@ class Package:
get_source_file(self.apiurl, self.prjname, self.name, f.name,
targetfilename=os.path.join(self.storedir, f.name), revision=self.rev,
mtime=f.mtime)
- for fname in os.listdir(self.storedir):
+
+ for fname in store:
if fname in Package.REQ_STOREFILES or fname in Package.OPT_STOREFILES or \
fname.startswith('_build'):
continue
elif fname not in self.filenamelist or fname in self.skipped:
# this file does not belong to the storedir so remove it
- os.unlink(os.path.join(self.storedir, fname))
+ store.unlink(fname)
+
for fname in self.to_be_deleted[:]:
if fname not in self.filenamelist:
self.to_be_deleted.remove(fname)
self.write_deletelist()
+
for fname in self.in_conflict[:]:
if fname not in self.filenamelist:
self.in_conflict.remove(fname)
@@ -2593,13 +2629,13 @@ rev: %s
os.mkdir(os.path.join(dir, store))
store_write_project(dir, project)
store_write_string(dir, '_package', package + '\n')
- store_write_apiurl(dir, apiurl)
+ Store(dir).apiurl = apiurl
if meta:
store_write_string(dir, '_meta_mode', '')
if size_limit:
store_write_string(dir, '_size_limit', str(size_limit) + '\n')
if scm_url:
- store_write_string(dir, '_scm', scm_url + '\n')
+ Store(dir).scmurl = scm_url
else:
store_write_string(dir, '_files', '' + '\n')
store_write_string(dir, '_osclib_version', __store_version__ + '\n')
@@ -6708,49 +6744,23 @@ def store_read_package(dir):
def store_read_scmurl(dir):
- global store
-
- url_file = os.path.join(dir, store, '_scm')
- if not os.path.exists(url_file):
- return
- try:
- p = open(url_file).readlines()[0].strip()
- except OSError:
- msg = 'Error: \'%s\' is not an osc package working copy' % os.path.abspath(dir)
- if os.path.exists(os.path.join(dir, '.svn')):
- msg += '\nTry svn instead of osc.'
- raise oscerr.NoWorkingCopy(msg)
- return p
+ import warnings
+ warnings.warn(
+ "osc.core.store_read_scmurl() is deprecated. "
+ "You should be using high-level classes such as Store, Project or Package instead.",
+ DeprecationWarning
+ )
+ return Store(dir).scmurl
def store_read_apiurl(dir, defaulturl=True):
- global store
-
- fname = os.path.join(dir, store, '_apiurl')
- try:
- with open(fname) as f:
- url = f.readlines()[0].strip()
- # this is needed to get a proper apiurl
- # (former osc versions may stored an apiurl with a trailing slash etc.)
- apiurl = conf.urljoin(*conf.parse_apisrv_url(None, url))
- except:
- if not defaulturl:
- if is_project_dir(dir):
- project = store_read_project(dir)
- package = None
- elif is_package_dir(dir):
- project = store_read_project(dir)
- package = None
- else:
- msg = 'Error: \'%s\' is not an osc package working copy' % os.path.abspath(dir)
- raise oscerr.NoWorkingCopy(msg)
- msg = 'Your working copy \'%s\' is in an inconsistent state.\n' \
- 'Please run \'osc repairwc %s\' (Note this might _remove_\n' \
- 'files from the .osc/ dir). Please check the state\n' \
- 'of the working copy afterwards (via \'osc status %s\')' % (dir, dir, dir)
- raise oscerr.WorkingCopyInconsistent(project, package, ['_apiurl'], msg)
- apiurl = conf.config['apiurl']
- return apiurl
+ import warnings
+ warnings.warn(
+ "osc.core.store_read_apiurl() is deprecated. "
+ "You should be using high-level classes such as Store, Project or Package instead.",
+ DeprecationWarning
+ )
+ return Store(dir).apiurl
def store_read_last_buildroot(dir):
@@ -6789,7 +6799,13 @@ def store_write_project(dir, project):
def store_write_apiurl(dir, apiurl):
- store_write_string(dir, '_apiurl', apiurl + '\n')
+ import warnings
+ warnings.warn(
+ "osc.core.store_write_apiurl() is deprecated. "
+ "You should be using high-level classes such as Store, Project or Package instead.",
+ DeprecationWarning
+ )
+ Store(dir).apiurl = apiurl
def store_write_last_buildroot(dir, repo, arch, vm_type):
@@ -7458,8 +7474,9 @@ def addFiles(filenames, prj_obj=None, force=False):
prj_dir, pac_dir = getPrjPacPaths(filename)
if not is_package_dir(filename) and os.path.isdir(filename) and is_project_dir(prj_dir) \
and conf.config['do_package_tracking']:
+ store = Store(prj_dir)
prj_name = store_read_project(prj_dir)
- prj_apiurl = store_read_apiurl(prj_dir, defaulturl=False)
+ prj_apiurl = store.apiurl
Package.init_package(prj_apiurl, prj_name, pac_dir, filename)
elif is_package_dir(filename) and conf.config['do_package_tracking']:
print('osc: warning: \'%s\' is already under version control' % filename)
diff --git a/osc/store.py b/osc/store.py
new file mode 100644
index 00000000..1d0199bb
--- /dev/null
+++ b/osc/store.py
@@ -0,0 +1,292 @@
+"""
+Store class wraps access to files in the '.osc' directory.
+It is meant to be used as an implementation detail of Project and Package classes
+and shouldn't be used in any code outside osc.
+"""
+
+
+import os
+from xml.etree import ElementTree as ET
+
+from . import oscerr
+from ._private import api
+
+
+class Store:
+ STORE_DIR = ".osc"
+ STORE_VERSION = "1.0"
+
+ @classmethod
+ def is_project_dir(cls, path):
+ try:
+ store = cls(path)
+ except oscerr.NoWorkingCopy:
+ return False
+ return store.is_project
+
+ @classmethod
+ def is_package_dir(cls, path):
+ try:
+ store = cls(path)
+ except oscerr.NoWorkingCopy:
+ return False
+ return store.is_package
+
+ def __init__(self, path, check=True):
+ self.path = path
+ self.abspath = os.path.abspath(self.path)
+
+ self.is_project = self.exists("_project") and not self.exists("_package")
+ self.is_package = self.exists("_project") and self.exists("_package")
+
+ if check and not any([self.is_project, self.is_package]):
+ msg = f"Directory '{self.path}' is not a working copy"
+ raise oscerr.NoWorkingCopy(msg)
+
+ def __contains__(self, fn):
+ return self.exists(fn)
+
+ def __iter__(self):
+ path = os.path.join(self.abspath, self.STORE_DIR)
+ yield from os.listdir(path)
+
+ def assert_is_project(self):
+ if not self.is_project:
+ msg = f"Directory '{self.path}' is not a working copy of a project"
+ raise oscerr.NoWorkingCopy(msg)
+
+ def assert_is_package(self):
+ if not self.is_package:
+ msg = f"Directory '{self.path}' is not a working copy of a package"
+ raise oscerr.NoWorkingCopy(msg)
+
+ def get_path(self, fn, subdir=None):
+ # sanitize input to ensure that joining path works as expected
+ fn = fn.lstrip("/")
+ if subdir:
+ subdir = subdir.lstrip("/")
+ return os.path.join(self.abspath, self.STORE_DIR, subdir, fn)
+ return os.path.join(self.abspath, self.STORE_DIR, fn)
+
+ def exists(self, fn, subdir=None):
+ return os.path.exists(self.get_path(fn, subdir=subdir))
+
+ def unlink(self, fn, subdir=None):
+ try:
+ os.unlink(self.get_path(fn, subdir=subdir))
+ except FileNotFoundError:
+ pass
+
+ def read_file(self, fn, subdir=None):
+ if not self.exists(fn, subdir=subdir):
+ return None
+ with open(self.get_path(fn, subdir=subdir), encoding="utf-8") as f:
+ return f.read()
+
+ def write_file(self, fn, value, subdir=None):
+ if value is None:
+ self.unlink(fn, subdir=subdir)
+ return
+ try:
+ if subdir:
+ os.makedirs(self.get_path(subdir))
+ else:
+ os.makedirs(self.get_path(""))
+ except FileExistsError:
+ pass
+
+ old = self.get_path(fn, subdir=subdir)
+ new = self.get_path(f"{fn}.new", subdir=subdir)
+ try:
+ with open(new, "w", encoding="utf-8") as f:
+ f.write(value)
+ os.rename(new, old)
+ except:
+ if os.path.exists(new):
+ os.unlink(new)
+ raise
+
+ def read_list(self, fn, subdir=None):
+ if not self.exists(fn, subdir=subdir):
+ return None
+ with open(self.get_path(fn, subdir=subdir), encoding="utf-8") as f:
+ return [line.rstrip("\n") for line in f]
+
+ def write_list(self, fn, value, subdir=None):
+ if value is None:
+ self.unlink(fn, subdir=subdir)
+ return
+ if not isinstance(value, (list, tuple)):
+ msg = f"The argument `value` should be list, not {type(value).__name__}"
+ raise TypeError(msg)
+ value = "".join((f"{line}\n" for line in value))
+ self.write_file(fn, value, subdir=subdir)
+
+ def read_string(self, fn, subdir=None):
+ if not self.exists(fn, subdir=subdir):
+ return None
+ with open(self.get_path(fn, subdir=subdir), encoding="utf-8") as f:
+ return f.readline().strip()
+
+ def write_string(self, fn, value, subdir=None):
+ if value is None:
+ self.unlink(fn, subdir=subdir)
+ return
+ if isinstance(value, bytes):
+ value = value.decode("utf-8")
+ if not isinstance(value, str):
+ msg = f"The argument `value` should be str, not {type(value).__name__}"
+ raise TypeError(msg)
+ self.write_file(fn, value + "\n", subdir=subdir)
+
+ def read_int(self, fn):
+ if not self.exists(fn):
+ return None
+ result = self.read_string(fn)
+ if not result.isdigit():
+ return None
+ return int(result)
+
+ def write_int(self, fn, value, subdir=None):
+ if value is None:
+ self.unlink(fn, subdir=subdir)
+ return
+ if not isinstance(value, int):
+ msg = f"The argument `value` should be int, not {type(value).__name__}"
+ raise TypeError(msg)
+ value = str(value)
+ self.write_string(fn, value, subdir=subdir)
+
+ def read_xml_node(self, fn, node_name, subdir=None):
+ path = self.get_path(fn, subdir=subdir)
+ try:
+ tree = ET.parse(path)
+ except SyntaxError as e:
+ msg = f"Unable to parse '{path}': {e}"
+ raise oscerr.NoWorkingCopy(msg)
+ root = tree.getroot()
+ assert root.tag == node_name
+ # TODO: return root?
+ return tree
+
+ def write_xml_node(self, fn, node_name, node, subdir=None):
+ path = self.get_path(fn, subdir=subdir)
+ assert node.tag == node_name
+ api.write_xml_node_to_file(node, path)
+
+ @property
+ def apiurl(self):
+ return self.read_string("_apiurl")
+
+ @apiurl.setter
+ def apiurl(self, value):
+ self.write_string("_apiurl", value)
+
+ @property
+ def project(self):
+ return self.read_string("_project")
+
+ @project.setter
+ def project(self, value):
+ self.write_string("_project", value)
+
+ @property
+ def package(self):
+ return self.read_string("_package")
+
+ @package.setter
+ def package(self, value):
+ self.write_string("_package", value)
+
+ @property
+ def scmurl(self):
+ return self.read_string("_scm")
+
+ @scmurl.setter
+ def scmurl(self, value):
+ return self.write_string("_scm", value)
+
+ @property
+ def size_limit(self):
+ return self.read_int("_size_limit")
+
+ @size_limit.setter
+ def size_limit(self, value):
+ return self.write_int("_size_limit", value)
+
+ @property
+ def to_be_added(self):
+ self.assert_is_package()
+ return self.read_list("_to_be_added") or []
+
+ @to_be_added.setter
+ def to_be_added(self, value):
+ self.assert_is_package()
+ return self.write_list("_to_be_added", value)
+
+ @property
+ def to_be_deleted(self):
+ self.assert_is_package()
+ return self.read_list("_to_be_deleted") or []
+
+ @to_be_deleted.setter
+ def to_be_deleted(self, value):
+ self.assert_is_package()
+ return self.write_list("_to_be_deleted", value)
+
+ @property
+ def in_conflict(self):
+ self.assert_is_package()
+ return self.read_list("_in_conflict") or []
+
+ @in_conflict.setter
+ def in_conflict(self, value):
+ self.assert_is_package()
+ return self.write_list("_in_conflict", value)
+
+ @property
+ def osclib_version(self):
+ return self.read_string("_osclib_version")
+
+ @property
+ def files(self):
+ self.assert_is_package()
+ if self.exists("_scm"):
+ msg = "Package '{self.path}' is managed via SCM"
+ raise oscerr.NoWorkingCopy(msg)
+ if not self.exists("_files"):
+ msg = "Package '{self.path}' doesn't contain _files metadata"
+ raise oscerr.NoWorkingCopy(msg)
+ result = []
+ directory_node = self.read_xml_node("_files", "directory").getroot()
+ from . import core as osc_core
+ for entry_node in api.find_nodes(directory_node, "directory", "entry"):
+ result.append(osc_core.File.from_xml_node(entry_node))
+ return result
+
+ @files.setter
+ def files(self, value):
+ if not isinstance(value, (list, tuple)):
+ msg = f"The argument `value` should be list, not {type(value).__name__}"
+ raise TypeError(msg)
+
+ root = ET.Element("directory")
+ for file_obj in sorted(value):
+ file_obj.to_xml_node(root)
+ self.write_xml_node("_files", "directory", root)
+
+ @property
+ def last_buildroot(self):
+ self.assert_is_package()
+ items = self.read_list("_last_buildroot")
+ if items is not None and len(items) != 3:
+ msg = f"Package '{self.path}' contains _last_buildroot metadata that doesn't contain 3 lines: [repo, arch, vm_type]"
+ raise oscerr.NoWorkingCopy(msg)
+ return items
+
+ @last_buildroot.setter
+ def last_buildroot(self, value):
+ self.assert_is_package()
+ if len(value) != 3:
+ raise ValueError("A list with exactly 3 items is expected: [repo, arch, vm_type]")
+ self.write_list("_last_buildroot", value)
diff --git a/tests/test_store.py b/tests/test_store.py
new file mode 100644
index 00000000..d905c0ec
--- /dev/null
+++ b/tests/test_store.py
@@ -0,0 +1,200 @@
+import os
+import sys
+import tempfile
+import unittest
+
+import osc.core as osc_core
+from osc.store import Store
+
+
+class TestStore(unittest.TestCase):
+ def setUp(self):
+ self.tmpdir = tempfile.mkdtemp(prefix='osc_test')
+ self.store = Store(self.tmpdir, check=False)
+ self.store.is_package = True
+ self.store.project = "project name"
+ self.store.package = "package name"
+
+ def tearDown(self):
+ try:
+ shutil.rmtree(self.tmpdir)
+ except:
+ pass
+
+ def fileEquals(self, fn, expected_value):
+ path = os.path.join(self.tmpdir, ".osc", fn)
+ with open(path) as f:
+ actual_value = f.read()
+ self.assertEqual(actual_value, expected_value, f"File: {fn}")
+
+ def test_read_write_file(self):
+ self.store.write_file("_file", "\n\nline1\nline2")
+ self.fileEquals("_file", "\n\nline1\nline2")
+ self.assertEqual(self.store.read_file("_file"), "\n\nline1\nline2")
+
+ # writing None removes the file
+ self.store.write_file("_file", None)
+ self.assertFalse(self.store.exists("_file"))
+
+ self.assertRaises(TypeError, self.store.write_string, "_file", 123)
+ self.assertRaises(TypeError, self.store.write_string, "_file", ["123"])
+
+ def test_read_write_int(self):
+ self.store.write_int("_int", 123)
+ self.fileEquals("_int", "123\n")
+ self.assertEqual(self.store.read_int("_int"), 123)
+
+ # writing None removes the file
+ self.store.write_int("_int", None)
+ self.assertFalse(self.store.exists("_int"))
+
+ self.assertRaises(TypeError, self.store.write_int, "_int", "123")
+ self.assertRaises(TypeError, self.store.write_int, "_int", b"123")
+ self.assertRaises(TypeError, self.store.write_int, "_int", ["123"])
+
+ def test_read_write_list(self):
+ self.store.write_list("_list", ["one", "two", "three"])
+ self.fileEquals("_list", "one\ntwo\nthree\n")
+ self.assertEqual(self.store.read_list("_list"), ["one", "two", "three"])
+
+ # writing None removes the file
+ self.store.write_list("_list", None)
+ self.assertFalse(self.store.exists("_list"))
+
+ self.assertRaises(TypeError, self.store.write_list, "_list", "123")
+ self.assertRaises(TypeError, self.store.write_list, "_list", b"123")
+ self.assertRaises(TypeError, self.store.write_list, "_list", 123)
+
+ def test_read_write_string(self):
+ self.store.write_string("_string", "string")
+ self.fileEquals("_string", "string\n")
+ self.assertEqual(self.store.read_string("_string"), "string")
+
+ self.store.write_string("_bytes", b"bytes")
+ self.fileEquals("_bytes", "bytes\n")
+ self.assertEqual(self.store.read_string("_bytes"), "bytes")
+
+ # writing None removes the file
+ self.store.write_string("_string", None)
+ self.assertFalse(self.store.exists("_string"))
+
+ self.assertRaises(TypeError, self.store.write_string, "_string", 123)
+ self.assertRaises(TypeError, self.store.write_string, "_string", ["123"])
+
+ def test_contains(self):
+ self.assertTrue("_project" in self.store)
+ self.assertTrue("_package" in self.store)
+ self.assertFalse("_foo" in self.store)
+
+ def test_iter(self):
+ self.assertEqual(len(list(self.store)), 2)
+ for fn in self.store:
+ self.assertIn(fn, ["_project", "_package"])
+
+ def test_apiurl(self):
+ self.store.apiurl = "https://example.com"
+ self.fileEquals("_apiurl", "https://example.com\n")
+
+ store2 = Store(self.tmpdir)
+ self.assertEqual(store2.apiurl, "https://example.com")
+
+ def test_package(self):
+ self.fileEquals("_package", "package name\n")
+
+ store2 = Store(self.tmpdir)
+ self.assertEqual(store2.package, "package name")
+
+ def test_project(self):
+ self.fileEquals("_project", "project name\n")
+
+ store2 = Store(self.tmpdir)
+ self.assertEqual(store2.project, "project name")
+
+ def test_scmurl(self):
+ self.store.scmurl = "https://example.com/project.git"
+ self.fileEquals("_scm", "https://example.com/project.git\n")
+
+ store2 = Store(self.tmpdir)
+ self.assertEqual(store2.scmurl, "https://example.com/project.git")
+
+ def test_size_limit(self):
+ self.store.size_limit = 123
+ self.fileEquals("_size_limit", "123\n")
+
+ store2 = Store(self.tmpdir)
+ self.assertEqual(store2.size_limit, 123)
+
+ def test_to_be_added(self):
+ self.store.to_be_added = ["foo", "bar", "baz"]
+ self.fileEquals("_to_be_added", "foo\nbar\nbaz\n")
+
+ store2 = Store(self.tmpdir)
+ self.assertEqual(store2.to_be_added, ["foo", "bar", "baz"])
+
+ def test_to_be_deleted(self):
+ self.store.to_be_deleted = ["foo", "bar", "baz"]
+ self.fileEquals("_to_be_deleted", "foo\nbar\nbaz\n")
+
+ store2 = Store(self.tmpdir)
+ self.assertEqual(store2.to_be_deleted, ["foo", "bar", "baz"])
+
+ def test_in_conflict(self):
+ self.store.in_conflict = ["foo", "bar", "baz"]
+ self.fileEquals("_in_conflict", "foo\nbar\nbaz\n")
+
+ store2 = Store(self.tmpdir)
+ self.assertEqual(store2.in_conflict, ["foo", "bar", "baz"])
+
+ def test_osclib_version(self):
+ # no setter, users are not supposed to set the version
+ self.assertRaises(AttributeError, setattr, self.store, "osclib_version", "123")
+ self.store.write_string("_osclib_version", "123")
+ self.fileEquals("_osclib_version", "123\n")
+
+ store2 = Store(self.tmpdir)
+ self.assertEqual(store2.osclib_version, "123")
+
+ def test_files(self):
+ files = [
+ osc_core.File(name="foo", md5="aabbcc", size=1, mtime=2),
+ osc_core.File(name="bar", md5="ddeeff", size=3, mtime=4, skipped=True),
+ ]
+
+ self.store.files = files
+
+ expected = """
+
+
+
+""".lstrip()
+
+ if sys.version_info[:2] <= (3, 7):
+ # ElementTree doesn't preserve attribute order on py <= 3.7; https://bugs.python.org/issue34160
+ expected = """
+
+
+
+""".lstrip()
+
+ self.fileEquals("_files", expected)
+
+ store2 = Store(self.tmpdir)
+ files2 = store2.files
+ # files got ordered
+ self.assertTrue(files2[0] == files[1])
+ self.assertTrue(files2[1] == files[0])
+
+ def test_last_buildroot(self):
+ self.store.last_buildroot = "repo", "arch", "vm_type"
+ self.fileEquals("_last_buildroot", "repo\narch\nvm_type\n")
+
+ self.assertRaises(ValueError, setattr, self.store, "last_buildroot", ["one"])
+ self.assertRaises(ValueError, setattr, self.store, "last_buildroot", ["one", "two"])
+ self.assertRaises(ValueError, setattr, self.store, "last_buildroot", ["one", "two", "three", "four"])
+
+ store2 = Store(self.tmpdir)
+ self.assertEqual(store2.last_buildroot, ["repo", "arch", "vm_type"])
+
+
+if __name__ == "__main__":
+ unittest.main()