1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-01-25 22:36:13 +01:00

Merge pull request #1518 from dmach/keyinfo

Move from _pubkey to _keyinfo
This commit is contained in:
Daniel Mach 2024-04-10 16:39:49 +02:00 committed by GitHub
commit d42383551c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 297 additions and 58 deletions

View File

@ -9718,30 +9718,37 @@ Please submit there instead, or use --nodevelproject to force direct submission.
url = makeurl(apiurl, ['source', prj, "_pubkey"])
f = http_DELETE(url)
else:
from . import obs_api
try:
# use current api, supporting fallback to higher project and server side scripts
query = {}
keyinfo = obs_api.Keyinfo.from_api(apiurl, prj)
if opts.sslcert:
query['withsslcert'] = 1
url = makeurl(apiurl, ['source', prj, '_keyinfo'], query)
f = http_GET(url)
for sslcert in keyinfo.sslcert_list or []:
print(sslcert.to_human_readable_string())
print()
else:
for pubkey in keyinfo.pubkey_list or []:
print(pubkey.to_human_readable_string())
print()
return
except HTTPError as e:
# old way to do it
while True:
try:
url = makeurl(apiurl, ['source', prj, '_pubkey'])
if opts.sslcert:
url = makeurl(apiurl, ['source', prj, '_project', '_sslcert'], 'meta=1')
f = http_GET(url)
break
except HTTPError as e:
l = prj.rsplit(':', 1)
# try key from parent project
if not opts.notraverse and len(l) > 1 and l[0] and l[1] and e.code == 404:
print(f'{prj} has no key, trying {l[0]}')
prj = l[0]
else:
raise
if e.code != 404:
raise
# the _keyinfo endpoint doesn't exist, use the old _pubkey/_sslcert instead
if opts.sslcert:
result = obs_api.Keyinfo.get_sslcert_deprecated(apiurl, prj, traverse=not(opts.notraverse))
else:
result = obs_api.Keyinfo.get_pubkey_deprecated(apiurl, prj, traverse=not(opts.notraverse))
if result:
_, key = result
print(key)
return
while True:
data = f.read(16384)

View File

@ -4,6 +4,7 @@
# either version 2, or (at your option) any later version.
import glob
import os
import re
import shutil
@ -288,49 +289,47 @@ class Fetcher:
self.__fetch_cpio(buildinfo.apiurl)
prjs = list(buildinfo.projects.keys())
for i in prjs:
dest = f"{self.cachedir}/{i}"
if not os.path.exists(dest):
os.makedirs(dest, mode=0o755)
dest += '/_pubkey'
for prj in prjs:
dest = os.path.join(self.cachedir, prj)
pubkey_path_base = os.path.join(dest, "_pubkey")
pubkey_paths = glob.glob(f"{pubkey_path_base}*")
if self.offline:
# we're offline, only index the keys found on disk
if pubkey_paths:
for pubkey_path in pubkey_paths:
buildinfo.keys.append(pubkey_path)
buildinfo.prjkeys.append(prj)
continue
from . import obs_api
os.makedirs(dest, mode=0o755, exist_ok=True)
pubkeys = []
url = makeurl(buildinfo.apiurl, ['source', i, '_pubkey'])
try_parent = False
try:
if self.offline and not os.path.exists(dest):
# may need to try parent
try_parent = True
elif not self.offline:
OscFileGrabber().urlgrab(url, dest)
# not that many keys usually
if i not in buildinfo.prjkeys and not try_parent:
buildinfo.keys.append(dest)
buildinfo.prjkeys.append(i)
except KeyboardInterrupt:
print('Cancelled by user (ctrl-c)')
print('Exiting.')
if os.path.exists(dest):
os.unlink(dest)
sys.exit(0)
keyinfo = obs_api.Keyinfo.from_api(buildinfo.apiurl, prj)
for pubkey in keyinfo.pubkey_list or []:
pubkeys.append(pubkey.value)
except HTTPError as e:
# Not found is okay, let's go to the next project
if e.code != 404:
print("Invalid answer from server", e, file=sys.stderr)
sys.exit(1)
try_parent = True
result = obs_api.Keyinfo.get_pubkey_deprecated(buildinfo.apiurl, prj, traverse=True)
if result:
# overwrite ``prj`` with the project that contains the key we're using
prj, pubkey = result
pubkeys.append(pubkey)
if try_parent:
if self.http_debug:
print(f"can't fetch key for {i}", file=sys.stderr)
print(f"url: {url}", file=sys.stderr)
# remove the existing files, we'll create new files with new contents
for pubkey_path in pubkey_paths:
os.unlink(pubkey_path)
if os.path.exists(dest):
os.unlink(dest)
l = i.rsplit(':', 1)
# try key from parent project
if len(l) > 1 and l[1] and not l[0] in buildinfo.projects:
prjs.append(l[0])
if pubkeys:
for num, pubkey in enumerate(pubkeys):
pubkey_path = f"{pubkey_path_base}-{num}"
with open(pubkey_path, "w") as f:
f.write(pubkey)
buildinfo.keys.append(pubkey_path)
if prj not in buildinfo.prjkeys:
buildinfo.prjkeys.append(prj)
def verify_pacs_old(pac_list):

View File

@ -1,3 +1,4 @@
from .keyinfo import Keyinfo
from .package import Package
from .package_sources import PackageSources
from .person import Person

110
osc/obs_api/keyinfo.py Normal file
View File

@ -0,0 +1,110 @@
import textwrap
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
from .keyinfo_pubkey import KeyinfoPubkey
from .keyinfo_sslcert import KeyinfoSslcert
class Keyinfo(XmlModel):
XML_TAG = "keyinfo"
project: str = Field(
xml_attribute=True,
description=textwrap.dedent(
"""
The name of the project.
"""
),
)
pubkey_list: Optional[List[KeyinfoPubkey]] = Field(
xml_name="pubkey",
)
sslcert_list: Optional[List[KeyinfoSslcert]] = Field(
xml_name="sslcert",
)
@classmethod
def from_api(cls, apiurl: str, project: str) -> "Keyinfo":
url_path = ["source", project, "_keyinfo"]
url_query = {}
response = cls.xml_request("GET", apiurl, url_path, url_query)
return cls.from_file(response, apiurl=apiurl)
@classmethod
def get_pubkey_deprecated(cls, apiurl: str, project: str, *, traverse: bool = True) -> Optional[Tuple[str, str]]:
"""
Old API for retrieving pubkey of the given ``project``. Use ``Keyinfo.from_api()`` instead if possible.
:param traverse: If set to ``True`` and the key is not found, traverse project hierarchy for the first available key.
:return: (project, pubkey) or None
"""
from urllib.error import HTTPError
from ..connection import http_request
from ..core import makeurl
from ..output import print_msg
while True:
url_path = ["source", project, "_pubkey"]
url_query = {}
url = makeurl(apiurl, url_path, url_query)
try:
response = http_request("GET", url)
pubkey = response.read().decode("utf-8")
return project, pubkey
except HTTPError as e:
if e.code != 404:
raise
if not traverse:
return None
parts = project.rsplit(":", 1)
if parts[0] != project:
print_msg(f"No pubkey found in project '{project}'. Trying the parent project '{parts[0]}'...", print_to="debug")
project = parts[0]
continue
# we're at the top level, no key found
return None
@classmethod
def get_sslcert_deprecated(cls, apiurl: str, project: str, *, traverse: bool = True) -> Optional[Tuple[str, str]]:
"""
Old API for retrieving sslcert of the given ``project``. Use ``Keyinfo.from_api()`` instead if possible.
:param traverse: If set to ``True`` and the cert is not found, traverse project hierarchy for the first available cert.
:return: (project, sslcert) or None
"""
from urllib.error import HTTPError
from ..connection import http_request
from ..core import makeurl
from ..output import print_msg
while True:
url_path = ["source", project, "_project", "_sslcert"]
url_query = {
"meta": 1,
}
url = makeurl(apiurl, url_path, url_query)
try:
response = http_request("GET", url)
sslcert = response.read().decode("utf-8")
return project, sslcert
except HTTPError as e:
if e.code != 404:
raise
if not traverse:
return None
parts = project.rsplit(":", 1)
if parts[0] != project:
print_msg(f"No sslcert found in project '{project}'. Trying the parent project '{parts[0]}'...", print_to="debug")
project = parts[0]
continue
# we're at the top level, no cert found
return None

View File

@ -0,0 +1,51 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class KeyinfoPubkey(XmlModel):
XML_TAG = "pubkey"
keyid: str = Field(
xml_attribute=True,
)
userid: str = Field(
xml_attribute=True,
)
algo: str = Field(
xml_attribute=True,
)
keysize: str = Field(
xml_attribute=True,
)
expires: int = Field(
xml_attribute=True,
)
fingerprint: str = Field(
xml_attribute=True,
)
value: str = Field(
xml_set_text=True,
)
def get_expires_str(self) -> str:
import datetime
return datetime.datetime.fromtimestamp(self.expires).strftime("%Y-%m-%d %H:%M:%S")
def to_human_readable_string(self) -> str:
"""
Render the object as a human readable string.
"""
from ..output import KeyValueTable
table = KeyValueTable()
table.add("Type", "GPG public key")
table.add("User ID", self.userid, color="bold")
table.add("Algorithm", self.algo)
table.add("Key size", self.keysize)
table.add("Expires", self.get_expires_str())
table.add("Fingerprint", self.fingerprint)
return f"{table}\n{self.value}"

View File

@ -0,0 +1,71 @@
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
class KeyinfoSslcert(XmlModel):
XML_TAG = "sslcert"
keyid: str = Field(
xml_attribute=True,
)
serial: str = Field(
xml_attribute=True,
)
issuer: Optional[str] = Field(
xml_attribute=True,
)
subject: str = Field(
xml_attribute=True,
)
algo: str = Field(
xml_attribute=True,
)
keysize: str = Field(
xml_attribute=True,
)
begins: int = Field(
xml_attribute=True,
)
expires: int = Field(
xml_attribute=True,
)
fingerprint: str = Field(
xml_attribute=True,
)
value: str = Field(
xml_set_text=True,
)
def get_begins_str(self) -> str:
import datetime
return datetime.datetime.fromtimestamp(self.begins).strftime("%Y-%m-%d %H:%M:%S")
def get_expires_str(self) -> str:
import datetime
return datetime.datetime.fromtimestamp(self.expires).strftime("%Y-%m-%d %H:%M:%S")
def to_human_readable_string(self) -> str:
"""
Render the object as a human readable string.
"""
from ..output import KeyValueTable
table = KeyValueTable()
table.add("Type", "SSL certificate")
table.add("Subject", self.subject, color="bold")
table.add("Key ID", self.keyid)
table.add("Serial", self.serial)
table.add("Issuer", self.issuer)
table.add("Algorithm", self.algo)
table.add("Key size", self.keysize)
table.add("Begins", self.get_begins_str())
table.add("Expires", self.get_expires_str())
table.add("Fingerprint", self.fingerprint)
return f"{table}\n{self.value}"