1
0
mirror of https://github.com/openSUSE/osc.git synced 2024-11-15 00:36:11 +01:00
github.com_openSUSE_osc/osc/obs_scm/store.py

567 lines
16 KiB
Python

"""
Store class wraps access to files in the '.osc' directory.
It is meant to be used as an implementation detail of Project and Package classes
and shouldn't be used in any code outside osc.
"""
import os
from .. import oscerr
from .._private import api
from ..util.xml import ET
# __store_version__ is to be incremented when the format of the working copy
# "store" changes in an incompatible way. Please add any needed migration
# functionality to check_store_version().
__store_version__ = '1.0'
class Store:
STORE_DIR = ".osc"
STORE_VERSION = "1.0"
@classmethod
def is_project_dir(cls, path):
try:
store = cls(path)
except oscerr.NoWorkingCopy:
return False
return store.is_project
@classmethod
def is_package_dir(cls, path):
try:
store = cls(path)
except oscerr.NoWorkingCopy:
return False
return store.is_package
def __init__(self, path, check=True):
self.path = path
self.abspath = os.path.abspath(self.path)
self.is_project = self.exists("_project") and not self.exists("_package")
self.is_package = self.exists("_project") and self.exists("_package")
if check and not any([self.is_project, self.is_package]):
msg = f"Directory '{self.path}' is not an OBS SCM working copy"
raise oscerr.NoWorkingCopy(msg)
def __contains__(self, fn):
return self.exists(fn)
def __iter__(self):
path = os.path.join(self.abspath, self.STORE_DIR)
yield from os.listdir(path)
def assert_is_project(self):
if not self.is_project:
msg = f"Directory '{self.path}' is not an OBS SCM working copy of a project"
raise oscerr.NoWorkingCopy(msg)
def assert_is_package(self):
if not self.is_package:
msg = f"Directory '{self.path}' is not an OBS SCM working copy of a package"
raise oscerr.NoWorkingCopy(msg)
def get_path(self, fn, subdir=None):
# sanitize input to ensure that joining path works as expected
fn = fn.lstrip("/")
if subdir:
subdir = subdir.lstrip("/")
return os.path.join(self.abspath, self.STORE_DIR, subdir, fn)
return os.path.join(self.abspath, self.STORE_DIR, fn)
def exists(self, fn, subdir=None):
return os.path.exists(self.get_path(fn, subdir=subdir))
def unlink(self, fn, subdir=None):
try:
os.unlink(self.get_path(fn, subdir=subdir))
except FileNotFoundError:
pass
def read_file(self, fn, subdir=None):
if not self.exists(fn, subdir=subdir):
return None
with open(self.get_path(fn, subdir=subdir), encoding="utf-8") as f:
return f.read()
def write_file(self, fn, value, subdir=None):
if value is None:
self.unlink(fn, subdir=subdir)
return
try:
if subdir:
os.makedirs(self.get_path(subdir))
else:
os.makedirs(self.get_path(""))
except FileExistsError:
pass
old = self.get_path(fn, subdir=subdir)
new = self.get_path(f"{fn}.new", subdir=subdir)
try:
with open(new, "w", encoding="utf-8") as f:
f.write(value)
os.rename(new, old)
except:
if os.path.exists(new):
os.unlink(new)
raise
def read_list(self, fn, subdir=None):
if not self.exists(fn, subdir=subdir):
return None
with open(self.get_path(fn, subdir=subdir), encoding="utf-8") as f:
return [line.rstrip("\n") for line in f]
def write_list(self, fn, value, subdir=None):
if value is None:
self.unlink(fn, subdir=subdir)
return
if not isinstance(value, (list, tuple)):
msg = f"The argument `value` should be list, not {type(value).__name__}"
raise TypeError(msg)
value = "".join((f"{line or ''}\n" for line in value))
self.write_file(fn, value, subdir=subdir)
def read_string(self, fn, subdir=None):
if not self.exists(fn, subdir=subdir):
return None
with open(self.get_path(fn, subdir=subdir), encoding="utf-8") as f:
return f.readline().strip()
def write_string(self, fn, value, subdir=None):
if value is None:
self.unlink(fn, subdir=subdir)
return
if isinstance(value, bytes):
value = value.decode("utf-8")
if not isinstance(value, str):
msg = f"The argument `value` should be str, not {type(value).__name__}"
raise TypeError(msg)
self.write_file(fn, f"{value}\n", subdir=subdir)
def read_int(self, fn):
if not self.exists(fn):
return None
result = self.read_string(fn)
if not result.isdigit():
return None
return int(result)
def write_int(self, fn, value, subdir=None):
if value is None:
self.unlink(fn, subdir=subdir)
return
if not isinstance(value, int):
msg = f"The argument `value` should be int, not {type(value).__name__}"
raise TypeError(msg)
value = str(value)
self.write_string(fn, value, subdir=subdir)
def read_xml_node(self, fn, node_name, subdir=None):
path = self.get_path(fn, subdir=subdir)
try:
tree = ET.parse(path)
except SyntaxError as e:
msg = f"Unable to parse '{path}': {e}"
raise oscerr.NoWorkingCopy(msg)
root = tree.getroot()
assert root.tag == node_name
# TODO: return root?
return tree
def write_xml_node(self, fn, node_name, node, subdir=None):
path = self.get_path(fn, subdir=subdir)
assert node.tag == node_name
api.write_xml_node_to_file(node, path)
def _sanitize_apiurl(self, value):
# apiurl shouldn't end with a slash, strip it so we can use apiurl without modifications
# in config['api_host_options'][apiurl] and other places
if isinstance(value, str):
value = value.strip("/")
elif isinstance(value, bytes):
value = value.strip(b"/")
return value
@property
def apiurl(self):
return self._sanitize_apiurl(self.read_string("_apiurl"))
@apiurl.setter
def apiurl(self, value):
self.write_string("_apiurl", self._sanitize_apiurl(value))
@property
def project(self):
return self.read_string("_project")
@project.setter
def project(self, value):
self.write_string("_project", value)
@property
def package(self):
return self.read_string("_package")
@package.setter
def package(self, value):
self.write_string("_package", value)
@property
def scmurl(self):
return self.read_string("_scm")
@scmurl.setter
def scmurl(self, value):
return self.write_string("_scm", value)
@property
def size_limit(self):
return self.read_int("_size_limit")
@size_limit.setter
def size_limit(self, value):
return self.write_int("_size_limit", value)
@property
def to_be_added(self):
self.assert_is_package()
return self.read_list("_to_be_added") or []
@to_be_added.setter
def to_be_added(self, value):
self.assert_is_package()
return self.write_list("_to_be_added", value)
@property
def to_be_deleted(self):
self.assert_is_package()
return self.read_list("_to_be_deleted") or []
@to_be_deleted.setter
def to_be_deleted(self, value):
self.assert_is_package()
return self.write_list("_to_be_deleted", value)
@property
def in_conflict(self):
self.assert_is_package()
return self.read_list("_in_conflict") or []
@in_conflict.setter
def in_conflict(self, value):
self.assert_is_package()
return self.write_list("_in_conflict", value)
@property
def osclib_version(self):
return self.read_string("_osclib_version")
@property
def files(self):
from .. import core as osc_core
self.assert_is_package()
if self.exists("_scm"):
msg = "Package '{self.path}' is managed via SCM"
raise oscerr.NoWorkingCopy(msg)
if not self.exists("_files"):
msg = "Package '{self.path}' doesn't contain _files metadata"
raise oscerr.NoWorkingCopy(msg)
result = []
directory_node = self.read_xml_node("_files", "directory").getroot()
for entry_node in api.find_nodes(directory_node, "directory", "entry"):
result.append(osc_core.File.from_xml_node(entry_node))
return result
@files.setter
def files(self, value):
if not isinstance(value, (list, tuple)):
msg = f"The argument `value` should be list, not {type(value).__name__}"
raise TypeError(msg)
root = ET.Element("directory")
for file_obj in sorted(value):
file_obj.to_xml_node(root)
self.write_xml_node("_files", "directory", root)
@property
def last_buildroot(self):
self.assert_is_package()
items = self.read_list("_last_buildroot")
if items is None:
return items
if len(items) != 3:
msg = f"Package '{self.path}' contains _last_buildroot metadata that doesn't contain 3 lines: [repo, arch, vm_type]"
raise oscerr.NoWorkingCopy(msg)
if items[2] in ("", "None"):
items[2] = None
return items
@last_buildroot.setter
def last_buildroot(self, value):
self.assert_is_package()
if len(value) != 3:
raise ValueError("A list with exactly 3 items is expected: [repo, arch, vm_type]")
self.write_list("_last_buildroot", value)
@property
def _meta_node(self):
if not self.exists("_meta"):
return None
if self.is_package:
root = self.read_xml_node("_meta", "package").getroot()
else:
root = self.read_xml_node("_meta", "project").getroot()
return root
store = '.osc'
def check_store_version(dir):
global store
versionfile = os.path.join(dir, store, '_osclib_version')
try:
with open(versionfile) as f:
v = f.read().strip()
except:
v = ''
if v == '':
msg = f'Error: "{os.path.abspath(dir)}" is not an osc package working copy.'
if os.path.exists(os.path.join(dir, '.svn')):
msg = msg + '\nTry svn instead of osc.'
raise oscerr.NoWorkingCopy(msg)
if v != __store_version__:
if v in ['0.2', '0.3', '0.4', '0.5', '0.6', '0.7', '0.8', '0.9', '0.95', '0.96', '0.97', '0.98', '0.99']:
# version is fine, no migration needed
f = open(versionfile, 'w')
f.write(__store_version__ + '\n')
f.close()
return
msg = f'The osc metadata of your working copy "{dir}"'
msg += f'\nhas __store_version__ = {v}, but it should be {__store_version__}'
msg += '\nPlease do a fresh checkout or update your client. Sorry about the inconvenience.'
raise oscerr.WorkingCopyWrongVersion(msg)
def is_project_dir(d):
global store
return os.path.exists(os.path.join(d, store, '_project')) and not \
os.path.exists(os.path.join(d, store, '_package'))
def is_package_dir(d):
global store
return os.path.exists(os.path.join(d, store, '_project')) and \
os.path.exists(os.path.join(d, store, '_package'))
def read_filemeta(dir):
global store
msg = f'\'{dir}\' is not a valid working copy.'
filesmeta = os.path.join(dir, store, '_files')
if not is_package_dir(dir):
raise oscerr.NoWorkingCopy(msg)
if os.path.isfile(os.path.join(dir, store, '_scm')):
raise oscerr.NoWorkingCopy("Is managed via scm")
if not os.path.isfile(filesmeta):
raise oscerr.NoWorkingCopy(f'{msg} ({filesmeta} does not exist)')
try:
r = ET.parse(filesmeta)
except SyntaxError as e:
raise oscerr.NoWorkingCopy(f'{msg}\nWhen parsing .osc/_files, the following error was encountered:\n{e}')
return r
def store_readlist(dir, name):
global store
r = []
if os.path.exists(os.path.join(dir, store, name)):
with open(os.path.join(dir, store, name)) as f:
r = [line.rstrip('\n') for line in f]
return r
def read_tobeadded(dir):
return store_readlist(dir, '_to_be_added')
def read_tobedeleted(dir):
return store_readlist(dir, '_to_be_deleted')
def read_sizelimit(dir):
global store
r = None
fname = os.path.join(dir, store, '_size_limit')
if os.path.exists(fname):
with open(fname) as f:
r = f.readline().strip()
if r is None or not r.isdigit():
return None
return int(r)
def read_inconflict(dir):
return store_readlist(dir, '_in_conflict')
def store_read_project(dir):
global store
try:
with open(os.path.join(dir, store, '_project')) as f:
p = f.readline().strip()
except OSError:
msg = f'Error: \'{os.path.abspath(dir)}\' is not an osc project dir or working copy'
if os.path.exists(os.path.join(dir, '.svn')):
msg += '\nTry svn instead of osc.'
raise oscerr.NoWorkingCopy(msg)
return p
def store_read_package(dir):
global store
try:
with open(os.path.join(dir, store, '_package')) as f:
p = f.readline().strip()
except OSError:
msg = f'Error: \'{os.path.abspath(dir)}\' is not an osc package working copy'
if os.path.exists(os.path.join(dir, '.svn')):
msg += '\nTry svn instead of osc.'
raise oscerr.NoWorkingCopy(msg)
return p
def store_read_scmurl(dir):
import warnings
warnings.warn(
"osc.core.store_read_scmurl() is deprecated. "
"You should be using high-level classes such as Store, Project or Package instead.",
DeprecationWarning
)
return Store(dir).scmurl
def store_read_apiurl(dir, defaulturl=True):
import warnings
warnings.warn(
"osc.core.store_read_apiurl() is deprecated. "
"You should be using high-level classes such as Store, Project or Package instead.",
DeprecationWarning
)
return Store(dir).apiurl
def store_read_last_buildroot(dir):
global store
fname = os.path.join(dir, store, '_last_buildroot')
if os.path.exists(fname):
lines = open(fname).read().splitlines()
if len(lines) == 3:
return lines
return
def store_write_string(dir, file, string, subdir=''):
from ..core import decode_it
global store
if subdir and not os.path.isdir(os.path.join(dir, store, subdir)):
os.mkdir(os.path.join(dir, store, subdir))
fname = os.path.join(dir, store, subdir, file)
try:
f = open(fname + '.new', 'w')
if not isinstance(string, str):
string = decode_it(string)
f.write(string)
f.close()
os.rename(fname + '.new', fname)
except:
if os.path.exists(fname + '.new'):
os.unlink(fname + '.new')
raise
def store_write_project(dir, project):
store_write_string(dir, '_project', project + '\n')
def store_write_apiurl(dir, apiurl):
import warnings
warnings.warn(
"osc.core.store_write_apiurl() is deprecated. "
"You should be using high-level classes such as Store, Project or Package instead.",
DeprecationWarning
)
Store(dir).apiurl = apiurl
def store_write_last_buildroot(dir, repo, arch, vm_type):
store_write_string(dir, '_last_buildroot', repo + '\n' + arch + '\n' + vm_type + '\n')
def store_unlink_file(dir, file):
global store
try:
os.unlink(os.path.join(dir, store, file))
except:
pass
def store_read_file(dir, file):
global store
try:
with open(os.path.join(dir, store, file)) as f:
return f.read()
except:
return None
def store_write_initial_packages(dir, project, subelements):
global store
fname = os.path.join(dir, store, '_packages')
root = ET.Element('project', name=project)
for elem in subelements:
root.append(elem)
ET.ElementTree(root).write(fname)
def delete_storedir(store_dir):
"""
This method deletes a store dir.
"""
from ..core import delete_dir
head, tail = os.path.split(store_dir)
if tail == '.osc':
delete_dir(store_dir)