From cf4a0d06a2a0244c50066d9d4c567c545539b8ff Mon Sep 17 00:00:00 2001 From: Daniel Mach Date: Thu, 14 Mar 2024 08:40:33 +0100 Subject: [PATCH 1/4] Add Keyinfo XML model --- osc/obs_api/__init__.py | 1 + osc/obs_api/keyinfo.py | 33 ++++++++++++++++ osc/obs_api/keyinfo_pubkey.py | 51 ++++++++++++++++++++++++ osc/obs_api/keyinfo_sslcert.py | 71 ++++++++++++++++++++++++++++++++++ 4 files changed, 156 insertions(+) create mode 100644 osc/obs_api/keyinfo.py create mode 100644 osc/obs_api/keyinfo_pubkey.py create mode 100644 osc/obs_api/keyinfo_sslcert.py diff --git a/osc/obs_api/__init__.py b/osc/obs_api/__init__.py index 37c7a52b..0084bff9 100644 --- a/osc/obs_api/__init__.py +++ b/osc/obs_api/__init__.py @@ -1,3 +1,4 @@ +from .keyinfo import Keyinfo from .package import Package from .package_sources import PackageSources from .person import Person diff --git a/osc/obs_api/keyinfo.py b/osc/obs_api/keyinfo.py new file mode 100644 index 00000000..7a7e9eed --- /dev/null +++ b/osc/obs_api/keyinfo.py @@ -0,0 +1,33 @@ +import textwrap + +from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import +from .keyinfo_pubkey import KeyinfoPubkey +from .keyinfo_sslcert import KeyinfoSslcert + + +class Keyinfo(XmlModel): + XML_TAG = "keyinfo" + + project: str = Field( + xml_attribute=True, + description=textwrap.dedent( + """ + The name of the project. + """ + ), + ) + + pubkey_list: Optional[List[KeyinfoPubkey]] = Field( + xml_name="pubkey", + ) + + sslcert_list: Optional[List[KeyinfoSslcert]] = Field( + xml_name="sslcert", + ) + + @classmethod + def from_api(cls, apiurl: str, project: str) -> "Keyinfo": + url_path = ["source", project, "_keyinfo"] + url_query = {} + response = cls.xml_request("GET", apiurl, url_path, url_query) + return cls.from_file(response, apiurl=apiurl) diff --git a/osc/obs_api/keyinfo_pubkey.py b/osc/obs_api/keyinfo_pubkey.py new file mode 100644 index 00000000..a3c99515 --- /dev/null +++ b/osc/obs_api/keyinfo_pubkey.py @@ -0,0 +1,51 @@ +from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import + + +class KeyinfoPubkey(XmlModel): + XML_TAG = "pubkey" + + keyid: str = Field( + xml_attribute=True, + ) + + userid: str = Field( + xml_attribute=True, + ) + + algo: str = Field( + xml_attribute=True, + ) + + keysize: str = Field( + xml_attribute=True, + ) + + expires: int = Field( + xml_attribute=True, + ) + + fingerprint: str = Field( + xml_attribute=True, + ) + + value: str = Field( + xml_set_text=True, + ) + + def get_expires_str(self) -> str: + import datetime + return datetime.datetime.fromtimestamp(self.expires).strftime("%Y-%m-%d %H:%M:%S") + + def to_human_readable_string(self) -> str: + """ + Render the object as a human readable string. + """ + from ..output import KeyValueTable + table = KeyValueTable() + table.add("Type", "GPG public key") + table.add("User ID", self.userid, color="bold") + table.add("Algorithm", self.algo) + table.add("Key size", self.keysize) + table.add("Expires", self.get_expires_str()) + table.add("Fingerprint", self.fingerprint) + return f"{table}\n{self.value}" diff --git a/osc/obs_api/keyinfo_sslcert.py b/osc/obs_api/keyinfo_sslcert.py new file mode 100644 index 00000000..8101056c --- /dev/null +++ b/osc/obs_api/keyinfo_sslcert.py @@ -0,0 +1,71 @@ +from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import + + +class KeyinfoSslcert(XmlModel): + XML_TAG = "sslcert" + + keyid: str = Field( + xml_attribute=True, + ) + + serial: str = Field( + xml_attribute=True, + ) + + issuer: Optional[str] = Field( + xml_attribute=True, + ) + + subject: str = Field( + xml_attribute=True, + ) + + algo: str = Field( + xml_attribute=True, + ) + + keysize: str = Field( + xml_attribute=True, + ) + + begins: int = Field( + xml_attribute=True, + ) + + expires: int = Field( + xml_attribute=True, + ) + + fingerprint: str = Field( + xml_attribute=True, + ) + + value: str = Field( + xml_set_text=True, + ) + + def get_begins_str(self) -> str: + import datetime + return datetime.datetime.fromtimestamp(self.begins).strftime("%Y-%m-%d %H:%M:%S") + + def get_expires_str(self) -> str: + import datetime + return datetime.datetime.fromtimestamp(self.expires).strftime("%Y-%m-%d %H:%M:%S") + + def to_human_readable_string(self) -> str: + """ + Render the object as a human readable string. + """ + from ..output import KeyValueTable + table = KeyValueTable() + table.add("Type", "SSL certificate") + table.add("Subject", self.subject, color="bold") + table.add("Key ID", self.keyid) + table.add("Serial", self.serial) + table.add("Issuer", self.issuer) + table.add("Algorithm", self.algo) + table.add("Key size", self.keysize) + table.add("Begins", self.get_begins_str()) + table.add("Expires", self.get_expires_str()) + table.add("Fingerprint", self.fingerprint) + return f"{table}\n{self.value}" From e170b0d54cc5de9986ebdff64db517575cee0bb6 Mon Sep 17 00:00:00 2001 From: Daniel Mach Date: Thu, 14 Mar 2024 10:17:34 +0100 Subject: [PATCH 2/4] Add Keyinfo.get_{pubkey,sslcert}_deprecated() methods --- osc/obs_api/keyinfo.py | 77 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) diff --git a/osc/obs_api/keyinfo.py b/osc/obs_api/keyinfo.py index 7a7e9eed..7c607564 100644 --- a/osc/obs_api/keyinfo.py +++ b/osc/obs_api/keyinfo.py @@ -31,3 +31,80 @@ class Keyinfo(XmlModel): url_query = {} response = cls.xml_request("GET", apiurl, url_path, url_query) return cls.from_file(response, apiurl=apiurl) + + + @classmethod + def get_pubkey_deprecated(cls, apiurl: str, project: str, *, traverse: bool = True) -> Optional[Tuple[str, str]]: + """ + Old API for retrieving pubkey of the given ``project``. Use ``Keyinfo.from_api()`` instead if possible. + + :param traverse: If set to ``True`` and the key is not found, traverse project hierarchy for the first available key. + :return: (project, pubkey) or None + """ + from urllib.error import HTTPError + from ..connection import http_request + from ..core import makeurl + from ..output import print_msg + + while True: + url_path = ["source", project, "_pubkey"] + url_query = {} + url = makeurl(apiurl, url_path, url_query) + try: + response = http_request("GET", url) + pubkey = response.read().decode("utf-8") + return project, pubkey + except HTTPError as e: + if e.code != 404: + raise + + if not traverse: + return None + + parts = project.rsplit(":", 1) + if parts[0] != project: + print_msg(f"No pubkey found in project '{project}'. Trying the parent project '{parts[0]}'...", print_to="debug") + project = parts[0] + continue + + # we're at the top level, no key found + return None + + @classmethod + def get_sslcert_deprecated(cls, apiurl: str, project: str, *, traverse: bool = True) -> Optional[Tuple[str, str]]: + """ + Old API for retrieving sslcert of the given ``project``. Use ``Keyinfo.from_api()`` instead if possible. + + :param traverse: If set to ``True`` and the cert is not found, traverse project hierarchy for the first available cert. + :return: (project, sslcert) or None + """ + from urllib.error import HTTPError + from ..connection import http_request + from ..core import makeurl + from ..output import print_msg + + while True: + url_path = ["source", project, "_project", "_sslcert"] + url_query = { + "meta": 1, + } + url = makeurl(apiurl, url_path, url_query) + try: + response = http_request("GET", url) + sslcert = response.read().decode("utf-8") + return project, sslcert + except HTTPError as e: + if e.code != 404: + raise + + if not traverse: + return None + + parts = project.rsplit(":", 1) + if parts[0] != project: + print_msg(f"No sslcert found in project '{project}'. Trying the parent project '{parts[0]}'...", print_to="debug") + project = parts[0] + continue + + # we're at the top level, no cert found + return None From b8ab16945e182d46332051d9066b2122e0b6f1bb Mon Sep 17 00:00:00 2001 From: Daniel Mach Date: Thu, 14 Mar 2024 10:30:56 +0100 Subject: [PATCH 3/4] Migrate 'signkey' command to obs_api.Keyinfo --- osc/commandline.py | 47 ++++++++++++++++++++++++++-------------------- 1 file changed, 27 insertions(+), 20 deletions(-) diff --git a/osc/commandline.py b/osc/commandline.py index d7d63f60..b006a86d 100644 --- a/osc/commandline.py +++ b/osc/commandline.py @@ -9710,30 +9710,37 @@ Please submit there instead, or use --nodevelproject to force direct submission. url = makeurl(apiurl, ['source', prj, "_pubkey"]) f = http_DELETE(url) else: + from . import obs_api try: # use current api, supporting fallback to higher project and server side scripts - query = {} + keyinfo = obs_api.Keyinfo.from_api(apiurl, prj) + if opts.sslcert: - query['withsslcert'] = 1 - url = makeurl(apiurl, ['source', prj, '_keyinfo'], query) - f = http_GET(url) + for sslcert in keyinfo.sslcert_list or []: + print(sslcert.to_human_readable_string()) + print() + else: + for pubkey in keyinfo.pubkey_list or []: + print(pubkey.to_human_readable_string()) + print() + + return + except HTTPError as e: - # old way to do it - while True: - try: - url = makeurl(apiurl, ['source', prj, '_pubkey']) - if opts.sslcert: - url = makeurl(apiurl, ['source', prj, '_project', '_sslcert'], 'meta=1') - f = http_GET(url) - break - except HTTPError as e: - l = prj.rsplit(':', 1) - # try key from parent project - if not opts.notraverse and len(l) > 1 and l[0] and l[1] and e.code == 404: - print(f'{prj} has no key, trying {l[0]}') - prj = l[0] - else: - raise + if e.code != 404: + raise + + # the _keyinfo endpoint doesn't exist, use the old _pubkey/_sslcert instead + + if opts.sslcert: + result = obs_api.Keyinfo.get_sslcert_deprecated(apiurl, prj, traverse=not(opts.notraverse)) + else: + result = obs_api.Keyinfo.get_pubkey_deprecated(apiurl, prj, traverse=not(opts.notraverse)) + if result: + _, key = result + print(key) + + return while True: data = f.read(16384) From 7f6c0b3f8a13858b68b9299ff270e39c9bbed9a0 Mon Sep 17 00:00:00 2001 From: Daniel Mach Date: Thu, 14 Mar 2024 13:48:02 +0100 Subject: [PATCH 4/4] Migrate Fetcher.run() to obs_api.Keyinfo --- osc/fetch.py | 75 ++++++++++++++++++++++++++-------------------------- 1 file changed, 37 insertions(+), 38 deletions(-) diff --git a/osc/fetch.py b/osc/fetch.py index 8ed75a58..0cfbaf43 100644 --- a/osc/fetch.py +++ b/osc/fetch.py @@ -4,6 +4,7 @@ # either version 2, or (at your option) any later version. +import glob import os import re import shutil @@ -288,49 +289,47 @@ class Fetcher: self.__fetch_cpio(buildinfo.apiurl) prjs = list(buildinfo.projects.keys()) - for i in prjs: - dest = f"{self.cachedir}/{i}" - if not os.path.exists(dest): - os.makedirs(dest, mode=0o755) - dest += '/_pubkey' + for prj in prjs: + dest = os.path.join(self.cachedir, prj) + pubkey_path_base = os.path.join(dest, "_pubkey") + pubkey_paths = glob.glob(f"{pubkey_path_base}*") + + if self.offline: + # we're offline, only index the keys found on disk + if pubkey_paths: + for pubkey_path in pubkey_paths: + buildinfo.keys.append(pubkey_path) + buildinfo.prjkeys.append(prj) + continue + + from . import obs_api + + os.makedirs(dest, mode=0o755, exist_ok=True) + pubkeys = [] - url = makeurl(buildinfo.apiurl, ['source', i, '_pubkey']) - try_parent = False try: - if self.offline and not os.path.exists(dest): - # may need to try parent - try_parent = True - elif not self.offline: - OscFileGrabber().urlgrab(url, dest) - # not that many keys usually - if i not in buildinfo.prjkeys and not try_parent: - buildinfo.keys.append(dest) - buildinfo.prjkeys.append(i) - except KeyboardInterrupt: - print('Cancelled by user (ctrl-c)') - print('Exiting.') - if os.path.exists(dest): - os.unlink(dest) - sys.exit(0) + keyinfo = obs_api.Keyinfo.from_api(buildinfo.apiurl, prj) + for pubkey in keyinfo.pubkey_list or []: + pubkeys.append(pubkey.value) except HTTPError as e: - # Not found is okay, let's go to the next project - if e.code != 404: - print("Invalid answer from server", e, file=sys.stderr) - sys.exit(1) - try_parent = True + result = obs_api.Keyinfo.get_pubkey_deprecated(buildinfo.apiurl, prj, traverse=True) + if result: + # overwrite ``prj`` with the project that contains the key we're using + prj, pubkey = result + pubkeys.append(pubkey) - if try_parent: - if self.http_debug: - print(f"can't fetch key for {i}", file=sys.stderr) - print(f"url: {url}", file=sys.stderr) + # remove the existing files, we'll create new files with new contents + for pubkey_path in pubkey_paths: + os.unlink(pubkey_path) - if os.path.exists(dest): - os.unlink(dest) - - l = i.rsplit(':', 1) - # try key from parent project - if len(l) > 1 and l[1] and not l[0] in buildinfo.projects: - prjs.append(l[0]) + if pubkeys: + for num, pubkey in enumerate(pubkeys): + pubkey_path = f"{pubkey_path_base}-{num}" + with open(pubkey_path, "w") as f: + f.write(pubkey) + buildinfo.keys.append(pubkey_path) + if prj not in buildinfo.prjkeys: + buildinfo.prjkeys.append(prj) def verify_pacs_old(pac_list):