1
0
mirror of https://github.com/openSUSE/osc.git synced 2024-11-15 16:46:14 +01:00
github.com_openSUSE_osc/osc/obs_scm/project.py

627 lines
27 KiB
Python

import fnmatch
import os
from pathlib import Path
from typing import Optional
from .. import conf
from .. import oscerr
from ..util.xml import ET
from .store import Store
from .store import delete_storedir
from .store import store
from .store import store_read_package
from .store import store_read_project
from .store import store_write_initial_packages
from .store import store_write_project
from .store import store_write_string
from .store import is_package_dir
class Project:
"""
Represent a checked out project directory, holding packages.
:Attributes:
``dir``
The directory path containing the project.
``name``
The name of the project.
``apiurl``
The endpoint URL of the API server.
``pacs_available``
List of names of packages available server-side.
This is only populated if ``getPackageList`` is set
to ``True`` in the constructor.
``pacs_have``
List of names of packages which exist server-side
and exist in the local project working copy (if
'do_package_tracking' is disabled).
If 'do_package_tracking' is enabled it represents the
list names of packages which are tracked in the project
working copy (that is it might contain packages which
exist on the server as well as packages which do not
exist on the server (for instance if the local package
was added or if the package was removed on the server-side)).
``pacs_excluded``
List of names of packages in the local project directory
which are excluded by the `exclude_glob` configuration
variable. Only set if `do_package_tracking` is enabled.
``pacs_unvers``
List of names of packages in the local project directory
which are not tracked. Only set if `do_package_tracking`
is enabled.
``pacs_broken``
List of names of packages which are tracked but do not
exist in the local project working copy. Only set if
`do_package_tracking` is enabled.
``pacs_missing``
List of names of packages which exist server-side but
are not expected to exist in the local project directory.
"""
REQ_STOREFILES = ('_project', '_apiurl')
def __init__(self, dir, getPackageList=True, progress_obj=None, wc_check=True):
"""
Constructor.
:Parameters:
`dir` : str
The directory path containing the checked out project.
`getPackageList` : bool
Set to `False` if you want to skip retrieval from the
server of the list of packages in the project .
`wc_check` : bool
"""
from ..core import meta_get_packagelist
self.dir = Path(dir)
self.absdir = os.path.abspath(dir)
self.store = Store(dir)
self.progress_obj = progress_obj
self.name = store_read_project(self.dir)
self.scm_url = self.store.scmurl
self.apiurl = self.store.apiurl
dirty_files = []
if wc_check:
dirty_files = self.wc_check()
if dirty_files:
msg = 'Your working copy \'%s\' is in an inconsistent state.\n' \
'Please run \'osc repairwc %s\' and check the state\n' \
'of the working copy afterwards (via \'osc status %s\')' % (self.dir, self.dir, self.dir)
raise oscerr.WorkingCopyInconsistent(self.name, None, dirty_files, msg)
if getPackageList:
self.pacs_available = meta_get_packagelist(self.apiurl, self.name)
else:
self.pacs_available = []
if conf.config['do_package_tracking']:
self.pac_root = self.read_packages().getroot()
self.pacs_have = [pac.get('name') for pac in self.pac_root.findall('package')]
self.pacs_excluded = [i for i in os.listdir(self.dir)
for j in conf.config['exclude_glob']
if fnmatch.fnmatch(i, j)]
self.pacs_unvers = [i for i in os.listdir(self.dir) if i not in self.pacs_have and i not in self.pacs_excluded]
# store all broken packages (e.g. packages which where removed by a non-osc cmd)
# in the self.pacs_broken list
self.pacs_broken = []
for p in self.pacs_have:
if not os.path.isdir(os.path.join(self.absdir, p)):
# all states will be replaced with the '!'-state
# (except it is already marked as deleted ('D'-state))
self.pacs_broken.append(p)
else:
self.pacs_have = [i for i in os.listdir(self.dir) if i in self.pacs_available]
self.pacs_missing = [i for i in self.pacs_available if i not in self.pacs_have]
def wc_check(self):
global store
dirty_files = []
req_storefiles = Project.REQ_STOREFILES
if conf.config['do_package_tracking'] and self.scm_url is None:
req_storefiles += ('_packages',)
for fname in req_storefiles:
if not os.path.exists(os.path.join(self.absdir, store, fname)):
dirty_files.append(fname)
return dirty_files
def wc_repair(self, apiurl: Optional[str] = None):
store = Store(self.dir)
store.assert_is_project()
if not store.exists("_apiurl") or apiurl:
if apiurl is None:
msg = 'cannot repair wc: the \'_apiurl\' file is missing but ' \
'no \'apiurl\' was passed to wc_repair'
# hmm should we raise oscerr.WrongArgs?
raise oscerr.WorkingCopyInconsistent(self.name, None, [], msg)
# sanity check
conf.parse_apisrv_url(None, apiurl)
store.apiurl = apiurl
self.apiurl = apiurl
def checkout_missing_pacs(self, sinfos, expand_link=False, unexpand_link=False):
from ..core import checkout_package
from ..core import getTransActPath
for pac in self.pacs_missing:
if conf.config['do_package_tracking'] and pac in self.pacs_unvers:
# pac is not under version control but a local file/dir exists
msg = f'can\'t add package \'{pac}\': Object already exists'
raise oscerr.PackageExists(self.name, pac, msg)
if not (expand_link or unexpand_link):
sinfo = sinfos.get(pac)
if sinfo is None:
# should never happen...
continue
linked = sinfo.find('linked')
if linked is not None and linked.get('project') == self.name:
# hmm what about a linkerror (sinfo.get('lsrcmd5') is None)?
# Should we skip the package as well or should we it out?
# let's skip it for now
print(f"Skipping {pac} (link to package {linked.get('package')})")
continue
print(f'checking out new package {pac}')
checkout_package(self.apiurl, self.name, pac,
pathname=getTransActPath(os.path.join(self.dir, pac)),
prj_obj=self, prj_dir=self.dir,
expand_link=expand_link or not unexpand_link, progress_obj=self.progress_obj)
def status(self, pac: str):
exists = os.path.exists(os.path.join(self.absdir, pac))
st = self.get_state(pac)
if st is None and exists:
return '?'
elif st is None:
raise oscerr.OscIOError(None, f'osc: \'{pac}\' is not under version control')
elif st in ('A', ' ') and not exists:
return '!'
elif st == 'D' and not exists:
return 'D'
else:
return st
def get_status(self, *exclude_states):
res = []
for pac in self.pacs_have:
st = self.status(pac)
if st not in exclude_states:
res.append((st, pac))
if '?' not in exclude_states:
res.extend([('?', pac) for pac in self.pacs_unvers])
return res
def get_pacobj(self, pac, *pac_args, **pac_kwargs):
from ..core import Package
try:
st = self.status(pac)
if st in ('?', '!') or st == 'D' and not os.path.exists(os.path.join(self.dir, pac)):
return None
return Package(os.path.join(self.dir, pac), *pac_args, **pac_kwargs)
except oscerr.OscIOError:
return None
def set_state(self, pac, state):
node = self.get_package_node(pac)
if node is None:
self.new_package_entry(pac, state)
else:
node.set('state', state)
def get_package_node(self, pac: str):
for node in self.pac_root.findall('package'):
if pac == node.get('name'):
return node
return None
def del_package_node(self, pac):
for node in self.pac_root.findall('package'):
if pac == node.get('name'):
self.pac_root.remove(node)
def get_state(self, pac: str):
node = self.get_package_node(pac)
if node is not None:
return node.get('state')
else:
return None
def new_package_entry(self, name, state):
ET.SubElement(self.pac_root, 'package', name=name, state=state)
def read_packages(self):
"""
Returns an ``xml.etree.ElementTree`` object representing the
parsed contents of the project's ``.osc/_packages`` XML file.
"""
from ..core import Package
from ..core import meta_get_packagelist
global store
packages_file = os.path.join(self.absdir, store, '_packages')
if os.path.isfile(packages_file) and os.path.getsize(packages_file):
try:
result = ET.parse(packages_file)
except:
msg = f'Cannot read package file \'{packages_file}\'. '
msg += 'You can try to remove it and then run osc repairwc.'
raise oscerr.OscIOError(None, msg)
return result
else:
# scan project for existing packages and migrate them
cur_pacs = []
for data in os.listdir(self.dir):
pac_dir = os.path.join(self.absdir, data)
# we cannot use self.pacs_available because we cannot guarantee that the package list
# was fetched from the server
if data in meta_get_packagelist(self.apiurl, self.name) and is_package_dir(pac_dir) \
and Package(pac_dir).name == data:
cur_pacs.append(ET.Element('package', name=data, state=' '))
store_write_initial_packages(self.absdir, self.name, cur_pacs)
return ET.parse(os.path.join(self.absdir, store, '_packages'))
def write_packages(self):
from ..core import ET_ENCODING
from ..core import xmlindent
xmlindent(self.pac_root)
store_write_string(self.absdir, '_packages', ET.tostring(self.pac_root, encoding=ET_ENCODING))
def addPackage(self, pac):
for i in conf.config['exclude_glob']:
if fnmatch.fnmatch(pac, i):
msg = f'invalid package name: \'{pac}\' (see \'exclude_glob\' config option)'
raise oscerr.OscIOError(None, msg)
state = self.get_state(pac)
if state is None or state == 'D':
self.new_package_entry(pac, 'A')
self.write_packages()
# sometimes the new pac doesn't exist in the list because
# it would take too much time to update all data structs regularly
if pac in self.pacs_unvers:
self.pacs_unvers.remove(pac)
else:
raise oscerr.PackageExists(self.name, pac, f'package \'{pac}\' is already under version control')
def delPackage(self, pac, force=False):
from ..core import delete_dir
from ..core import getTransActPath
from ..core import statfrmt
state = self.get_state(pac.name)
can_delete = True
if state == ' ' or state == 'D':
del_files = []
for filename in pac.filenamelist + pac.filenamelist_unvers:
filestate = pac.status(filename)
if filestate == 'M' or filestate == 'C' or \
filestate == 'A' or filestate == '?':
can_delete = False
else:
del_files.append(filename)
if can_delete or force:
for filename in del_files:
pac.delete_localfile(filename)
if pac.status(filename) != '?':
# this is not really necessary
pac.put_on_deletelist(filename)
print(statfrmt('D', getTransActPath(os.path.join(pac.dir, filename))))
print(statfrmt('D', getTransActPath(os.path.join(pac.dir, os.pardir, pac.name))))
pac.write_deletelist()
self.set_state(pac.name, 'D')
self.write_packages()
else:
print(f'package \'{pac.name}\' has local modifications (see osc st for details)')
elif state == 'A':
if force:
delete_dir(pac.absdir)
self.del_package_node(pac.name)
self.write_packages()
print(statfrmt('D', pac.name))
else:
print(f'package \'{pac.name}\' has local modifications (see osc st for details)')
elif state is None:
print('package is not under version control')
else:
print('unsupported state')
def update(self, pacs=(), expand_link=False, unexpand_link=False, service_files=False):
from ..core import Package
from ..core import checkout_package
from ..core import get_project_sourceinfo
from ..core import getTransActPath
from ..core import show_upstream_xsrcmd5
if pacs:
for pac in pacs:
Package(os.path.join(self.dir, pac), progress_obj=self.progress_obj).update()
else:
# we need to make sure that the _packages file will be written (even if an exception
# occurs)
try:
# update complete project
# packages which no longer exists upstream
upstream_del = [pac for pac in self.pacs_have if pac not in self.pacs_available and self.get_state(pac) != 'A']
sinfo_pacs = [pac for pac in self.pacs_have if self.get_state(pac) in (' ', 'D') and pac not in self.pacs_broken]
sinfo_pacs.extend(self.pacs_missing)
sinfos = get_project_sourceinfo(self.apiurl, self.name, True, *sinfo_pacs)
for pac in upstream_del:
if self.status(pac) != '!':
p = Package(os.path.join(self.dir, pac))
self.delPackage(p, force=True)
delete_storedir(p.storedir)
try:
os.rmdir(pac)
except:
pass
self.pac_root.remove(self.get_package_node(pac))
self.pacs_have.remove(pac)
for pac in self.pacs_have:
state = self.get_state(pac)
if pac in self.pacs_broken:
if self.get_state(pac) != 'A':
checkout_package(self.apiurl, self.name, pac,
pathname=getTransActPath(os.path.join(self.dir, pac)), prj_obj=self,
prj_dir=self.dir, expand_link=not unexpand_link, progress_obj=self.progress_obj)
elif state == ' ':
# do a simple update
p = Package(os.path.join(self.dir, pac), progress_obj=self.progress_obj)
rev = None
needs_update = True
if p.scm_url is not None:
# git managed.
print("Skipping git managed package ", pac)
continue
elif expand_link and p.islink() and not p.isexpanded():
if p.haslinkerror():
try:
rev = show_upstream_xsrcmd5(p.apiurl, p.prjname, p.name, revision=p.rev)
except:
rev = show_upstream_xsrcmd5(p.apiurl, p.prjname, p.name, revision=p.rev, linkrev="base")
p.mark_frozen()
else:
rev = p.linkinfo.xsrcmd5
print('Expanding to rev', rev)
elif unexpand_link and p.islink() and p.isexpanded():
rev = p.linkinfo.lsrcmd5
print('Unexpanding to rev', rev)
elif p.islink() and p.isexpanded():
needs_update = p.update_needed(sinfos[p.name])
if needs_update:
rev = p.latest_rev()
elif p.hasserviceinfo() and p.serviceinfo.isexpanded() and not service_files:
# FIXME: currently, do_update does not propagate the --server-side-source-service-files
# option to this method. Consequence: an expanded service is always unexpanded during
# an update (TODO: discuss if this is a reasonable behavior (at least this the default
# behavior for a while))
needs_update = True
else:
needs_update = p.update_needed(sinfos[p.name])
print(f'Updating {p.name}')
if needs_update:
p.update(rev, service_files)
else:
print(f'At revision {p.rev}.')
if unexpand_link:
p.unmark_frozen()
elif state == 'D':
# pac exists (the non-existent pac case was handled in the first if block)
p = Package(os.path.join(self.dir, pac), progress_obj=self.progress_obj)
if p.update_needed(sinfos[p.name]):
p.update()
elif state == 'A' and pac in self.pacs_available:
# file/dir called pac already exists and is under version control
msg = f'can\'t add package \'{pac}\': Object already exists'
raise oscerr.PackageExists(self.name, pac, msg)
elif state == 'A':
# do nothing
pass
else:
print(f'unexpected state.. package \'{pac}\'')
self.checkout_missing_pacs(sinfos, expand_link, unexpand_link)
finally:
self.write_packages()
def commit(self, pacs=(), msg='', files=None, verbose=False, skip_local_service_run=False, can_branch=False, force=False):
from ..core import Package
from ..core import os_path_samefile
files = files or {}
if pacs:
try:
for pac in pacs:
todo = []
if pac in files:
todo = files[pac]
state = self.get_state(pac)
if state == 'A':
self.commitNewPackage(pac, msg, todo, verbose=verbose, skip_local_service_run=skip_local_service_run)
elif state == 'D':
self.commitDelPackage(pac, force=force)
elif state == ' ':
# display the correct dir when sending the changes
if os_path_samefile(os.path.join(self.dir, pac), os.getcwd()):
p = Package('.')
else:
p = Package(os.path.join(self.dir, pac))
p.todo = todo
p.commit(msg, verbose=verbose, skip_local_service_run=skip_local_service_run, can_branch=can_branch, force=force)
elif pac in self.pacs_unvers and not is_package_dir(os.path.join(self.dir, pac)):
print(f'osc: \'{pac}\' is not under version control')
elif pac in self.pacs_broken or not os.path.exists(os.path.join(self.dir, pac)):
print(f'osc: \'{pac}\' package not found')
elif state is None:
self.commitExtPackage(pac, msg, todo, verbose=verbose, skip_local_service_run=skip_local_service_run)
finally:
self.write_packages()
else:
# if we have packages marked as '!' we cannot commit
for pac in self.pacs_broken:
if self.get_state(pac) != 'D':
msg = f'commit failed: package \'{pac}\' is missing'
raise oscerr.PackageMissing(self.name, pac, msg)
try:
for pac in self.pacs_have:
state = self.get_state(pac)
if state == ' ':
# do a simple commit
Package(os.path.join(self.dir, pac)).commit(msg, verbose=verbose, skip_local_service_run=skip_local_service_run)
elif state == 'D':
self.commitDelPackage(pac, force=force)
elif state == 'A':
self.commitNewPackage(pac, msg, verbose=verbose, skip_local_service_run=skip_local_service_run)
finally:
self.write_packages()
def commitNewPackage(self, pac, msg='', files=None, verbose=False, skip_local_service_run=False):
"""creates and commits a new package if it does not exist on the server"""
from ..core import Package
from ..core import edit_meta
from ..core import os_path_samefile
from ..core import statfrmt
files = files or []
if pac in self.pacs_available:
print(f'package \'{pac}\' already exists')
else:
user = conf.get_apiurl_usr(self.apiurl)
edit_meta(metatype='pkg',
path_args=(self.name, pac),
template_args=({
'name': pac,
'user': user}),
apiurl=self.apiurl)
# display the correct dir when sending the changes
olddir = os.getcwd()
if os_path_samefile(os.path.join(self.dir, pac), os.curdir):
os.chdir(os.pardir)
p = Package(pac)
else:
p = Package(os.path.join(self.dir, pac))
p.todo = files
print(statfrmt('Sending', os.path.normpath(p.dir)))
p.commit(msg=msg, verbose=verbose, skip_local_service_run=skip_local_service_run)
self.set_state(pac, ' ')
os.chdir(olddir)
def commitDelPackage(self, pac, force=False):
"""deletes a package on the server and in the working copy"""
from ..core import Package
from ..core import delete_package
from ..core import getTransActPath
from ..core import os_path_samefile
from ..core import statfrmt
try:
# display the correct dir when sending the changes
if os_path_samefile(os.path.join(self.dir, pac), os.curdir):
pac_dir = pac
else:
pac_dir = os.path.join(self.dir, pac)
p = Package(os.path.join(self.dir, pac))
# print statfrmt('Deleting', os.path.normpath(os.path.join(p.dir, os.pardir, pac)))
delete_storedir(p.storedir)
try:
os.rmdir(p.dir)
except:
pass
except OSError:
pac_dir = os.path.join(self.dir, pac)
except (oscerr.NoWorkingCopy, oscerr.WorkingCopyOutdated, oscerr.PackageError):
pass
# print statfrmt('Deleting', getTransActPath(os.path.join(self.dir, pac)))
print(statfrmt('Deleting', getTransActPath(pac_dir)))
delete_package(self.apiurl, self.name, pac, force=force)
self.del_package_node(pac)
def commitExtPackage(self, pac, msg, files=None, verbose=False, skip_local_service_run=False):
"""commits a package from an external project"""
from ..core import Package
from ..core import edit_meta
from ..core import meta_exists
from ..core import os_path_samefile
files = files or []
if os_path_samefile(os.path.join(self.dir, pac), os.getcwd()):
pac_path = '.'
else:
pac_path = os.path.join(self.dir, pac)
store = Store(pac_path)
project = store_read_project(pac_path)
package = store_read_package(pac_path)
apiurl = store.apiurl
if not meta_exists(metatype='pkg',
path_args=(project, package),
template_args=None, create_new=False, apiurl=apiurl):
user = conf.get_apiurl_usr(self.apiurl)
edit_meta(metatype='pkg',
path_args=(project, package),
template_args=({'name': pac, 'user': user}), apiurl=apiurl)
p = Package(pac_path)
p.todo = files
p.commit(msg=msg, verbose=verbose, skip_local_service_run=skip_local_service_run)
def __str__(self):
r = []
r.append('*****************************************************')
r.append(f'Project {self.name} (dir={self.dir}, absdir={self.absdir})')
r.append(f"have pacs:\n{', '.join(self.pacs_have)}")
r.append(f"missing pacs:\n{', '.join(self.pacs_missing)}")
r.append('*****************************************************')
return '\n'.join(r)
@staticmethod
def init_project(
apiurl: str,
dir: Path,
project,
package_tracking=True,
getPackageList=True,
progress_obj=None,
wc_check=True,
scm_url=None,
):
global store
if not os.path.exists(dir):
# use makedirs (checkout_no_colon config option might be enabled)
os.makedirs(dir)
elif not os.path.isdir(dir):
raise oscerr.OscIOError(None, f'error: \'{dir}\' is no directory')
if os.path.exists(os.path.join(dir, store)):
raise oscerr.OscIOError(None, f'error: \'{dir}\' is already an initialized osc working copy')
else:
os.mkdir(os.path.join(dir, store))
store_write_project(dir, project)
Store(dir).apiurl = apiurl
if scm_url:
Store(dir).scmurl = scm_url
package_tracking = None
if package_tracking:
store_write_initial_packages(dir, project, [])
return Project(dir, getPackageList, progress_obj, wc_check)