1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-02-24 11:12:14 +01:00

core: Add type annotations

This commit is contained in:
Enno Gotthold 2022-10-21 10:38:57 +02:00
parent e23e13062f
commit 3e1d91e4bd
No known key found for this signature in database
GPG Key ID: 18569829AD881702
2 changed files with 131 additions and 64 deletions

View File

@ -1376,12 +1376,12 @@ def main(apiurl, opts, argv):
for dep in bi.deps: for dep in bi.deps:
if dep.sysroot: if dep.sysroot:
# packages installed in sysroot subdirectory need to get a prefix for init_buildsystem # packages installed in sysroot subdirectory need to get a prefix for init_buildsystem
rpmlist.append('sysroot: %s %s\n' % (dep.name, dep.fullfilename)) rpmlist.append("sysroot: %s %s\n" % (dep.name, dep.fullfilename))
else: else:
rpmlist.append('%s %s\n' % (dep.name, dep.fullfilename)) rpmlist.append("%s %s\n" % (dep.name, dep.fullfilename))
for i in imagebins: for i in imagebins:
rpmlist.append('%s preinstallimage\n' % i) rpmlist.append("%s preinstallimage\n" % i)
rpmlist += ['%s %s\n' % (i[0], i[1]) for i in rpmlist_prefers] rpmlist += ["%s %s\n" % (i[0], i[1]) for i in rpmlist_prefers]
if imagefile: if imagefile:
rpmlist.append('preinstallimage: %s\n' % imagefile) rpmlist.append('preinstallimage: %s\n' % imagefile)
@ -1474,11 +1474,11 @@ def main(apiurl, opts, argv):
print() print()
print('The buildroot was:', build_root) print('The buildroot was:', build_root)
sys.exit(rc) sys.exit(rc)
except KeyboardInterrupt as i: except KeyboardInterrupt as keyboard_interrupt_exception:
print("keyboard interrupt, killing build ...") print("keyboard interrupt, killing build ...")
cmd.append('--kill') cmd.append('--kill')
run_external(cmd[0], *cmd[1:]) run_external(cmd[0], *cmd[1:])
raise i raise keyboard_interrupt_exception
pacdir = os.path.join(build_root, '.build.packages') pacdir = os.path.join(build_root, '.build.packages')
if os.path.islink(pacdir): if os.path.islink(pacdir):

View File

@ -31,6 +31,7 @@ import time
from functools import cmp_to_key, total_ordering from functools import cmp_to_key, total_ordering
from http.client import IncompleteRead from http.client import IncompleteRead
from io import StringIO from io import StringIO
from typing import Optional, Dict, Union, List
from urllib.parse import urlsplit, urlunsplit, urlparse, quote_plus, urlencode, unquote from urllib.parse import urlsplit, urlunsplit, urlparse, quote_plus, urlencode, unquote
from urllib.error import HTTPError from urllib.error import HTTPError
from urllib.request import pathname2url from urllib.request import pathname2url
@ -310,9 +311,9 @@ class Serviceinfo:
def __init__(self): def __init__(self):
"""creates an empty serviceinfo instance""" """creates an empty serviceinfo instance"""
self.services = [] self.services = []
self.apiurl = None self.apiurl: Optional[str] = None
self.project = None self.project: Optional[str] = None
self.package = None self.package: Optional[str] = None
def read(self, serviceinfo_node, append=False): def read(self, serviceinfo_node, append=False):
"""read in the source services ``<services>`` element passed as """read in the source services ``<services>`` element passed as
@ -351,7 +352,7 @@ class Serviceinfo:
data['command'] = command data['command'] = command
self.services.append(data) self.services.append(data)
def getProjectGlobalServices(self, apiurl, project, package): def getProjectGlobalServices(self, apiurl: str, project: str, package: str):
self.apiurl = apiurl self.apiurl = apiurl
# get all project wide services in one file, we don't store it yet # get all project wide services in one file, we don't store it yet
u = makeurl(apiurl, ['source', project, package], query='cmd=getprojectservices') u = makeurl(apiurl, ['source', project, package], query='cmd=getprojectservices')
@ -368,7 +369,7 @@ class Serviceinfo:
elif e.code != 403 and e.code != 400: elif e.code != 403 and e.code != 400:
raise e raise e
def addVerifyFile(self, serviceinfo_node, filename): def addVerifyFile(self, serviceinfo_node, filename: str):
f = open(filename, 'rb') f = open(filename, 'rb')
digest = hashlib.sha256(f.read()).hexdigest() digest = hashlib.sha256(f.read()).hexdigest()
f.close() f.close()
@ -382,7 +383,7 @@ class Serviceinfo:
r.append(s) r.append(s)
return r return r
def addDownloadUrl(self, serviceinfo_node, url_string): def addDownloadUrl(self, serviceinfo_node, url_string: str):
url = urlparse(url_string) url = urlparse(url_string)
protocol = url.scheme protocol = url.scheme
host = url.netloc host = url.netloc
@ -403,7 +404,7 @@ class Serviceinfo:
r.append(s) r.append(s)
return r return r
def addGitUrl(self, serviceinfo_node, url_string): def addGitUrl(self, serviceinfo_node, url_string: Optional[str]):
r = serviceinfo_node r = serviceinfo_node
s = ET.Element("service", name="obs_scm") s = ET.Element("service", name="obs_scm")
ET.SubElement(s, "param", name="url").text = url_string ET.SubElement(s, "param", name="url").text = url_string
@ -425,7 +426,7 @@ class Serviceinfo:
r.append(s) r.append(s)
return r return r
def execute(self, dir, callmode=None, singleservice=None, verbose=None): def execute(self, dir, callmode: Optional[str] = None, singleservice=None, verbose: Optional[bool] = None):
old_dir = os.path.join(dir, '.old') old_dir = os.path.join(dir, '.old')
# if 2 osc instances are executed at a time one, of them fails on .old file existence # if 2 osc instances are executed at a time one, of them fails on .old file existence
@ -445,8 +446,9 @@ class Serviceinfo:
shutil.rmtree(old_dir) shutil.rmtree(old_dir)
return result return result
def _execute(self, dir, old_dir, callmode=None, singleservice=None, def _execute(
verbose=None): self, dir, old_dir, callmode: Optional[str] = None, singleservice=None, verbose: Optional[bool] = None
):
# cleanup existing generated files # cleanup existing generated files
for filename in os.listdir(dir): for filename in os.listdir(dir):
if filename.startswith('_service:') or filename.startswith('_service_'): if filename.startswith('_service:') or filename.startswith('_service_'):
@ -760,7 +762,7 @@ class Project:
dirty_files.append(fname) dirty_files.append(fname)
return dirty_files return dirty_files
def wc_repair(self, apiurl=None): def wc_repair(self, apiurl: Optional[str] = None):
store = Store(self.dir) store = Store(self.dir)
store.assert_is_project() store.assert_is_project()
if not store.exists("_apiurl") or apiurl: if not store.exists("_apiurl") or apiurl:
@ -800,7 +802,7 @@ class Project:
prj_obj=self, prj_dir=self.dir, prj_obj=self, prj_dir=self.dir,
expand_link=expand_link or not unexpand_link, progress_obj=self.progress_obj) expand_link=expand_link or not unexpand_link, progress_obj=self.progress_obj)
def status(self, pac): def status(self, pac: str):
exists = os.path.exists(os.path.join(self.absdir, pac)) exists = os.path.exists(os.path.join(self.absdir, pac))
st = self.get_state(pac) st = self.get_state(pac)
if st is None and exists: if st is None and exists:
@ -840,7 +842,7 @@ class Project:
else: else:
node.set('state', state) node.set('state', state)
def get_package_node(self, pac): def get_package_node(self, pac: str):
for node in self.pac_root.findall('package'): for node in self.pac_root.findall('package'):
if pac == node.get('name'): if pac == node.get('name'):
return node return node
@ -851,7 +853,7 @@ class Project:
if pac == node.get('name'): if pac == node.get('name'):
self.pac_root.remove(node) self.pac_root.remove(node)
def get_state(self, pac): def get_state(self, pac: str):
node = self.get_package_node(pac) node = self.get_package_node(pac)
if node is not None: if node is not None:
return node.get('state') return node.get('state')
@ -1173,7 +1175,16 @@ class Project:
return '\n'.join(r) return '\n'.join(r)
@staticmethod @staticmethod
def init_project(apiurl, dir, project, package_tracking=True, getPackageList=True, progress_obj=None, wc_check=True, scm_url=None): def init_project(
apiurl: str,
dir,
project,
package_tracking=True,
getPackageList=True,
progress_obj=None,
wc_check=True,
scm_url=None,
):
global store global store
if not os.path.exists(dir): if not os.path.exists(dir):
@ -1323,7 +1334,7 @@ class Package:
dirty_files.append(fname) dirty_files.append(fname)
return dirty_files return dirty_files
def wc_repair(self, apiurl=None): def wc_repair(self, apiurl: Optional[str] = None):
store = Store(self.dir) store = Store(self.dir)
store.assert_is_package() store.assert_is_package()
if not store.exists("_apiurl") or apiurl: if not store.exists("_apiurl") or apiurl:
@ -1523,7 +1534,7 @@ class Package:
return root return root
@staticmethod @staticmethod
def commit_filelist(apiurl, project, package, filelist, msg='', user=None, **query): def commit_filelist(apiurl: str, project: str, package: str, filelist, msg="", user=None, **query):
"""send the commitlog and the local filelist to the server""" """send the commitlog and the local filelist to the server"""
if user is None: if user is None:
user = conf.get_apiurl_usr(apiurl) user = conf.get_apiurl_usr(apiurl)
@ -2616,7 +2627,7 @@ rev: %s
self.write_addlist() self.write_addlist()
@staticmethod @staticmethod
def init_package(apiurl, project, package, dir, size_limit=None, meta=False, progress_obj=None, scm_url=None): def init_package(apiurl: str, project, package, dir, size_limit=None, meta=False, progress_obj=None, scm_url=None):
global store global store
if not os.path.exists(dir): if not os.path.exists(dir):
@ -3213,7 +3224,7 @@ class Request:
return '\n'.join(lines) return '\n'.join(lines)
def create(self, apiurl, addrevision=False, enforce_branching=False): def create(self, apiurl: str, addrevision=False, enforce_branching=False):
"""create a new request""" """create a new request"""
query = {'cmd': 'create'} query = {'cmd': 'create'}
if addrevision: if addrevision:
@ -3436,7 +3447,7 @@ def pathjoin(a, *p):
return path return path
def makeurl(baseurl, l, query=None): def makeurl(baseurl: str, l, query=None):
"""Given a list of path compoments, construct a complete URL. """Given a list of path compoments, construct a complete URL.
Optional parameters for a query string can be given as a list, as a Optional parameters for a query string can be given as a list, as a
@ -3486,7 +3497,7 @@ def check_store_version(dir):
raise oscerr.WorkingCopyWrongVersion(msg) raise oscerr.WorkingCopyWrongVersion(msg)
def meta_get_packagelist(apiurl, prj, deleted=None, expand=False): def meta_get_packagelist(apiurl: str, prj, deleted=None, expand=False):
query = {} query = {}
if deleted: if deleted:
@ -3505,11 +3516,11 @@ def meta_get_packagelist(apiurl, prj, deleted=None, expand=False):
return [node.get('name') for node in root.findall('entry')] return [node.get('name') for node in root.findall('entry')]
def meta_get_filelist(apiurl, prj, package, verbose=False, expand=False, revision=None, meta=False, deleted=False): def meta_get_filelist(apiurl: str, prj, package, verbose=False, expand=False, revision=None, meta=False, deleted=False):
"""return a list of file names, """return a list of file names,
or a list File() instances if verbose=True""" or a list File() instances if verbose=True"""
query = {} query: Dict[str, Union[str, int]] = {}
if deleted: if deleted:
query['deleted'] = 1 query['deleted'] = 1
if expand: if expand:
@ -3542,7 +3553,7 @@ def meta_get_filelist(apiurl, prj, package, verbose=False, expand=False, revisio
return l return l
def meta_get_project_list(apiurl, deleted=None): def meta_get_project_list(apiurl: str, deleted=False):
query = {} query = {}
if deleted: if deleted:
query['deleted'] = 1 query['deleted'] = 1
@ -3553,7 +3564,7 @@ def meta_get_project_list(apiurl, deleted=None):
return sorted(node.get('name') for node in root if node.get('name')) return sorted(node.get('name') for node in root if node.get('name'))
def show_project_meta(apiurl, prj, rev=None, blame=None): def show_project_meta(apiurl: str, prj: str, rev=None, blame=None):
query = {} query = {}
if blame: if blame:
query['view'] = "blame" query['view'] = "blame"
@ -3580,7 +3591,7 @@ def show_project_meta(apiurl, prj, rev=None, blame=None):
return f.readlines() return f.readlines()
def show_project_conf(apiurl, prj, rev=None, blame=None): def show_project_conf(apiurl: str, prj: str, rev=None, blame=None):
query = {} query = {}
url = None url = None
if rev: if rev:
@ -3594,7 +3605,7 @@ def show_project_conf(apiurl, prj, rev=None, blame=None):
return f.readlines() return f.readlines()
def show_package_trigger_reason(apiurl, prj, pac, repo, arch): def show_package_trigger_reason(apiurl: str, prj: str, pac: str, repo: str, arch: str):
url = makeurl(apiurl, ['build', prj, repo, arch, pac, '_reason']) url = makeurl(apiurl, ['build', prj, repo, arch, pac, '_reason'])
try: try:
f = http_GET(url) f = http_GET(url)
@ -3604,8 +3615,8 @@ def show_package_trigger_reason(apiurl, prj, pac, repo, arch):
raise raise
def show_package_meta(apiurl, prj, pac, meta=False, blame=None): def show_package_meta(apiurl: str, prj: str, pac: str, meta=False, blame=None):
query = {} query: Dict[str, Union[str, int]] = {}
if meta: if meta:
query['meta'] = 1 query['meta'] = 1
if blame: if blame:
@ -3621,7 +3632,7 @@ def show_package_meta(apiurl, prj, pac, meta=False, blame=None):
raise raise
def show_attribute_meta(apiurl, prj, pac, subpac, attribute, with_defaults, with_project): def show_attribute_meta(apiurl: str, prj: str, pac, subpac, attribute, with_defaults, with_project):
path = [] path = []
path.append('source') path.append('source')
path.append(prj) path.append(prj)
@ -4968,11 +4979,23 @@ def get_source_file_diff(dir, filename, rev, oldfilename=None, olddir=None, orig
return d return d
def server_diff(apiurl, def server_diff(
old_project, old_package, old_revision, apiurl: str,
new_project, new_package, new_revision, old_project: str,
unified=False, missingok=False, meta=False, expand=True, onlyissues=False, full=True, xml=False): old_package: str,
query = {'cmd': 'diff'} old_revision: str,
new_project: str,
new_package: str,
new_revision: str,
unified=False,
missingok=False,
meta=False,
expand=True,
onlyissues=False,
full=True,
xml=False,
):
query: Dict[str, Union[str, int]] = {"cmd": "diff"}
if expand: if expand:
query['expand'] = 1 query['expand'] = 1
if old_project: if old_project:
@ -5019,10 +5042,21 @@ def server_diff(apiurl,
return f.read() return f.read()
def server_diff_noex(apiurl, def server_diff_noex(
old_project, old_package, old_revision, apiurl: str,
new_project, new_package, new_revision, old_project: str,
unified=False, missingok=False, meta=False, expand=True, onlyissues=False, xml=False): old_package: str,
old_revision: str,
new_project: str,
new_package: str,
new_revision: str,
unified=False,
missingok=False,
meta=False,
expand=True,
onlyissues=False,
xml=False,
):
try: try:
return server_diff(apiurl, return server_diff(apiurl,
old_project, old_package, old_revision, old_project, old_package, old_revision,
@ -5048,7 +5082,7 @@ def server_diff_noex(apiurl,
except: except:
elm = ET.fromstring(body).find('summary') elm = ET.fromstring(body).find('summary')
summary = '' summary = ''
if elm is not None: if elm is not None and elm.text is not None:
summary = elm.text summary = elm.text
return b'error: diffing failed: %s' % summary.encode() return b'error: diffing failed: %s' % summary.encode()
return rdiff return rdiff
@ -5111,7 +5145,9 @@ def submit_action_diff(apiurl, action):
raise e raise e
def make_dir(apiurl, project, package, pathname=None, prj_dir=None, package_tracking=True, pkg_path=None): def make_dir(
apiurl: str, project: str, package: str, pathname=None, prj_dir=None, package_tracking=True, pkg_path=None
):
""" """
creates the plain directory structure for a package dir. creates the plain directory structure for a package dir.
The 'apiurl' parameter is needed for the project dir initialization. The 'apiurl' parameter is needed for the project dir initialization.
@ -5158,9 +5194,22 @@ def make_dir(apiurl, project, package, pathname=None, prj_dir=None, package_trac
return pkg_path return pkg_path
def checkout_package(apiurl, project, package, def checkout_package(
revision=None, pathname=None, prj_obj=None, apiurl: str,
expand_link=False, prj_dir=None, server_service_files=None, service_files=None, progress_obj=None, size_limit=None, meta=False, outdir=None): project: str,
package: str,
revision=None,
pathname=None,
prj_obj=None,
expand_link=False,
prj_dir=None,
server_service_files=None,
service_files=None,
progress_obj=None,
size_limit=None,
meta=False,
outdir=None,
):
try: try:
# the project we're in might be deleted. # the project we're in might be deleted.
# that'll throw an error then. # that'll throw an error then.
@ -5220,10 +5269,11 @@ def checkout_package(apiurl, project, package,
# exists # exists
meta_data = b''.join(show_package_meta(apiurl, quote_plus(project), quote_plus(package))) meta_data = b''.join(show_package_meta(apiurl, quote_plus(project), quote_plus(package)))
root = ET.fromstring(meta_data) root = ET.fromstring(meta_data)
if root.find('scmsync') is not None and root.find('scmsync').text is not None: scmsync_element = root.find("scmsync")
if scmsync_element is not None and scmsync_element.text is not None:
if not os.path.isfile('/usr/lib/obs/service/obs_scm_bridge'): if not os.path.isfile('/usr/lib/obs/service/obs_scm_bridge'):
raise oscerr.OscIOError(None, 'Install the obs-scm-bridge package to work on packages managed in scm (git)!') raise oscerr.OscIOError(None, 'Install the obs-scm-bridge package to work on packages managed in scm (git)!')
scm_url = root.find('scmsync').text scm_url = scmsync_element.text
directory = make_dir(apiurl, project, package, pathname, prj_dir, conf.config['do_package_tracking'], outdir) directory = make_dir(apiurl, project, package, pathname, prj_dir, conf.config['do_package_tracking'], outdir)
os.putenv("OSC_VERSION", get_osc_version()) os.putenv("OSC_VERSION", get_osc_version())
run_external(['/usr/lib/obs/service/obs_scm_bridge', '--outdir', directory, '--url', scm_url]) run_external(['/usr/lib/obs/service/obs_scm_bridge', '--outdir', directory, '--url', scm_url])
@ -5885,7 +5935,9 @@ def get_repos_of_project(apiurl, prj):
yield Repo(node.get('name'), node2.text) yield Repo(node.get('name'), node2.text)
def get_binarylist(apiurl, prj, repo, arch, package=None, verbose=False, withccache=False): def get_binarylist(
apiurl: str, prj: str, repo: str, arch: str, package: Optional[str] = None, verbose=False, withccache=False
):
what = package or '_repository' what = package or '_repository'
query = {} query = {}
if withccache: if withccache:
@ -5914,7 +5966,18 @@ def get_binarylist_published(apiurl, prj, repo, arch):
return r return r
def show_results_meta(apiurl, prj, package=None, lastbuild=None, repository=None, arch=None, oldstate=None, multibuild=False, locallink=False, code=None): def show_results_meta(
apiurl: str,
prj: str,
package: Optional[str] = None,
lastbuild: Optional[str] = None,
repository: Optional[List[str]] = None,
arch: Optional[List[str]] = None,
oldstate: Optional[str] = None,
multibuild=False,
locallink=False,
code: Optional[str] = None,
):
repository = repository or [] repository = repository or []
arch = arch or [] arch = arch or []
query = [] query = []
@ -8140,7 +8203,7 @@ def filter_role(meta, user, role):
root.remove(node) root.remove(node)
def find_default_project(apiurl=None, package=None): def find_default_project(apiurl: Optional[str] = None, package: Optional[str] = None):
""" """
look though the list of conf.config['getpac_default_project'] look though the list of conf.config['getpac_default_project']
and find the first project where the given package exists in the build service. and find the first project where the given package exists in the build service.
@ -8174,7 +8237,7 @@ def utime(filename, arg, ignore_einval=True):
raise raise
def which(name): def which(name: str):
"""Searches "name" in PATH.""" """Searches "name" in PATH."""
name = os.path.expanduser(name) name = os.path.expanduser(name)
if os.path.isabs(name): if os.path.isabs(name):
@ -8188,13 +8251,13 @@ def which(name):
return None return None
def get_comments(apiurl, kind, *args): def get_comments(apiurl: str, kind, *args):
url = makeurl(apiurl, ('comments', kind) + args) url = makeurl(apiurl, ('comments', kind) + args)
f = http_GET(url) f = http_GET(url)
return ET.parse(f).getroot() return ET.parse(f).getroot()
def print_comments(apiurl, kind, *args): def print_comments(apiurl: str, kind, *args):
def print_rec(comments, indent=''): def print_rec(comments, indent=''):
for comment in comments: for comment in comments:
print(indent, end='') print(indent, end='')
@ -8210,30 +8273,34 @@ def print_comments(apiurl, kind, *args):
print_rec(comments) print_rec(comments)
def create_comment(apiurl, kind, comment, *args, **kwargs): def create_comment(apiurl: str, kind, comment, *args, **kwargs) -> Optional[str]:
query = {} query = {}
if kwargs.get('parent') is not None: if kwargs.get('parent') is not None:
query = {'parent_id': kwargs['parent']} query = {'parent_id': kwargs['parent']}
u = makeurl(apiurl, ('comments', kind) + args, query=query) u = makeurl(apiurl, ('comments', kind) + args, query=query)
f = http_POST(u, data=comment) f = http_POST(u, data=comment)
ret = ET.fromstring(f.read()).find('summary') ret = ET.fromstring(f.read()).find('summary')
if ret is None:
return None
return ret.text return ret.text
def delete_comment(apiurl, cid): def delete_comment(apiurl: str, cid: str) -> Optional[str]:
u = makeurl(apiurl, ['comment', cid]) u = makeurl(apiurl, ['comment', cid])
f = http_DELETE(u) f = http_DELETE(u)
ret = ET.fromstring(f.read()).find('summary') ret = ET.fromstring(f.read()).find('summary')
if ret is None:
return None
return ret.text return ret.text
def get_rpmlint_log(apiurl, proj, pkg, repo, arch): def get_rpmlint_log(apiurl: str, proj: str, pkg: str, repo: str, arch: str):
u = makeurl(apiurl, ['build', proj, repo, arch, pkg, 'rpmlint.log']) u = makeurl(apiurl, ['build', proj, repo, arch, pkg, 'rpmlint.log'])
f = http_GET(u) f = http_GET(u)
return f.read() return f.read()
def checkout_deleted_package(apiurl, proj, pkg, dst): def checkout_deleted_package(apiurl: str, proj: str, pkg: str, dst):
pl = meta_get_filelist(apiurl, proj, pkg, deleted=True) pl = meta_get_filelist(apiurl, proj, pkg, deleted=True)
query = {} query = {}
query['deleted'] = 1 query['deleted'] = 1
@ -8254,7 +8321,7 @@ def checkout_deleted_package(apiurl, proj, pkg, dst):
print('done.') print('done.')
def vc_export_env(apiurl, quiet=False): def vc_export_env(apiurl: str, quiet=False):
# try to set the env variables for the user's realname and email # try to set the env variables for the user's realname and email
# (the variables are used by the "vc" script or some source service) # (the variables are used by the "vc" script or some source service)
tag2envs = {'realname': ['VC_REALNAME'], tag2envs = {'realname': ['VC_REALNAME'],
@ -8288,7 +8355,7 @@ def vc_export_env(apiurl, quiet=False):
class MultibuildFlavorResolver: class MultibuildFlavorResolver:
def __init__(self, apiurl, project, package, use_local=False): def __init__(self, apiurl: str, project: str, package: str, use_local=False):
self.apiurl = apiurl self.apiurl = apiurl
self.project = project self.project = project
self.package = package self.package = package