1
0
mirror of https://github.com/openSUSE/osc.git synced 2024-11-10 22:56:15 +01:00

Merge pull request #1179 from SchoolGuy/add-type-annotations

Add type annotations
This commit is contained in:
Daniel Mach 2022-12-05 15:52:58 +01:00 committed by GitHub
commit d17bece45d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 163 additions and 64 deletions

View File

@ -13,6 +13,29 @@ on:
- '**.py'
jobs:
mypy:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
- uses: actions/setup-python@v4
- run: pip install mypy
- run: pip install types-cryptography types-urllib3
- run: pip install distro keyring progressbar zstandard
- run: mypy osc
darker:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v3
with:
fetch-depth: 0
- uses: actions/setup-python@v4
- uses: akaihola/darker@1.5.1
with:
options: "--check --diff --color --line-length=120"
src: "."
version: "1.5.1"
pylint_plugins:
name: 'Diff pylint runs on osc plugin'
runs-on: 'ubuntu-latest'

View File

@ -1376,12 +1376,12 @@ def main(apiurl, opts, argv):
for dep in bi.deps:
if dep.sysroot:
# packages installed in sysroot subdirectory need to get a prefix for init_buildsystem
rpmlist.append('sysroot: %s %s\n' % (dep.name, dep.fullfilename))
rpmlist.append("sysroot: %s %s\n" % (dep.name, dep.fullfilename))
else:
rpmlist.append('%s %s\n' % (dep.name, dep.fullfilename))
rpmlist.append("%s %s\n" % (dep.name, dep.fullfilename))
for i in imagebins:
rpmlist.append('%s preinstallimage\n' % i)
rpmlist += ['%s %s\n' % (i[0], i[1]) for i in rpmlist_prefers]
rpmlist.append("%s preinstallimage\n" % i)
rpmlist += ["%s %s\n" % (i[0], i[1]) for i in rpmlist_prefers]
if imagefile:
rpmlist.append('preinstallimage: %s\n' % imagefile)
@ -1474,11 +1474,11 @@ def main(apiurl, opts, argv):
print()
print('The buildroot was:', build_root)
sys.exit(rc)
except KeyboardInterrupt as i:
except KeyboardInterrupt as keyboard_interrupt_exception:
print("keyboard interrupt, killing build ...")
cmd.append('--kill')
run_external(cmd[0], *cmd[1:])
raise i
raise keyboard_interrupt_exception
pacdir = os.path.join(build_root, '.build.packages')
if os.path.islink(pacdir):

View File

@ -31,6 +31,7 @@ import time
from functools import cmp_to_key, total_ordering
from http.client import IncompleteRead
from io import StringIO
from typing import Optional, Dict, Union, List
from urllib.parse import urlsplit, urlunsplit, urlparse, quote_plus, urlencode, unquote
from urllib.error import HTTPError
from urllib.request import pathname2url
@ -310,9 +311,9 @@ class Serviceinfo:
def __init__(self):
"""creates an empty serviceinfo instance"""
self.services = []
self.apiurl = None
self.project = None
self.package = None
self.apiurl: Optional[str] = None
self.project: Optional[str] = None
self.package: Optional[str] = None
def read(self, serviceinfo_node, append=False):
"""read in the source services ``<services>`` element passed as
@ -351,7 +352,7 @@ class Serviceinfo:
data['command'] = command
self.services.append(data)
def getProjectGlobalServices(self, apiurl, project, package):
def getProjectGlobalServices(self, apiurl: str, project: str, package: str):
self.apiurl = apiurl
# get all project wide services in one file, we don't store it yet
u = makeurl(apiurl, ['source', project, package], query='cmd=getprojectservices')
@ -368,7 +369,7 @@ class Serviceinfo:
elif e.code != 403 and e.code != 400:
raise e
def addVerifyFile(self, serviceinfo_node, filename):
def addVerifyFile(self, serviceinfo_node, filename: str):
f = open(filename, 'rb')
digest = hashlib.sha256(f.read()).hexdigest()
f.close()
@ -382,7 +383,7 @@ class Serviceinfo:
r.append(s)
return r
def addDownloadUrl(self, serviceinfo_node, url_string):
def addDownloadUrl(self, serviceinfo_node, url_string: str):
url = urlparse(url_string)
protocol = url.scheme
host = url.netloc
@ -403,7 +404,7 @@ class Serviceinfo:
r.append(s)
return r
def addGitUrl(self, serviceinfo_node, url_string):
def addGitUrl(self, serviceinfo_node, url_string: Optional[str]):
r = serviceinfo_node
s = ET.Element("service", name="obs_scm")
ET.SubElement(s, "param", name="url").text = url_string
@ -425,7 +426,7 @@ class Serviceinfo:
r.append(s)
return r
def execute(self, dir, callmode=None, singleservice=None, verbose=None):
def execute(self, dir, callmode: Optional[str] = None, singleservice=None, verbose: Optional[bool] = None):
old_dir = os.path.join(dir, '.old')
# if 2 osc instances are executed at a time one, of them fails on .old file existence
@ -445,8 +446,9 @@ class Serviceinfo:
shutil.rmtree(old_dir)
return result
def _execute(self, dir, old_dir, callmode=None, singleservice=None,
verbose=None):
def _execute(
self, dir, old_dir, callmode: Optional[str] = None, singleservice=None, verbose: Optional[bool] = None
):
# cleanup existing generated files
for filename in os.listdir(dir):
if filename.startswith('_service:') or filename.startswith('_service_'):
@ -760,7 +762,7 @@ class Project:
dirty_files.append(fname)
return dirty_files
def wc_repair(self, apiurl=None):
def wc_repair(self, apiurl: Optional[str] = None):
store = Store(self.dir)
store.assert_is_project()
if not store.exists("_apiurl") or apiurl:
@ -800,7 +802,7 @@ class Project:
prj_obj=self, prj_dir=self.dir,
expand_link=expand_link or not unexpand_link, progress_obj=self.progress_obj)
def status(self, pac):
def status(self, pac: str):
exists = os.path.exists(os.path.join(self.absdir, pac))
st = self.get_state(pac)
if st is None and exists:
@ -840,7 +842,7 @@ class Project:
else:
node.set('state', state)
def get_package_node(self, pac):
def get_package_node(self, pac: str):
for node in self.pac_root.findall('package'):
if pac == node.get('name'):
return node
@ -851,7 +853,7 @@ class Project:
if pac == node.get('name'):
self.pac_root.remove(node)
def get_state(self, pac):
def get_state(self, pac: str):
node = self.get_package_node(pac)
if node is not None:
return node.get('state')
@ -1173,7 +1175,16 @@ class Project:
return '\n'.join(r)
@staticmethod
def init_project(apiurl, dir, project, package_tracking=True, getPackageList=True, progress_obj=None, wc_check=True, scm_url=None):
def init_project(
apiurl: str,
dir,
project,
package_tracking=True,
getPackageList=True,
progress_obj=None,
wc_check=True,
scm_url=None,
):
global store
if not os.path.exists(dir):
@ -1323,7 +1334,7 @@ class Package:
dirty_files.append(fname)
return dirty_files
def wc_repair(self, apiurl=None):
def wc_repair(self, apiurl: Optional[str] = None):
store = Store(self.dir)
store.assert_is_package()
if not store.exists("_apiurl") or apiurl:
@ -1523,7 +1534,7 @@ class Package:
return root
@staticmethod
def commit_filelist(apiurl, project, package, filelist, msg='', user=None, **query):
def commit_filelist(apiurl: str, project: str, package: str, filelist, msg="", user=None, **query):
"""send the commitlog and the local filelist to the server"""
if user is None:
user = conf.get_apiurl_usr(apiurl)
@ -2616,7 +2627,7 @@ rev: %s
self.write_addlist()
@staticmethod
def init_package(apiurl, project, package, dir, size_limit=None, meta=False, progress_obj=None, scm_url=None):
def init_package(apiurl: str, project, package, dir, size_limit=None, meta=False, progress_obj=None, scm_url=None):
global store
if not os.path.exists(dir):
@ -3213,7 +3224,7 @@ class Request:
return '\n'.join(lines)
def create(self, apiurl, addrevision=False, enforce_branching=False):
def create(self, apiurl: str, addrevision=False, enforce_branching=False):
"""create a new request"""
query = {'cmd': 'create'}
if addrevision:
@ -3436,7 +3447,7 @@ def pathjoin(a, *p):
return path
def makeurl(baseurl, l, query=None):
def makeurl(baseurl: str, l, query=None):
"""Given a list of path compoments, construct a complete URL.
Optional parameters for a query string can be given as a list, as a
@ -3486,7 +3497,7 @@ def check_store_version(dir):
raise oscerr.WorkingCopyWrongVersion(msg)
def meta_get_packagelist(apiurl, prj, deleted=None, expand=False):
def meta_get_packagelist(apiurl: str, prj, deleted=None, expand=False):
query = {}
if deleted:
@ -3505,11 +3516,11 @@ def meta_get_packagelist(apiurl, prj, deleted=None, expand=False):
return [node.get('name') for node in root.findall('entry')]
def meta_get_filelist(apiurl, prj, package, verbose=False, expand=False, revision=None, meta=False, deleted=False):
def meta_get_filelist(apiurl: str, prj, package, verbose=False, expand=False, revision=None, meta=False, deleted=False):
"""return a list of file names,
or a list File() instances if verbose=True"""
query = {}
query: Dict[str, Union[str, int]] = {}
if deleted:
query['deleted'] = 1
if expand:
@ -3542,7 +3553,7 @@ def meta_get_filelist(apiurl, prj, package, verbose=False, expand=False, revisio
return l
def meta_get_project_list(apiurl, deleted=None):
def meta_get_project_list(apiurl: str, deleted=False):
query = {}
if deleted:
query['deleted'] = 1
@ -3553,7 +3564,7 @@ def meta_get_project_list(apiurl, deleted=None):
return sorted(node.get('name') for node in root if node.get('name'))
def show_project_meta(apiurl, prj, rev=None, blame=None):
def show_project_meta(apiurl: str, prj: str, rev=None, blame=None):
query = {}
if blame:
query['view'] = "blame"
@ -3580,7 +3591,7 @@ def show_project_meta(apiurl, prj, rev=None, blame=None):
return f.readlines()
def show_project_conf(apiurl, prj, rev=None, blame=None):
def show_project_conf(apiurl: str, prj: str, rev=None, blame=None):
query = {}
url = None
if rev:
@ -3594,7 +3605,7 @@ def show_project_conf(apiurl, prj, rev=None, blame=None):
return f.readlines()
def show_package_trigger_reason(apiurl, prj, pac, repo, arch):
def show_package_trigger_reason(apiurl: str, prj: str, pac: str, repo: str, arch: str):
url = makeurl(apiurl, ['build', prj, repo, arch, pac, '_reason'])
try:
f = http_GET(url)
@ -3604,8 +3615,8 @@ def show_package_trigger_reason(apiurl, prj, pac, repo, arch):
raise
def show_package_meta(apiurl, prj, pac, meta=False, blame=None):
query = {}
def show_package_meta(apiurl: str, prj: str, pac: str, meta=False, blame=None):
query: Dict[str, Union[str, int]] = {}
if meta:
query['meta'] = 1
if blame:
@ -3621,7 +3632,7 @@ def show_package_meta(apiurl, prj, pac, meta=False, blame=None):
raise
def show_attribute_meta(apiurl, prj, pac, subpac, attribute, with_defaults, with_project):
def show_attribute_meta(apiurl: str, prj: str, pac, subpac, attribute, with_defaults, with_project):
path = []
path.append('source')
path.append(prj)
@ -4968,11 +4979,23 @@ def get_source_file_diff(dir, filename, rev, oldfilename=None, olddir=None, orig
return d
def server_diff(apiurl,
old_project, old_package, old_revision,
new_project, new_package, new_revision,
unified=False, missingok=False, meta=False, expand=True, onlyissues=False, full=True, xml=False):
query = {'cmd': 'diff'}
def server_diff(
apiurl: str,
old_project: str,
old_package: str,
old_revision: str,
new_project: str,
new_package: str,
new_revision: str,
unified=False,
missingok=False,
meta=False,
expand=True,
onlyissues=False,
full=True,
xml=False,
):
query: Dict[str, Union[str, int]] = {"cmd": "diff"}
if expand:
query['expand'] = 1
if old_project:
@ -5019,10 +5042,21 @@ def server_diff(apiurl,
return f.read()
def server_diff_noex(apiurl,
old_project, old_package, old_revision,
new_project, new_package, new_revision,
unified=False, missingok=False, meta=False, expand=True, onlyissues=False, xml=False):
def server_diff_noex(
apiurl: str,
old_project: str,
old_package: str,
old_revision: str,
new_project: str,
new_package: str,
new_revision: str,
unified=False,
missingok=False,
meta=False,
expand=True,
onlyissues=False,
xml=False,
):
try:
return server_diff(apiurl,
old_project, old_package, old_revision,
@ -5048,7 +5082,7 @@ def server_diff_noex(apiurl,
except:
elm = ET.fromstring(body).find('summary')
summary = ''
if elm is not None:
if elm is not None and elm.text is not None:
summary = elm.text
return b'error: diffing failed: %s' % summary.encode()
return rdiff
@ -5111,7 +5145,9 @@ def submit_action_diff(apiurl, action):
raise e
def make_dir(apiurl, project, package, pathname=None, prj_dir=None, package_tracking=True, pkg_path=None):
def make_dir(
apiurl: str, project: str, package: str, pathname=None, prj_dir=None, package_tracking=True, pkg_path=None
):
"""
creates the plain directory structure for a package dir.
The 'apiurl' parameter is needed for the project dir initialization.
@ -5158,9 +5194,22 @@ def make_dir(apiurl, project, package, pathname=None, prj_dir=None, package_trac
return pkg_path
def checkout_package(apiurl, project, package,
revision=None, pathname=None, prj_obj=None,
expand_link=False, prj_dir=None, server_service_files=None, service_files=None, progress_obj=None, size_limit=None, meta=False, outdir=None):
def checkout_package(
apiurl: str,
project: str,
package: str,
revision=None,
pathname=None,
prj_obj=None,
expand_link=False,
prj_dir=None,
server_service_files=None,
service_files=None,
progress_obj=None,
size_limit=None,
meta=False,
outdir=None,
):
try:
# the project we're in might be deleted.
# that'll throw an error then.
@ -5220,10 +5269,11 @@ def checkout_package(apiurl, project, package,
# exists
meta_data = b''.join(show_package_meta(apiurl, quote_plus(project), quote_plus(package)))
root = ET.fromstring(meta_data)
if root.find('scmsync') is not None and root.find('scmsync').text is not None:
scmsync_element = root.find("scmsync")
if scmsync_element is not None and scmsync_element.text is not None:
if not os.path.isfile('/usr/lib/obs/service/obs_scm_bridge'):
raise oscerr.OscIOError(None, 'Install the obs-scm-bridge package to work on packages managed in scm (git)!')
scm_url = root.find('scmsync').text
scm_url = scmsync_element.text
directory = make_dir(apiurl, project, package, pathname, prj_dir, conf.config['do_package_tracking'], outdir)
os.putenv("OSC_VERSION", get_osc_version())
run_external(['/usr/lib/obs/service/obs_scm_bridge', '--outdir', directory, '--url', scm_url])
@ -5885,7 +5935,9 @@ def get_repos_of_project(apiurl, prj):
yield Repo(node.get('name'), node2.text)
def get_binarylist(apiurl, prj, repo, arch, package=None, verbose=False, withccache=False):
def get_binarylist(
apiurl: str, prj: str, repo: str, arch: str, package: Optional[str] = None, verbose=False, withccache=False
):
what = package or '_repository'
query = {}
if withccache:
@ -5914,7 +5966,18 @@ def get_binarylist_published(apiurl, prj, repo, arch):
return r
def show_results_meta(apiurl, prj, package=None, lastbuild=None, repository=None, arch=None, oldstate=None, multibuild=False, locallink=False, code=None):
def show_results_meta(
apiurl: str,
prj: str,
package: Optional[str] = None,
lastbuild: Optional[str] = None,
repository: Optional[List[str]] = None,
arch: Optional[List[str]] = None,
oldstate: Optional[str] = None,
multibuild=False,
locallink=False,
code: Optional[str] = None,
):
repository = repository or []
arch = arch or []
query = []
@ -8140,7 +8203,7 @@ def filter_role(meta, user, role):
root.remove(node)
def find_default_project(apiurl=None, package=None):
def find_default_project(apiurl: Optional[str] = None, package: Optional[str] = None):
"""
look though the list of conf.config['getpac_default_project']
and find the first project where the given package exists in the build service.
@ -8174,7 +8237,7 @@ def utime(filename, arg, ignore_einval=True):
raise
def which(name):
def which(name: str):
"""Searches "name" in PATH."""
name = os.path.expanduser(name)
if os.path.isabs(name):
@ -8188,13 +8251,13 @@ def which(name):
return None
def get_comments(apiurl, kind, *args):
def get_comments(apiurl: str, kind, *args):
url = makeurl(apiurl, ('comments', kind) + args)
f = http_GET(url)
return ET.parse(f).getroot()
def print_comments(apiurl, kind, *args):
def print_comments(apiurl: str, kind, *args):
def print_rec(comments, indent=''):
for comment in comments:
print(indent, end='')
@ -8210,30 +8273,34 @@ def print_comments(apiurl, kind, *args):
print_rec(comments)
def create_comment(apiurl, kind, comment, *args, **kwargs):
def create_comment(apiurl: str, kind, comment, *args, **kwargs) -> Optional[str]:
query = {}
if kwargs.get('parent') is not None:
query = {'parent_id': kwargs['parent']}
u = makeurl(apiurl, ('comments', kind) + args, query=query)
f = http_POST(u, data=comment)
ret = ET.fromstring(f.read()).find('summary')
if ret is None:
return None
return ret.text
def delete_comment(apiurl, cid):
def delete_comment(apiurl: str, cid: str) -> Optional[str]:
u = makeurl(apiurl, ['comment', cid])
f = http_DELETE(u)
ret = ET.fromstring(f.read()).find('summary')
if ret is None:
return None
return ret.text
def get_rpmlint_log(apiurl, proj, pkg, repo, arch):
def get_rpmlint_log(apiurl: str, proj: str, pkg: str, repo: str, arch: str):
u = makeurl(apiurl, ['build', proj, repo, arch, pkg, 'rpmlint.log'])
f = http_GET(u)
return f.read()
def checkout_deleted_package(apiurl, proj, pkg, dst):
def checkout_deleted_package(apiurl: str, proj: str, pkg: str, dst):
pl = meta_get_filelist(apiurl, proj, pkg, deleted=True)
query = {}
query['deleted'] = 1
@ -8254,7 +8321,7 @@ def checkout_deleted_package(apiurl, proj, pkg, dst):
print('done.')
def vc_export_env(apiurl, quiet=False):
def vc_export_env(apiurl: str, quiet=False):
# try to set the env variables for the user's realname and email
# (the variables are used by the "vc" script or some source service)
tag2envs = {'realname': ['VC_REALNAME'],
@ -8288,7 +8355,7 @@ def vc_export_env(apiurl, quiet=False):
class MultibuildFlavorResolver:
def __init__(self, apiurl, project, package, use_local=False):
def __init__(self, apiurl: str, project: str, package: str, use_local=False):
self.apiurl = apiurl
self.project = project
self.package = package

0
osc/py.typed Normal file
View File

View File

@ -41,6 +41,15 @@ install_requires =
rpm
urllib3
[options.extras_require]
lint =
darker==1.5.1
mypy
[options.package_data]
osc =
py.typed
[options.entry_points]
console_scripts =
osc = osc.babysitter:main