1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-01-25 22:36:13 +01:00

Move functions manipulating store from core to obs_scm.store

This commit is contained in:
Daniel Mach 2024-04-15 15:42:20 +02:00
parent 354f4caca6
commit 59f530c793
2 changed files with 272 additions and 242 deletions

View File

@ -4,12 +4,6 @@
# either version 2, or version 3 (at your option).
# __store_version__ is to be incremented when the format of the working copy
# "store" changes in an incompatible way. Please add any needed migration
# functionality to check_store_version().
__store_version__ = '1.0'
import codecs
import copy
import datetime
@ -56,8 +50,32 @@ from .connection import http_request, http_GET, http_POST, http_PUT, http_DELETE
from .obs_scm import File
from .obs_scm import Linkinfo
from .obs_scm import Serviceinfo
from .obs_scm import Store
from .obs_scm.store import __store_version__
from .obs_scm.store import check_store_version
from .obs_scm.store import delete_storedir
from .obs_scm.store import is_package_dir
from .obs_scm.store import is_project_dir
from .obs_scm.store import read_inconflict
from .obs_scm.store import read_filemeta
from .obs_scm.store import read_sizelimit
from .obs_scm.store import read_tobeadded
from .obs_scm.store import read_tobedeleted
from .obs_scm.store import store
from .obs_scm.store import store_read_apiurl
from .obs_scm.store import store_read_file
from .obs_scm.store import store_read_last_buildroot
from .obs_scm.store import store_readlist
from .obs_scm.store import store_read_package
from .obs_scm.store import store_read_project
from .obs_scm.store import store_read_scmurl
from .obs_scm.store import store_unlink_file
from .obs_scm.store import store_write_apiurl
from .obs_scm.store import store_write_initial_packages
from .obs_scm.store import store_write_last_buildroot
from .obs_scm.store import store_write_project
from .obs_scm.store import store_write_string
from .output import sanitize_text
from .store import Store
from .util import xdg
from .util.helper import decode_list, decode_it, raw_input, _html_escape
from .util.xml import xml_indent_compat as xmlindent
@ -76,7 +94,6 @@ def cmp(a, b):
DISTURL_RE = re.compile(r"^(?P<bs>.*)://(?P<apiurl>.*?)/(?P<project>.*?)/(?P<repository>.*?)/(?P<revision>.*)-(?P<source>.*)$")
BUILDLOGURL_RE = re.compile(r"^(?P<apiurl>https?://.*?)/build/(?P<project>.*?)/(?P<repository>.*?)/(?P<arch>.*?)/(?P<package>.*?)/_log$")
BUFSIZE = 1024 * 1024
store = '.osc'
new_project_templ = """\
<project name="%(name)s">
@ -3047,20 +3064,6 @@ def shorttime(t):
return time.strftime('%b %d %Y', time.gmtime(t))
def is_project_dir(d):
global store
return os.path.exists(os.path.join(d, store, '_project')) and not \
os.path.exists(os.path.join(d, store, '_package'))
def is_package_dir(d):
global store
return os.path.exists(os.path.join(d, store, '_project')) and \
os.path.exists(os.path.join(d, store, '_package'))
def parse_disturl(disturl: str):
"""Parse a disturl, returns tuple (apiurl, project, source, repository,
revision), else raises an oscerr.WrongArgs exception
@ -3166,62 +3169,6 @@ def findpacs(files, progress_obj=None, fatal=True):
return Package.from_paths_nofail(files, progress_obj)
def read_filemeta(dir):
global store
msg = f'\'{dir}\' is not a valid working copy.'
filesmeta = os.path.join(dir, store, '_files')
if not is_package_dir(dir):
raise oscerr.NoWorkingCopy(msg)
if os.path.isfile(os.path.join(dir, store, '_scm')):
raise oscerr.NoWorkingCopy("Is managed via scm")
if not os.path.isfile(filesmeta):
raise oscerr.NoWorkingCopy(f'{msg} ({filesmeta} does not exist)')
try:
r = ET.parse(filesmeta)
except SyntaxError as e:
raise oscerr.NoWorkingCopy(f'{msg}\nWhen parsing .osc/_files, the following error was encountered:\n{e}')
return r
def store_readlist(dir, name):
global store
r = []
if os.path.exists(os.path.join(dir, store, name)):
with open(os.path.join(dir, store, name)) as f:
r = [line.rstrip('\n') for line in f]
return r
def read_tobeadded(dir):
return store_readlist(dir, '_to_be_added')
def read_tobedeleted(dir):
return store_readlist(dir, '_to_be_deleted')
def read_sizelimit(dir):
global store
r = None
fname = os.path.join(dir, store, '_size_limit')
if os.path.exists(fname):
with open(fname) as f:
r = f.readline().strip()
if r is None or not r.isdigit():
return None
return int(r)
def read_inconflict(dir):
return store_readlist(dir, '_in_conflict')
def parseargs(list_of_args):
"""Convenience method osc's commandline argument parsing.
@ -3313,35 +3260,6 @@ def makeurl(apiurl: str, path: List[str], query: Optional[dict] = None):
return urlunsplit((apiurl_scheme, apiurl_netloc, path_str, query_str, ""))
def check_store_version(dir):
global store
versionfile = os.path.join(dir, store, '_osclib_version')
try:
with open(versionfile) as f:
v = f.read().strip()
except:
v = ''
if v == '':
msg = f'Error: "{os.path.abspath(dir)}" is not an osc package working copy.'
if os.path.exists(os.path.join(dir, '.svn')):
msg = msg + '\nTry svn instead of osc.'
raise oscerr.NoWorkingCopy(msg)
if v != __store_version__:
if v in ['0.2', '0.3', '0.4', '0.5', '0.6', '0.7', '0.8', '0.9', '0.95', '0.96', '0.97', '0.98', '0.99']:
# version is fine, no migration needed
f = open(versionfile, 'w')
f.write(__store_version__ + '\n')
f.close()
return
msg = f'The osc metadata of your working copy "{dir}"'
msg += f'\nhas __store_version__ = {v}, but it should be {__store_version__}'
msg += '\nPlease do a fresh checkout or update your client. Sorry about the inconvenience.'
raise oscerr.WorkingCopyWrongVersion(msg)
def meta_get_packagelist(apiurl: str, prj, deleted=None, expand=False):
query = {}
@ -6984,132 +6902,6 @@ def rebuild(apiurl: str, prj: str, package: str, repo: str, arch: str, code=None
return root.get('code')
def store_read_project(dir):
global store
try:
with open(os.path.join(dir, store, '_project')) as f:
p = f.readline().strip()
except OSError:
msg = f'Error: \'{os.path.abspath(dir)}\' is not an osc project dir or working copy'
if os.path.exists(os.path.join(dir, '.svn')):
msg += '\nTry svn instead of osc.'
raise oscerr.NoWorkingCopy(msg)
return p
def store_read_package(dir):
global store
try:
with open(os.path.join(dir, store, '_package')) as f:
p = f.readline().strip()
except OSError:
msg = f'Error: \'{os.path.abspath(dir)}\' is not an osc package working copy'
if os.path.exists(os.path.join(dir, '.svn')):
msg += '\nTry svn instead of osc.'
raise oscerr.NoWorkingCopy(msg)
return p
def store_read_scmurl(dir):
import warnings
warnings.warn(
"osc.core.store_read_scmurl() is deprecated. "
"You should be using high-level classes such as Store, Project or Package instead.",
DeprecationWarning
)
return Store(dir).scmurl
def store_read_apiurl(dir, defaulturl=True):
import warnings
warnings.warn(
"osc.core.store_read_apiurl() is deprecated. "
"You should be using high-level classes such as Store, Project or Package instead.",
DeprecationWarning
)
return Store(dir).apiurl
def store_read_last_buildroot(dir):
global store
fname = os.path.join(dir, store, '_last_buildroot')
if os.path.exists(fname):
lines = open(fname).read().splitlines()
if len(lines) == 3:
return lines
return
def store_write_string(dir, file, string, subdir=''):
global store
if subdir and not os.path.isdir(os.path.join(dir, store, subdir)):
os.mkdir(os.path.join(dir, store, subdir))
fname = os.path.join(dir, store, subdir, file)
try:
f = open(fname + '.new', 'w')
if not isinstance(string, str):
string = decode_it(string)
f.write(string)
f.close()
os.rename(fname + '.new', fname)
except:
if os.path.exists(fname + '.new'):
os.unlink(fname + '.new')
raise
def store_write_project(dir, project):
store_write_string(dir, '_project', project + '\n')
def store_write_apiurl(dir, apiurl):
import warnings
warnings.warn(
"osc.core.store_write_apiurl() is deprecated. "
"You should be using high-level classes such as Store, Project or Package instead.",
DeprecationWarning
)
Store(dir).apiurl = apiurl
def store_write_last_buildroot(dir, repo, arch, vm_type):
store_write_string(dir, '_last_buildroot', repo + '\n' + arch + '\n' + vm_type + '\n')
def store_unlink_file(dir, file):
global store
try:
os.unlink(os.path.join(dir, store, file))
except:
pass
def store_read_file(dir, file):
global store
try:
with open(os.path.join(dir, store, file)) as f:
return f.read()
except:
return None
def store_write_initial_packages(dir, project, subelements):
global store
fname = os.path.join(dir, store, '_packages')
root = ET.Element('project', name=project)
for elem in subelements:
root.append(elem)
ET.ElementTree(root).write(fname)
def get_osc_version():
return __version__
@ -7443,15 +7235,6 @@ def delete_dir(dir):
os.rmdir(dir)
def delete_storedir(store_dir):
"""
This method deletes a store dir.
"""
head, tail = os.path.split(store_dir)
if tail == '.osc':
delete_dir(store_dir)
def unpack_srcrpm(srpm, dir, *files):
"""
This method unpacks the passed srpm into the

View File

@ -12,6 +12,12 @@ from .._private import api
from ..util.xml import ET
# __store_version__ is to be incremented when the format of the working copy
# "store" changes in an incompatible way. Please add any needed migration
# functionality to check_store_version().
__store_version__ = '1.0'
class Store:
STORE_DIR = ".osc"
STORE_VERSION = "1.0"
@ -317,3 +323,244 @@ class Store:
else:
root = self.read_xml_node("_meta", "project").getroot()
return root
store = '.osc'
def check_store_version(dir):
global store
versionfile = os.path.join(dir, store, '_osclib_version')
try:
with open(versionfile) as f:
v = f.read().strip()
except:
v = ''
if v == '':
msg = f'Error: "{os.path.abspath(dir)}" is not an osc package working copy.'
if os.path.exists(os.path.join(dir, '.svn')):
msg = msg + '\nTry svn instead of osc.'
raise oscerr.NoWorkingCopy(msg)
if v != __store_version__:
if v in ['0.2', '0.3', '0.4', '0.5', '0.6', '0.7', '0.8', '0.9', '0.95', '0.96', '0.97', '0.98', '0.99']:
# version is fine, no migration needed
f = open(versionfile, 'w')
f.write(__store_version__ + '\n')
f.close()
return
msg = f'The osc metadata of your working copy "{dir}"'
msg += f'\nhas __store_version__ = {v}, but it should be {__store_version__}'
msg += '\nPlease do a fresh checkout or update your client. Sorry about the inconvenience.'
raise oscerr.WorkingCopyWrongVersion(msg)
def is_project_dir(d):
global store
return os.path.exists(os.path.join(d, store, '_project')) and not \
os.path.exists(os.path.join(d, store, '_package'))
def is_package_dir(d):
global store
return os.path.exists(os.path.join(d, store, '_project')) and \
os.path.exists(os.path.join(d, store, '_package'))
def read_filemeta(dir):
global store
msg = f'\'{dir}\' is not a valid working copy.'
filesmeta = os.path.join(dir, store, '_files')
if not is_package_dir(dir):
raise oscerr.NoWorkingCopy(msg)
if os.path.isfile(os.path.join(dir, store, '_scm')):
raise oscerr.NoWorkingCopy("Is managed via scm")
if not os.path.isfile(filesmeta):
raise oscerr.NoWorkingCopy(f'{msg} ({filesmeta} does not exist)')
try:
r = ET.parse(filesmeta)
except SyntaxError as e:
raise oscerr.NoWorkingCopy(f'{msg}\nWhen parsing .osc/_files, the following error was encountered:\n{e}')
return r
def store_readlist(dir, name):
global store
r = []
if os.path.exists(os.path.join(dir, store, name)):
with open(os.path.join(dir, store, name)) as f:
r = [line.rstrip('\n') for line in f]
return r
def read_tobeadded(dir):
return store_readlist(dir, '_to_be_added')
def read_tobedeleted(dir):
return store_readlist(dir, '_to_be_deleted')
def read_sizelimit(dir):
global store
r = None
fname = os.path.join(dir, store, '_size_limit')
if os.path.exists(fname):
with open(fname) as f:
r = f.readline().strip()
if r is None or not r.isdigit():
return None
return int(r)
def read_inconflict(dir):
return store_readlist(dir, '_in_conflict')
def store_read_project(dir):
global store
try:
with open(os.path.join(dir, store, '_project')) as f:
p = f.readline().strip()
except OSError:
msg = f'Error: \'{os.path.abspath(dir)}\' is not an osc project dir or working copy'
if os.path.exists(os.path.join(dir, '.svn')):
msg += '\nTry svn instead of osc.'
raise oscerr.NoWorkingCopy(msg)
return p
def store_read_package(dir):
global store
try:
with open(os.path.join(dir, store, '_package')) as f:
p = f.readline().strip()
except OSError:
msg = f'Error: \'{os.path.abspath(dir)}\' is not an osc package working copy'
if os.path.exists(os.path.join(dir, '.svn')):
msg += '\nTry svn instead of osc.'
raise oscerr.NoWorkingCopy(msg)
return p
def store_read_scmurl(dir):
import warnings
warnings.warn(
"osc.core.store_read_scmurl() is deprecated. "
"You should be using high-level classes such as Store, Project or Package instead.",
DeprecationWarning
)
return Store(dir).scmurl
def store_read_apiurl(dir, defaulturl=True):
import warnings
warnings.warn(
"osc.core.store_read_apiurl() is deprecated. "
"You should be using high-level classes such as Store, Project or Package instead.",
DeprecationWarning
)
return Store(dir).apiurl
def store_read_last_buildroot(dir):
global store
fname = os.path.join(dir, store, '_last_buildroot')
if os.path.exists(fname):
lines = open(fname).read().splitlines()
if len(lines) == 3:
return lines
return
def store_write_string(dir, file, string, subdir=''):
from ..core import decode_it
global store
if subdir and not os.path.isdir(os.path.join(dir, store, subdir)):
os.mkdir(os.path.join(dir, store, subdir))
fname = os.path.join(dir, store, subdir, file)
try:
f = open(fname + '.new', 'w')
if not isinstance(string, str):
string = decode_it(string)
f.write(string)
f.close()
os.rename(fname + '.new', fname)
except:
if os.path.exists(fname + '.new'):
os.unlink(fname + '.new')
raise
def store_write_project(dir, project):
store_write_string(dir, '_project', project + '\n')
def store_write_apiurl(dir, apiurl):
import warnings
warnings.warn(
"osc.core.store_write_apiurl() is deprecated. "
"You should be using high-level classes such as Store, Project or Package instead.",
DeprecationWarning
)
Store(dir).apiurl = apiurl
def store_write_last_buildroot(dir, repo, arch, vm_type):
store_write_string(dir, '_last_buildroot', repo + '\n' + arch + '\n' + vm_type + '\n')
def store_unlink_file(dir, file):
global store
try:
os.unlink(os.path.join(dir, store, file))
except:
pass
def store_read_file(dir, file):
global store
try:
with open(os.path.join(dir, store, file)) as f:
return f.read()
except:
return None
def store_write_initial_packages(dir, project, subelements):
global store
fname = os.path.join(dir, store, '_packages')
root = ET.Element('project', name=project)
for elem in subelements:
root.append(elem)
ET.ElementTree(root).write(fname)
def delete_storedir(store_dir):
"""
This method deletes a store dir.
"""
from ..core import delete_dir
head, tail = os.path.split(store_dir)
if tail == '.osc':
delete_dir(store_dir)