mirror of
https://github.com/openSUSE/osc.git
synced 2026-02-21 03:05:28 +01:00
This change introduces support for authenticating against the Gitea API using an SSH agent. This provides a more secure and convenient alternative to storing private keys or API tokens directly in the configuration file. Key changes include: - A new HttpSigner class that can sign HTTP requests using either a local private key or a key from an SSH agent. - The osc git-obs login add and login update commands now accept --ssh-agent and --ssh-key-agent-pub arguments to configure SSH agent-based authentication. - The paramiko library is now used for SSH agent communication. It is an optional dependency and only required when using the SSH agent feature. - The Gitea API connection logic is updated to use the HttpSigner when SSH authentication is configured. This improves user experience by allowing them to leverage their existing SSH agent setup for Gitea authentication, avoiding the need to handle sensitive credentials manually.
205 lines
7.0 KiB
Python
205 lines
7.0 KiB
Python
import copy
|
|
import http.client
|
|
import json
|
|
import re
|
|
import time
|
|
import urllib.parse
|
|
from typing import Dict
|
|
from typing import Generator
|
|
from typing import Optional
|
|
|
|
import urllib3
|
|
import urllib3.exceptions
|
|
import urllib3.response
|
|
|
|
from .conf import Login
|
|
from .http_signer import HttpSigner
|
|
|
|
RE_HTTP_HEADER_LINK = re.compile('<(?P<url>.*?)>; rel="(?P<rel>.*?)",?')
|
|
|
|
|
|
def parse_http_header_link(link: str) -> Dict[str, str]:
|
|
"""
|
|
Parse RFC8288 "link" http headers into {"rel": "url"}
|
|
"""
|
|
result = {}
|
|
for match in RE_HTTP_HEADER_LINK.findall(link):
|
|
result[match[1]] = match[0]
|
|
return result
|
|
|
|
|
|
class GiteaHTTPResponse:
|
|
"""
|
|
A ``urllib3.response.HTTPResponse`` wrapper
|
|
that ensures compatibility with older versions of urllib3.
|
|
"""
|
|
|
|
def __init__(self, response: urllib3.response.HTTPResponse):
|
|
self.__dict__["_response"] = response
|
|
|
|
def __getattr__(self, name):
|
|
return getattr(self._response, name)
|
|
|
|
def json(self):
|
|
if hasattr(self._response, "json"):
|
|
return self._response.json()
|
|
return json.loads(self._response.data)
|
|
|
|
|
|
class Connection:
|
|
def __init__(self, login: Login, alternative_port: Optional[int] = None):
|
|
"""
|
|
:param login: ``Login`` object with Gitea url and credentials.
|
|
:param alternative_port: Use an alternative port for the connection. This is needed for testing when gitea runs on a random port.
|
|
"""
|
|
self.login = login
|
|
|
|
parsed_url = urllib.parse.urlparse(self.login.url, scheme="https")
|
|
if parsed_url.scheme == "http":
|
|
ConnectionClass = urllib3.connection.HTTPConnection
|
|
elif parsed_url.scheme == "https":
|
|
ConnectionClass = urllib3.connection.HTTPSConnection
|
|
else:
|
|
raise ValueError(f"Unsupported scheme in Gitea url '{self.login.url}'")
|
|
|
|
self.scheme = parsed_url.scheme
|
|
self.host = parsed_url.hostname
|
|
assert self.host is not None
|
|
self.port = alternative_port if alternative_port else parsed_url.port
|
|
|
|
conn_kwargs = {}
|
|
|
|
if urllib3.__version__.startswith("1."):
|
|
# workaround for urllib3 v1: TypeError: 'object' object cannot be interpreted as an integer
|
|
conn_kwargs["timeout"] = 60
|
|
|
|
self.conn = ConnectionClass(host=self.host, port=self.port, **conn_kwargs)
|
|
|
|
# retries; variables are named according to urllib3
|
|
self.retry_count = 3
|
|
self.retry_backoff_factor = 2
|
|
self.retry_status_forcelist = (
|
|
500, # Internal Server Error
|
|
502, # Bad Gateway
|
|
503, # Service Unavailable
|
|
504, # Gateway Timeout
|
|
)
|
|
|
|
if hasattr(self.conn, "set_cert"):
|
|
# needed to avoid: AttributeError: 'HTTPSConnection' object has no attribute 'assert_hostname'. Did you mean: 'server_hostname'?
|
|
self.conn.set_cert()
|
|
|
|
def makeurl(self, *path: str, query: Optional[dict] = None):
|
|
"""
|
|
Return relative url prefixed with "/api/v1/" followed with concatenated ``*path``.
|
|
"""
|
|
url_path = ["", "api", "v1"] + [urllib.parse.quote(i.lstrip("/"), safe="/:") for i in path]
|
|
url_path_str = "/".join(url_path)
|
|
|
|
if query is None:
|
|
query = {}
|
|
query = copy.deepcopy(query)
|
|
|
|
for key in list(query):
|
|
value = query[key]
|
|
|
|
if value in (None, [], ()):
|
|
# remove items with value equal to None or [] or ()
|
|
del query[key]
|
|
elif isinstance(value, bool):
|
|
# convert boolean values to "0" or "1"
|
|
query[key] = str(int(value))
|
|
|
|
url_query_str = urllib.parse.urlencode(query, doseq=True)
|
|
return urllib.parse.urlunsplit(("", "", url_path_str, url_query_str, ""))
|
|
|
|
def request(
|
|
self,
|
|
method,
|
|
url,
|
|
json_data: Optional[dict] = None,
|
|
*,
|
|
context: Optional[dict] = None,
|
|
exception_map: Optional[dict] = None,
|
|
) -> GiteaHTTPResponse:
|
|
"""
|
|
Make a request and return ``GiteaHTTPResponse``.
|
|
|
|
:param context: Additional parameters passed as **kwargs to an exception if raised
|
|
"""
|
|
headers = {
|
|
"Content-Type": "application/json",
|
|
}
|
|
if self.login.ssh_key or self.login.ssh_agent:
|
|
try:
|
|
headers.update(HttpSigner(self.login).get_signed_header(method, url))
|
|
except Exception as e:
|
|
raise RuntimeError(f"Failed to sign the request using SSH credentials: {e}")
|
|
elif self.login.token:
|
|
headers["Authorization"] = f"token {self.login.token}"
|
|
|
|
if json_data and isinstance(json_data, dict):
|
|
json_data = dict(((key, value) for key, value in json_data.items() if value is not None))
|
|
|
|
body = json.dumps(json_data) if json_data else None
|
|
|
|
for retry in range(1 + self.retry_count):
|
|
# 1 regular request + ``self.retry_count`` retries
|
|
try:
|
|
self.conn.request(method, url, body, headers)
|
|
response = self.conn.getresponse()
|
|
|
|
if response.status not in self.retry_status_forcelist:
|
|
# we are happy with the response status -> use the response
|
|
break
|
|
|
|
if retry >= self.retry_count:
|
|
# we have reached maximum number of retries -> use the response
|
|
break
|
|
|
|
except (urllib3.exceptions.HTTPError, ConnectionResetError):
|
|
if retry >= self.retry_count:
|
|
raise
|
|
|
|
# {backoff factor} * (2 ** ({number of previous retries}))
|
|
time.sleep(self.retry_backoff_factor * (2 ** retry))
|
|
self.conn.close()
|
|
|
|
if isinstance(response, http.client.HTTPResponse):
|
|
response = GiteaHTTPResponse(urllib3.response.HTTPResponse.from_httplib(response))
|
|
else:
|
|
response = GiteaHTTPResponse(response)
|
|
|
|
if not hasattr(response, "status"):
|
|
from .exceptions import GiteaException # pylint: disable=import-outside-toplevel,cyclic-import
|
|
|
|
raise GiteaException(response)
|
|
|
|
if response.status // 100 != 2:
|
|
from .exceptions import response_to_exception
|
|
|
|
raise response_to_exception(response, context=context, exception_map=exception_map)
|
|
|
|
return response
|
|
|
|
def request_all_pages(
|
|
self, method, url, json_data: Optional[dict] = None, *, context: Optional[dict] = None
|
|
) -> Generator[GiteaHTTPResponse, None, None]:
|
|
"""
|
|
Make a request and yield ``GiteaHTTPResponse`` instances for each page.
|
|
Arguments are forwarded to the underlying ``request()`` call.
|
|
"""
|
|
|
|
while True:
|
|
response = self.request(method, url, json_data=json_data, context=context)
|
|
yield response
|
|
|
|
if "link" not in response.headers:
|
|
break
|
|
|
|
links = parse_http_header_link(response.headers["link"])
|
|
if "next" in links:
|
|
url = links["next"]
|
|
else:
|
|
break
|