1
0
mirror of https://github.com/openSUSE/osc.git synced 2024-12-25 17:36:13 +01:00

Implement Store class that will replace store_{read,write}* functions

This commit is contained in:
Daniel Mach 2022-10-24 16:39:45 +02:00
parent db06625197
commit 4e773cec32
3 changed files with 493 additions and 0 deletions

View File

@ -46,6 +46,7 @@ from . import conf
from . import meter
from . import oscerr
from .connection import http_request, http_GET, http_POST, http_PUT, http_DELETE
from .store import Store
from .util.helper import decode_list, decode_it, raw_input, _html_escape

292
osc/store.py Normal file
View File

@ -0,0 +1,292 @@
"""
Store class wraps access to files in the '.osc' directory.
It is meant to be used as an implementation detail of Project and Package classes
and shouldn't be used in any code outside osc.
"""
import os
from xml.etree import ElementTree as ET
from . import oscerr
from ._private import api
class Store:
STORE_DIR = ".osc"
STORE_VERSION = "1.0"
@classmethod
def is_project_dir(cls, path):
try:
store = cls(path)
except oscerr.NoWorkingCopy:
return False
return store.is_project
@classmethod
def is_package_dir(cls, path):
try:
store = cls(path)
except oscerr.NoWorkingCopy:
return False
return store.is_package
def __init__(self, path, check=True):
self.path = path
self.abspath = os.path.abspath(self.path)
self.is_project = self.exists("_project") and not self.exists("_package")
self.is_package = self.exists("_project") and self.exists("_package")
if check and not any([self.is_project, self.is_package]):
msg = f"Directory '{self.path}' is not a working copy"
raise oscerr.NoWorkingCopy(msg)
def __contains__(self, fn):
return self.exists(fn)
def __iter__(self):
path = os.path.join(self.abspath, self.STORE_DIR)
yield from os.listdir(path)
def assert_is_project(self):
if not self.is_project:
msg = f"Directory '{self.path}' is not a working copy of a project"
raise oscerr.NoWorkingCopy(msg)
def assert_is_package(self):
if not self.is_package:
msg = f"Directory '{self.path}' is not a working copy of a package"
raise oscerr.NoWorkingCopy(msg)
def get_path(self, fn, subdir=None):
# sanitize input to ensure that joining path works as expected
fn = fn.lstrip("/")
if subdir:
subdir = subdir.lstrip("/")
return os.path.join(self.abspath, self.STORE_DIR, subdir, fn)
return os.path.join(self.abspath, self.STORE_DIR, fn)
def exists(self, fn, subdir=None):
return os.path.exists(self.get_path(fn, subdir=subdir))
def unlink(self, fn, subdir=None):
try:
os.unlink(self.get_path(fn, subdir=subdir))
except FileNotFoundError:
pass
def read_file(self, fn, subdir=None):
if not self.exists(fn, subdir=subdir):
return None
with open(self.get_path(fn, subdir=subdir), encoding="utf-8") as f:
return f.read()
def write_file(self, fn, value, subdir=None):
if value is None:
self.unlink(fn, subdir=subdir)
return
try:
if subdir:
os.makedirs(self.get_path(subdir))
else:
os.makedirs(self.get_path(""))
except FileExistsError:
pass
old = self.get_path(fn, subdir=subdir)
new = self.get_path(f"{fn}.new", subdir=subdir)
try:
with open(new, "w", encoding="utf-8") as f:
f.write(value)
os.rename(new, old)
except:
if os.path.exists(new):
os.unlink(new)
raise
def read_list(self, fn, subdir=None):
if not self.exists(fn, subdir=subdir):
return None
with open(self.get_path(fn, subdir=subdir), encoding="utf-8") as f:
return [line.rstrip("\n") for line in f]
def write_list(self, fn, value, subdir=None):
if value is None:
self.unlink(fn, subdir=subdir)
return
if not isinstance(value, (list, tuple)):
msg = f"The argument `value` should be list, not {type(value).__name__}"
raise TypeError(msg)
value = "".join((f"{line}\n" for line in value))
self.write_file(fn, value, subdir=subdir)
def read_string(self, fn, subdir=None):
if not self.exists(fn, subdir=subdir):
return None
with open(self.get_path(fn, subdir=subdir), encoding="utf-8") as f:
return f.readline().strip()
def write_string(self, fn, value, subdir=None):
if value is None:
self.unlink(fn, subdir=subdir)
return
if isinstance(value, bytes):
value = value.decode("utf-8")
if not isinstance(value, str):
msg = f"The argument `value` should be str, not {type(value).__name__}"
raise TypeError(msg)
self.write_file(fn, value + "\n", subdir=subdir)
def read_int(self, fn):
if not self.exists(fn):
return None
result = self.read_string(fn)
if not result.isdigit():
return None
return int(result)
def write_int(self, fn, value, subdir=None):
if value is None:
self.unlink(fn, subdir=subdir)
return
if not isinstance(value, int):
msg = f"The argument `value` should be int, not {type(value).__name__}"
raise TypeError(msg)
value = str(value)
self.write_string(fn, value, subdir=subdir)
def read_xml_node(self, fn, node_name, subdir=None):
path = self.get_path(fn, subdir=subdir)
try:
tree = ET.parse(path)
except SyntaxError as e:
msg = f"Unable to parse '{path}': {e}"
raise oscerr.NoWorkingCopy(msg)
root = tree.getroot()
assert root.tag == node_name
# TODO: return root?
return tree
def write_xml_node(self, fn, node_name, node, subdir=None):
path = self.get_path(fn, subdir=subdir)
assert node.tag == node_name
api.write_xml_node_to_file(node, path)
@property
def apiurl(self):
return self.read_string("_apiurl")
@apiurl.setter
def apiurl(self, value):
self.write_string("_apiurl", value)
@property
def project(self):
return self.read_string("_project")
@project.setter
def project(self, value):
self.write_string("_project", value)
@property
def package(self):
return self.read_string("_package")
@package.setter
def package(self, value):
self.write_string("_package", value)
@property
def scmurl(self):
return self.read_string("_scm")
@scmurl.setter
def scmurl(self, value):
return self.write_string("_scm", value)
@property
def size_limit(self):
return self.read_int("_size_limit")
@size_limit.setter
def size_limit(self, value):
return self.write_int("_size_limit", value)
@property
def to_be_added(self):
self.assert_is_package()
return self.read_list("_to_be_added") or []
@to_be_added.setter
def to_be_added(self, value):
self.assert_is_package()
return self.write_list("_to_be_added", value)
@property
def to_be_deleted(self):
self.assert_is_package()
return self.read_list("_to_be_deleted") or []
@to_be_deleted.setter
def to_be_deleted(self, value):
self.assert_is_package()
return self.write_list("_to_be_deleted", value)
@property
def in_conflict(self):
self.assert_is_package()
return self.read_list("_in_conflict") or []
@in_conflict.setter
def in_conflict(self, value):
self.assert_is_package()
return self.write_list("_in_conflict", value)
@property
def osclib_version(self):
return self.read_string("_osclib_version")
@property
def files(self):
self.assert_is_package()
if self.exists("_scm"):
msg = "Package '{self.path}' is managed via SCM"
raise oscerr.NoWorkingCopy(msg)
if not self.exists("_files"):
msg = "Package '{self.path}' doesn't contain _files metadata"
raise oscerr.NoWorkingCopy(msg)
result = []
directory_node = self.read_xml_node("_files", "directory").getroot()
from . import core as osc_core
for entry_node in api.find_nodes(directory_node, "directory", "entry"):
result.append(osc_core.File.from_xml_node(entry_node))
return result
@files.setter
def files(self, value):
if not isinstance(value, (list, tuple)):
msg = f"The argument `value` should be list, not {type(value).__name__}"
raise TypeError(msg)
root = ET.Element("directory")
for file_obj in sorted(value):
file_obj.to_xml_node(root)
self.write_xml_node("_files", "directory", root)
@property
def last_buildroot(self):
self.assert_is_package()
items = self.read_list("_last_buildroot")
if items is not None and len(items) != 3:
msg = f"Package '{self.path}' contains _last_buildroot metadata that doesn't contain 3 lines: [repo, arch, vm_type]"
raise oscerr.NoWorkingCopy(msg)
return items
@last_buildroot.setter
def last_buildroot(self, value):
self.assert_is_package()
if len(value) != 3:
raise ValueError("A list with exactly 3 items is expected: [repo, arch, vm_type]")
self.write_list("_last_buildroot", value)

200
tests/test_store.py Normal file
View File

@ -0,0 +1,200 @@
import os
import sys
import tempfile
import unittest
import osc.core as osc_core
from osc.store import Store
class TestStore(unittest.TestCase):
def setUp(self):
self.tmpdir = tempfile.mkdtemp(prefix='osc_test')
self.store = Store(self.tmpdir, check=False)
self.store.is_package = True
self.store.project = "project name"
self.store.package = "package name"
def tearDown(self):
try:
shutil.rmtree(self.tmpdir)
except:
pass
def fileEquals(self, fn, expected_value):
path = os.path.join(self.tmpdir, ".osc", fn)
with open(path) as f:
actual_value = f.read()
self.assertEqual(actual_value, expected_value, f"File: {fn}")
def test_read_write_file(self):
self.store.write_file("_file", "\n\nline1\nline2")
self.fileEquals("_file", "\n\nline1\nline2")
self.assertEqual(self.store.read_file("_file"), "\n\nline1\nline2")
# writing None removes the file
self.store.write_file("_file", None)
self.assertFalse(self.store.exists("_file"))
self.assertRaises(TypeError, self.store.write_string, "_file", 123)
self.assertRaises(TypeError, self.store.write_string, "_file", ["123"])
def test_read_write_int(self):
self.store.write_int("_int", 123)
self.fileEquals("_int", "123\n")
self.assertEqual(self.store.read_int("_int"), 123)
# writing None removes the file
self.store.write_int("_int", None)
self.assertFalse(self.store.exists("_int"))
self.assertRaises(TypeError, self.store.write_int, "_int", "123")
self.assertRaises(TypeError, self.store.write_int, "_int", b"123")
self.assertRaises(TypeError, self.store.write_int, "_int", ["123"])
def test_read_write_list(self):
self.store.write_list("_list", ["one", "two", "three"])
self.fileEquals("_list", "one\ntwo\nthree\n")
self.assertEqual(self.store.read_list("_list"), ["one", "two", "three"])
# writing None removes the file
self.store.write_list("_list", None)
self.assertFalse(self.store.exists("_list"))
self.assertRaises(TypeError, self.store.write_list, "_list", "123")
self.assertRaises(TypeError, self.store.write_list, "_list", b"123")
self.assertRaises(TypeError, self.store.write_list, "_list", 123)
def test_read_write_string(self):
self.store.write_string("_string", "string")
self.fileEquals("_string", "string\n")
self.assertEqual(self.store.read_string("_string"), "string")
self.store.write_string("_bytes", b"bytes")
self.fileEquals("_bytes", "bytes\n")
self.assertEqual(self.store.read_string("_bytes"), "bytes")
# writing None removes the file
self.store.write_string("_string", None)
self.assertFalse(self.store.exists("_string"))
self.assertRaises(TypeError, self.store.write_string, "_string", 123)
self.assertRaises(TypeError, self.store.write_string, "_string", ["123"])
def test_contains(self):
self.assertTrue("_project" in self.store)
self.assertTrue("_package" in self.store)
self.assertFalse("_foo" in self.store)
def test_iter(self):
self.assertEqual(len(list(self.store)), 2)
for fn in self.store:
self.assertIn(fn, ["_project", "_package"])
def test_apiurl(self):
self.store.apiurl = "https://example.com"
self.fileEquals("_apiurl", "https://example.com\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.apiurl, "https://example.com")
def test_package(self):
self.fileEquals("_package", "package name\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.package, "package name")
def test_project(self):
self.fileEquals("_project", "project name\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.project, "project name")
def test_scmurl(self):
self.store.scmurl = "https://example.com/project.git"
self.fileEquals("_scm", "https://example.com/project.git\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.scmurl, "https://example.com/project.git")
def test_size_limit(self):
self.store.size_limit = 123
self.fileEquals("_size_limit", "123\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.size_limit, 123)
def test_to_be_added(self):
self.store.to_be_added = ["foo", "bar", "baz"]
self.fileEquals("_to_be_added", "foo\nbar\nbaz\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.to_be_added, ["foo", "bar", "baz"])
def test_to_be_deleted(self):
self.store.to_be_deleted = ["foo", "bar", "baz"]
self.fileEquals("_to_be_deleted", "foo\nbar\nbaz\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.to_be_deleted, ["foo", "bar", "baz"])
def test_in_conflict(self):
self.store.in_conflict = ["foo", "bar", "baz"]
self.fileEquals("_in_conflict", "foo\nbar\nbaz\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.in_conflict, ["foo", "bar", "baz"])
def test_osclib_version(self):
# no setter, users are not supposed to set the version
self.assertRaises(AttributeError, setattr, self.store, "osclib_version", "123")
self.store.write_string("_osclib_version", "123")
self.fileEquals("_osclib_version", "123\n")
store2 = Store(self.tmpdir)
self.assertEqual(store2.osclib_version, "123")
def test_files(self):
files = [
osc_core.File(name="foo", md5="aabbcc", size=1, mtime=2),
osc_core.File(name="bar", md5="ddeeff", size=3, mtime=4, skipped=True),
]
self.store.files = files
expected = """
<directory>
<entry name="bar" md5="ddeeff" size="3" mtime="4" skipped="true" />
<entry name="foo" md5="aabbcc" size="1" mtime="2" />
</directory>""".lstrip()
if sys.version_info[:2] <= (3, 7):
# ElementTree doesn't preserve attribute order on py <= 3.7; https://bugs.python.org/issue34160
expected = """
<directory>
<entry md5="ddeeff" mtime="4" name="bar" size="3" skipped="true" />
<entry md5="aabbcc" mtime="2" name="foo" size="1" />
</directory>""".lstrip()
self.fileEquals("_files", expected)
store2 = Store(self.tmpdir)
files2 = store2.files
# files got ordered
self.assertTrue(files2[0] == files[1])
self.assertTrue(files2[1] == files[0])
def test_last_buildroot(self):
self.store.last_buildroot = "repo", "arch", "vm_type"
self.fileEquals("_last_buildroot", "repo\narch\nvm_type\n")
self.assertRaises(ValueError, setattr, self.store, "last_buildroot", ["one"])
self.assertRaises(ValueError, setattr, self.store, "last_buildroot", ["one", "two"])
self.assertRaises(ValueError, setattr, self.store, "last_buildroot", ["one", "two", "three", "four"])
store2 = Store(self.tmpdir)
self.assertEqual(store2.last_buildroot, ["repo", "arch", "vm_type"])
if __name__ == "__main__":
unittest.main()