mirror of
https://github.com/openSUSE/osc.git
synced 2025-01-11 16:36:14 +01:00
Add 'gitea_api' module
Co-authored-by: Dan Čermák <dcermak@suse.com>
This commit is contained in:
parent
50a203fedb
commit
699f4d860e
10
.github/workflows/tests.yaml
vendored
10
.github/workflows/tests.yaml
vendored
@ -25,7 +25,7 @@ jobs:
|
||||
run: |
|
||||
sudo apt-get -y update
|
||||
sudo apt-get -y --no-install-recommends install git-lfs
|
||||
sudo apt-get -y --no-install-recommends install diffstat diffutils git-core python3 python3-cryptography python3-pip python3-rpm python3-setuptools python3-urllib3
|
||||
sudo apt-get -y --no-install-recommends install diffstat diffutils git-core python3 python3-cryptography python3-pip python3-rpm python3-ruamel.yaml python3-setuptools python3-urllib3
|
||||
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
@ -85,7 +85,7 @@ jobs:
|
||||
zypper -n lr --details
|
||||
grep -qi tumbleweed /etc/os-release && zypper -n dist-upgrade || zypper -n patch || zypper -n patch
|
||||
zypper -n install git-lfs
|
||||
zypper -n install diffstat diffutils git-core python3 python3-cryptography python3-pip python3-rpm python3-setuptools python3-urllib3
|
||||
zypper -n install diffstat diffutils git-core python3 python3-cryptography python3-pip python3-rpm python3-ruamel.yaml python3-setuptools python3-urllib3
|
||||
|
||||
- name: 'Install packages (Fedora/CentOS)'
|
||||
if: ${{ contains(matrix.container, '/fedora:') || contains(matrix.container, '/centos:') }}
|
||||
@ -93,7 +93,7 @@ jobs:
|
||||
dnf -y makecache
|
||||
dnf -y distro-sync
|
||||
dnf -y install git-lfs
|
||||
dnf -y install diffstat diffutils git-core python3 python3-cryptography python3-pip python3-rpm python3-setuptools python3-urllib3
|
||||
dnf -y install diffstat diffutils git-core python3 python3-cryptography python3-pip python3-rpm python3-ruamel-yaml python3-setuptools python3-urllib3
|
||||
|
||||
- name: 'Install packages (Debian/Ubuntu)'
|
||||
if: ${{ contains(matrix.container, '/debian:') || contains(matrix.container, '/ubuntu:') }}
|
||||
@ -101,7 +101,7 @@ jobs:
|
||||
apt-get -y update
|
||||
apt-get -y upgrade
|
||||
apt-get -y --no-install-recommends install git-lfs
|
||||
apt-get -y --no-install-recommends install diffstat diffutils git-core python3 python3-cryptography python3-pip python3-rpm python3-setuptools python3-urllib3
|
||||
apt-get -y --no-install-recommends install diffstat diffutils git-core python3 python3-cryptography python3-pip python3-rpm python3-ruamel.yaml python3-setuptools python3-urllib3
|
||||
|
||||
- uses: actions/checkout@v3
|
||||
|
||||
@ -131,7 +131,7 @@ jobs:
|
||||
run: |
|
||||
sudo sh -c '. /etc/os-release; echo "deb [trusted=yes] http://download.opensuse.org/repositories/openSUSE:Tools/xUbuntu_${VERSION_ID} ./" > /etc/apt/sources.list.d/openSUSE-Tools.list'
|
||||
sudo apt-get -y update
|
||||
sudo apt-get -y --no-install-recommends install python3-behave diffstat diffutils python3 python3-cryptography python3-pip python3-rpm python3-setuptools python3-urllib3 obs-build obs-service-set-version
|
||||
sudo apt-get -y --no-install-recommends install python3-behave diffstat diffutils python3 python3-cryptography python3-pip python3-rpm python3-ruamel.yaml python3-setuptools python3-urllib3 obs-build obs-service-set-version
|
||||
|
||||
- name: "Checkout sources"
|
||||
uses: actions/checkout@v3
|
||||
|
@ -40,12 +40,14 @@
|
||||
%define ssh_add_pkg openssh-clients
|
||||
%define ssh_keygen_pkg openssh
|
||||
%define sphinx_pkg %{use_python_pkg}-sphinx
|
||||
%define ruamel_yaml_pkg %{use_python_pkg}-ruamel-yaml
|
||||
|
||||
%if 0%{?suse_version}
|
||||
%define argparse_manpage_pkg %{use_python_pkg}-argparse-manpage
|
||||
%define obs_build_pkg build
|
||||
%define ssh_keygen_pkg openssh-common
|
||||
%define sphinx_pkg %{use_python_pkg}-Sphinx
|
||||
%define ruamel_yaml_pkg %{use_python_pkg}-ruamel.yaml
|
||||
%endif
|
||||
|
||||
Name: osc
|
||||
@ -85,6 +87,7 @@ BuildRequires: git-core
|
||||
Requires: %{use_python_pkg}-cryptography
|
||||
Requires: %{use_python_pkg}-rpm
|
||||
Requires: %{use_python_pkg}-urllib3
|
||||
Requires: %{ruamel_yaml_pkg}
|
||||
|
||||
# needed for showing download progressbars
|
||||
Recommends: %{use_python_pkg}-progressbar
|
||||
|
12
osc/gitea_api/__init__.py
Normal file
12
osc/gitea_api/__init__.py
Normal file
@ -0,0 +1,12 @@
|
||||
from .connection import Connection
|
||||
from .exceptions import BranchDoesNotExist
|
||||
from .exceptions import BranchExists
|
||||
from .exceptions import ForkExists
|
||||
from .exceptions import GiteaException
|
||||
from .branch import Branch
|
||||
from .conf import Config
|
||||
from .conf import Login
|
||||
from .fork import Fork
|
||||
from .ssh_key import SSHKey
|
||||
from .repo import Repo
|
||||
from .user import User
|
86
osc/gitea_api/branch.py
Normal file
86
osc/gitea_api/branch.py
Normal file
@ -0,0 +1,86 @@
|
||||
from typing import Optional
|
||||
|
||||
from .connection import Connection
|
||||
from .connection import GiteaHTTPResponse
|
||||
from .exceptions import BranchDoesNotExist
|
||||
from .exceptions import BranchExists
|
||||
from .exceptions import GiteaException
|
||||
|
||||
|
||||
class Branch:
|
||||
@classmethod
|
||||
def get(
|
||||
cls,
|
||||
conn: Connection,
|
||||
owner: str,
|
||||
repo: str,
|
||||
branch: str,
|
||||
) -> GiteaHTTPResponse:
|
||||
"""
|
||||
Retrieve details about a repository branch.
|
||||
|
||||
:param conn: Gitea ``Connection`` instance.
|
||||
:param owner: Owner of the repo.
|
||||
:param repo: Name of the repo.
|
||||
:param branch: Name of the branch.
|
||||
"""
|
||||
url = conn.makeurl("repos", owner, repo, "branches", branch)
|
||||
try:
|
||||
return conn.request("GET", url)
|
||||
except GiteaException as e:
|
||||
if e.status == 404:
|
||||
raise BranchDoesNotExist(e.response, owner, repo, branch) from None
|
||||
raise
|
||||
|
||||
@classmethod
|
||||
def list(
|
||||
cls,
|
||||
conn: Connection,
|
||||
owner: str,
|
||||
repo: str,
|
||||
) -> GiteaHTTPResponse:
|
||||
"""
|
||||
Retrieve details about all repository branches.
|
||||
|
||||
:param conn: Gitea ``Connection`` instance.
|
||||
:param owner: Owner of the repo.
|
||||
:param repo: Name of the repo.
|
||||
"""
|
||||
url = conn.makeurl("repos", owner, repo, "branches")
|
||||
# XXX: returns 'null' when there are no branches; an empty list would be a better API
|
||||
return conn.request("GET", url)
|
||||
|
||||
@classmethod
|
||||
def create(
|
||||
cls,
|
||||
conn: Connection,
|
||||
owner: str,
|
||||
repo: str,
|
||||
*,
|
||||
old_ref_name: Optional[str] = None,
|
||||
new_branch_name: str,
|
||||
exist_ok: bool = False,
|
||||
) -> GiteaHTTPResponse:
|
||||
"""
|
||||
Create a new branch in a repository.
|
||||
|
||||
:param conn: Gitea ``Connection`` instance.
|
||||
:param owner: Owner of the repo.
|
||||
:param repo: Name of the repo.
|
||||
:param old_ref_name: Name of the old branch/tag/commit to create from.
|
||||
:param new_branch_name: Name of the branch to create.
|
||||
:param exist_ok: A ``BranchExists`` exception is raised when the target exists. Set to ``True`` to avoid throwing the exception.
|
||||
"""
|
||||
json_data = {
|
||||
"new_branch_name": new_branch_name,
|
||||
"old_ref_name": old_ref_name,
|
||||
}
|
||||
url = conn.makeurl("repos", owner, repo, "branches")
|
||||
try:
|
||||
return conn.request("POST", url, json_data=json_data)
|
||||
except GiteaException as e:
|
||||
if e.status == 409:
|
||||
if exist_ok:
|
||||
return cls.get(conn, owner, repo, new_branch_name)
|
||||
raise BranchExists(e.response, owner, repo, new_branch_name) from None
|
||||
raise
|
183
osc/gitea_api/conf.py
Normal file
183
osc/gitea_api/conf.py
Normal file
@ -0,0 +1,183 @@
|
||||
import io
|
||||
import os
|
||||
from typing import Any
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
|
||||
import ruamel.yaml
|
||||
|
||||
from osc import oscerr
|
||||
from osc.util.models import BaseModel
|
||||
from osc.util.models import Field
|
||||
|
||||
|
||||
class Login(BaseModel):
|
||||
name: str = Field() # type: ignore[assignment]
|
||||
url: str = Field() # type: ignore[assignment]
|
||||
user: str = Field() # type: ignore[assignment]
|
||||
token: str = Field() # type: ignore[assignment]
|
||||
ssh_key: Optional[str] = Field() # type: ignore[assignment]
|
||||
default: Optional[bool] = Field() # type: ignore[assignment]
|
||||
|
||||
class AlreadyExists(oscerr.OscBaseError):
|
||||
def __init__(self, name):
|
||||
super().__init__()
|
||||
self.name = name
|
||||
|
||||
def __str__(self):
|
||||
return f"Gitea config entry with name '{self.name}' already exists"
|
||||
|
||||
class DoesNotExist(oscerr.OscBaseError):
|
||||
def __init__(self, **kwargs):
|
||||
super().__init__()
|
||||
self.kwargs = kwargs
|
||||
|
||||
def __str__(self):
|
||||
if self.kwargs == {"name": None}:
|
||||
return "Could not find a default Gitea config entry"
|
||||
kwargs_str = ", ".join([f"{key}={value}" for key, value in self.kwargs.items()])
|
||||
return f"Could not find a matching Gitea config entry: {kwargs_str}"
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
# ignore extra fields
|
||||
for key in list(kwargs):
|
||||
if key not in self.__fields__:
|
||||
kwargs.pop(key, None)
|
||||
super().__init__(**kwargs)
|
||||
|
||||
def to_human_readable_string(self, *, show_token: bool = False):
|
||||
from osc.output import KeyValueTable
|
||||
|
||||
table = KeyValueTable()
|
||||
table.add("Name", self.name, color="bold")
|
||||
if self.default:
|
||||
table.add("Default", "true", color="bold")
|
||||
table.add("URL", self.url)
|
||||
table.add("User", self.user)
|
||||
if self.ssh_key:
|
||||
table.add("SSH Key", self.ssh_key)
|
||||
if show_token:
|
||||
# tokens are stored in the plain text, there's not reason to protect them too much
|
||||
# let's only hide them from the output by default
|
||||
table.add("Token", self.token)
|
||||
return f"{table}"
|
||||
|
||||
|
||||
class Config:
|
||||
"""
|
||||
Manage the tea config.yml file.
|
||||
No data is cached in the objects, all changes are in sync with the file on disk.
|
||||
"""
|
||||
|
||||
def __init__(self, path: Optional[str] = None):
|
||||
if not path:
|
||||
path = os.path.expanduser("~/.config/tea/config.yml")
|
||||
self.path = os.path.abspath(path)
|
||||
|
||||
self.logins: List[Login] = []
|
||||
|
||||
def _read(self) -> Dict[str, Any]:
|
||||
try:
|
||||
with open(self.path, "r") as f:
|
||||
yaml = ruamel.yaml.YAML()
|
||||
return yaml.load(f)
|
||||
except FileNotFoundError:
|
||||
return {}
|
||||
|
||||
def _write(self, data):
|
||||
yaml = ruamel.yaml.YAML()
|
||||
yaml.default_flow_style = False
|
||||
buf = io.StringIO()
|
||||
yaml.dump(data, buf)
|
||||
buf.seek(0)
|
||||
text = buf.read()
|
||||
|
||||
os.makedirs(os.path.dirname(self.path), mode=0o700, exist_ok=True)
|
||||
with open(self.path, "w") as f:
|
||||
f.write(text)
|
||||
|
||||
def list_logins(self) -> List[Login]:
|
||||
data = self._read()
|
||||
result = []
|
||||
for i in data.get("logins", []):
|
||||
login = Login(**i)
|
||||
result.append(login)
|
||||
return result
|
||||
|
||||
def get_login(self, name: Optional[str] = None) -> Login:
|
||||
"""
|
||||
Return ``Login`` object for the given ``name``.
|
||||
If ``name`` equals to ``None``, return the default ``Login``.
|
||||
"""
|
||||
for login in self.list_logins():
|
||||
if name is None and login.default:
|
||||
return login
|
||||
if login.name == name:
|
||||
return login
|
||||
raise Login.DoesNotExist(name=name)
|
||||
|
||||
def get_login_by_url_user(self, url: str, user: str) -> Login:
|
||||
"""
|
||||
Return ``Login`` object for the given ``url`` and ``user``.
|
||||
"""
|
||||
for login in self.list_logins():
|
||||
if (login.url, login.user) == (url, user):
|
||||
return login
|
||||
raise Login.DoesNotExist(url=url, user=user)
|
||||
|
||||
def add_login(self, login: Login):
|
||||
data = self._read()
|
||||
# print("DDD", data)
|
||||
data.setdefault("logins", [])
|
||||
for i in data["logins"]:
|
||||
if i.get("name", None) == login.name:
|
||||
raise Login.AlreadyExists(login.name)
|
||||
data["logins"].append(login.dict())
|
||||
self._write(data)
|
||||
|
||||
def remove_login(self, name: str) -> Login:
|
||||
# throw an exception if the login name doesn't exist
|
||||
login = self.get_login(name)
|
||||
|
||||
data = self._read()
|
||||
for num, entry in enumerate(list(data["logins"])):
|
||||
if entry.get("name", None) == login.name:
|
||||
data["logins"].pop(num)
|
||||
self._write(data)
|
||||
return login
|
||||
|
||||
def update_login(
|
||||
self,
|
||||
name: str,
|
||||
new_name: Optional[str] = None,
|
||||
new_url: Optional[str] = None,
|
||||
new_user: Optional[str] = None,
|
||||
new_token: Optional[str] = None,
|
||||
new_ssh_key: Optional[str] = None,
|
||||
set_as_default: Optional[bool] = None,
|
||||
) -> Login:
|
||||
login = self.get_login(name)
|
||||
|
||||
if new_name is not None:
|
||||
login.name = new_name
|
||||
if new_url is not None:
|
||||
login.url = new_url
|
||||
if new_user is not None:
|
||||
login.user = new_user
|
||||
if new_token is not None:
|
||||
login.token = new_token
|
||||
if new_ssh_key is not None:
|
||||
login.ssh_key = new_ssh_key
|
||||
|
||||
if not login.has_changed():
|
||||
return login
|
||||
|
||||
data = self._read()
|
||||
for num, entry in enumerate(data["logins"]):
|
||||
if entry.get("name", None) == name:
|
||||
data["logins"][num].update(login.dict())
|
||||
self._write(data)
|
||||
|
||||
return login
|
||||
# TODO: set_as_default
|
116
osc/gitea_api/connection.py
Normal file
116
osc/gitea_api/connection.py
Normal file
@ -0,0 +1,116 @@
|
||||
import copy
|
||||
import http.client
|
||||
import json
|
||||
import urllib.parse
|
||||
from typing import Optional
|
||||
|
||||
import urllib3
|
||||
import urllib3.response
|
||||
|
||||
from .conf import Login
|
||||
|
||||
|
||||
# TODO: retry, backoff, connection pool?
|
||||
|
||||
|
||||
class GiteaHTTPResponse:
|
||||
"""
|
||||
A ``urllib3.response.HTTPResponse`` wrapper
|
||||
that ensures compatibility with older versions of urllib3.
|
||||
"""
|
||||
|
||||
def __init__(self, response: urllib3.response.HTTPResponse):
|
||||
self.__dict__["_response"] = response
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self._response, name)
|
||||
|
||||
def json(self):
|
||||
if hasattr(self._response, "json"):
|
||||
return self._response.json()
|
||||
return json.loads(self._response.data)
|
||||
|
||||
|
||||
class Connection:
|
||||
def __init__(self, login: Login, alternative_port: Optional[int] = None):
|
||||
"""
|
||||
:param login: ``Login`` object with Gitea url and credentials.
|
||||
:param alternative_port: Use an alternative port for the connection. This is needed for testing when gitea runs on a random port.
|
||||
"""
|
||||
self.login = login
|
||||
|
||||
parsed_url = urllib.parse.urlparse(self.login.url, scheme="https")
|
||||
if parsed_url.scheme == "http":
|
||||
ConnectionClass = urllib3.connection.HTTPConnection
|
||||
elif parsed_url.scheme == "https":
|
||||
ConnectionClass = urllib3.connection.HTTPSConnection
|
||||
else:
|
||||
raise ValueError(f"Unsupported scheme in Gitea url '{self.login.url}'")
|
||||
|
||||
self.host = parsed_url.hostname
|
||||
assert self.host is not None
|
||||
self.port = alternative_port if alternative_port else parsed_url.port
|
||||
self.conn = ConnectionClass(host=self.host, port=self.port)
|
||||
|
||||
if hasattr(self.conn, "set_cert"):
|
||||
# needed to avoid: AttributeError: 'HTTPSConnection' object has no attribute 'assert_hostname'. Did you mean: 'server_hostname'?
|
||||
self.conn.set_cert()
|
||||
|
||||
def makeurl(self, *path: str, query: Optional[dict] = None):
|
||||
"""
|
||||
Return relative url prefixed with "/api/v1/" followed with concatenated ``*path``.
|
||||
"""
|
||||
url_path = ["", "api", "v1"] + [urllib.parse.quote(i, safe="/:") for i in path]
|
||||
url_path_str = "/".join(url_path)
|
||||
|
||||
if query is None:
|
||||
query = {}
|
||||
query = copy.deepcopy(query)
|
||||
|
||||
for key in list(query):
|
||||
value = query[key]
|
||||
|
||||
if value in (None, [], ()):
|
||||
# remove items with value equal to None or [] or ()
|
||||
del query[key]
|
||||
elif isinstance(value, bool):
|
||||
# convert boolean values to "0" or "1"
|
||||
query[key] = str(int(value))
|
||||
|
||||
url_query_str = urllib.parse.urlencode(query, doseq=True)
|
||||
return urllib.parse.urlunsplit(("", "", url_path_str, url_query_str, ""))
|
||||
|
||||
def request(self, method, url, json_data: Optional[dict] = None) -> GiteaHTTPResponse:
|
||||
"""
|
||||
Make a request and return ``GiteaHTTPResponse``.
|
||||
"""
|
||||
headers = {
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
if self.login.token:
|
||||
headers["Authorization"] = f"token {self.login.token}"
|
||||
|
||||
if json_data:
|
||||
json_data = dict(((key, value) for key, value in json_data.items() if value is not None))
|
||||
|
||||
body = json.dumps(json_data) if json_data else None
|
||||
|
||||
self.conn.request(method, url, body, headers)
|
||||
response = self.conn.getresponse()
|
||||
|
||||
if isinstance(response, http.client.HTTPResponse):
|
||||
result = GiteaHTTPResponse(urllib3.response.HTTPResponse.from_httplib(response))
|
||||
else:
|
||||
result = GiteaHTTPResponse(response)
|
||||
|
||||
if not hasattr(response, "status"):
|
||||
from .exceptions import GiteaException # pylint: disable=import-outside-toplevel,cyclic-import
|
||||
|
||||
raise GiteaException(result)
|
||||
|
||||
if response.status // 100 != 2:
|
||||
from .exceptions import GiteaException # pylint: disable=import-outside-toplevel,cyclic-import
|
||||
|
||||
raise GiteaException(result)
|
||||
|
||||
return result
|
64
osc/gitea_api/exceptions.py
Normal file
64
osc/gitea_api/exceptions.py
Normal file
@ -0,0 +1,64 @@
|
||||
import re
|
||||
|
||||
from .. import oscerr
|
||||
from .connection import GiteaHTTPResponse
|
||||
|
||||
|
||||
class GiteaException(oscerr.OscBaseError):
|
||||
def __init__(self, response: GiteaHTTPResponse):
|
||||
self.response = response
|
||||
|
||||
@property
|
||||
def status(self):
|
||||
return self.response.status
|
||||
|
||||
@property
|
||||
def reason(self):
|
||||
return self.response.reason
|
||||
|
||||
def __str__(self):
|
||||
result = f"{self.status} {self.reason}"
|
||||
if self.response.data:
|
||||
result += f": {self.response.data}"
|
||||
return result
|
||||
|
||||
|
||||
class BranchDoesNotExist(GiteaException):
|
||||
def __init__(self, response: GiteaHTTPResponse, owner: str, repo: str, branch: str):
|
||||
super().__init__(response)
|
||||
self.owner = owner
|
||||
self.repo = repo
|
||||
self.branch = branch
|
||||
|
||||
def __str__(self):
|
||||
result = f"Repo '{self.owner}/{self.repo}' does not contain branch '{self.branch}'"
|
||||
return result
|
||||
|
||||
|
||||
class BranchExists(GiteaException):
|
||||
def __init__(self, response: GiteaHTTPResponse, owner: str, repo: str, branch: str):
|
||||
super().__init__(response)
|
||||
self.owner = owner
|
||||
self.repo = repo
|
||||
self.branch = branch
|
||||
|
||||
def __str__(self):
|
||||
result = f"Repo '{self.owner}/{self.repo}' already contains branch '{self.branch}'"
|
||||
return result
|
||||
|
||||
|
||||
class ForkExists(GiteaException):
|
||||
def __init__(self, response: GiteaHTTPResponse, owner: str, repo: str):
|
||||
super().__init__(response)
|
||||
self.owner = owner
|
||||
self.repo = repo
|
||||
|
||||
regex = re.compile(r".*fork path: (?P<owner>[^/]+)/(?P<repo>[^\]]+)\].*")
|
||||
match = regex.match(self.response.json()["message"])
|
||||
assert match is not None
|
||||
self.fork_owner = match.groupdict()["owner"]
|
||||
self.fork_repo = match.groupdict()["repo"]
|
||||
|
||||
def __str__(self):
|
||||
result = f"Repo '{self.owner}/{self.repo}' is already forked as '{self.fork_owner}/{self.fork_repo}'"
|
||||
return result
|
65
osc/gitea_api/fork.py
Normal file
65
osc/gitea_api/fork.py
Normal file
@ -0,0 +1,65 @@
|
||||
from typing import Optional
|
||||
|
||||
from .connection import Connection
|
||||
from .connection import GiteaHTTPResponse
|
||||
from .exceptions import ForkExists
|
||||
from .exceptions import GiteaException
|
||||
|
||||
|
||||
class Fork:
|
||||
@classmethod
|
||||
def list(
|
||||
cls,
|
||||
conn: Connection,
|
||||
owner: str,
|
||||
repo: str,
|
||||
) -> GiteaHTTPResponse:
|
||||
"""
|
||||
List forks of a repository.
|
||||
|
||||
:param conn: Gitea ``Connection`` instance.
|
||||
:param owner: Owner of the repo.
|
||||
:param repo: Name of the repo.
|
||||
"""
|
||||
|
||||
url = conn.makeurl("repos", owner, repo, "forks")
|
||||
return conn.request("GET", url)
|
||||
|
||||
@classmethod
|
||||
def create(
|
||||
cls,
|
||||
conn: Connection,
|
||||
owner: str,
|
||||
repo: str,
|
||||
*,
|
||||
new_repo_name: Optional[str] = None,
|
||||
target_org: Optional[str] = None,
|
||||
exist_ok: bool = False,
|
||||
) -> GiteaHTTPResponse:
|
||||
"""
|
||||
Fork a repository.
|
||||
|
||||
:param conn: Gitea ``Connection`` instance.
|
||||
:param owner: Owner of the repo.
|
||||
:param repo: Name of the repo.
|
||||
:param new_repo_name: Name of the forked repository.
|
||||
:param target_org: Name of the organization, if forking into organization.
|
||||
:param exist_ok: A ``ForkExists`` exception is raised when the target exists. Set to ``True`` to avoid throwing the exception.
|
||||
"""
|
||||
|
||||
json_data = {
|
||||
"name": new_repo_name,
|
||||
"organization": target_org,
|
||||
}
|
||||
url = conn.makeurl("repos", owner, repo, "forks")
|
||||
try:
|
||||
return conn.request("POST", url, json_data=json_data)
|
||||
except GiteaException as e:
|
||||
# use ForkExists exception to parse fork_owner and fork_repo from the response
|
||||
fork_exists_exception = ForkExists(e.response, owner, repo)
|
||||
if e.status == 409:
|
||||
if exist_ok:
|
||||
from . import Repo
|
||||
return Repo.get(conn, fork_exists_exception.fork_owner, fork_exists_exception.fork_repo)
|
||||
raise fork_exists_exception from None
|
||||
raise
|
107
osc/gitea_api/repo.py
Normal file
107
osc/gitea_api/repo.py
Normal file
@ -0,0 +1,107 @@
|
||||
import os
|
||||
import subprocess
|
||||
from typing import Optional
|
||||
|
||||
from .connection import Connection
|
||||
from .connection import GiteaHTTPResponse
|
||||
from .exceptions import BranchDoesNotExist
|
||||
from .exceptions import BranchExists
|
||||
from .exceptions import ForkExists
|
||||
from .exceptions import GiteaException
|
||||
from .user import User
|
||||
|
||||
|
||||
class Repo:
|
||||
@classmethod
|
||||
def get(
|
||||
cls,
|
||||
conn: Connection,
|
||||
owner: str,
|
||||
repo: str,
|
||||
) -> GiteaHTTPResponse:
|
||||
"""
|
||||
Retrieve details about a repository.
|
||||
|
||||
:param conn: Gitea ``Connection`` instance.
|
||||
:param owner: Owner of the repo.
|
||||
:param repo: Name of the repo.
|
||||
"""
|
||||
url = conn.makeurl("repos", owner, repo)
|
||||
return conn.request("GET", url)
|
||||
|
||||
@classmethod
|
||||
def clone(
|
||||
cls,
|
||||
conn: Connection,
|
||||
owner: str,
|
||||
repo: str,
|
||||
*,
|
||||
directory: Optional[str] = None,
|
||||
cwd: Optional[str] = None,
|
||||
anonymous: bool = False,
|
||||
add_remotes: bool = False,
|
||||
ssh_private_key_path: Optional[str] = None,
|
||||
ssh_strict_host_key_checking: bool = True,
|
||||
) -> str:
|
||||
"""
|
||||
Clone a repository using 'git clone' command, return absolute path to it.
|
||||
|
||||
:param conn: Gitea ``Connection`` instance.
|
||||
:param owner: Owner of the repo.
|
||||
:param repo: Name of the repo.
|
||||
:param directory: The name of a new directory to clone into. Defaults to the repo name.
|
||||
:param cwd: Working directory. Defaults to the current working directory.
|
||||
:param anonymous: Whether to use``clone_url`` for an anonymous access or use authenticated ``ssh_url``.
|
||||
:param add_remotes: Determine and add 'parent' or 'fork' remotes to the cloned repo.
|
||||
"""
|
||||
import shlex
|
||||
|
||||
cwd = os.path.abspath(cwd) if cwd else os.getcwd()
|
||||
directory = directory if directory else repo
|
||||
# it's perfectly fine to use os.path.join() here because git can take an absolute path
|
||||
directory_abspath = os.path.join(cwd, directory)
|
||||
|
||||
repo_data = cls.get(conn, owner, repo).json()
|
||||
clone_url = repo_data["clone_url"] if anonymous else repo_data["ssh_url"]
|
||||
|
||||
remotes = {}
|
||||
if add_remotes:
|
||||
user = User.get(conn).json()
|
||||
if repo_data["owner"]["login"] == user["login"]:
|
||||
# we're cloning our own repo, setting remote to the parent (if exists)
|
||||
parent = repo_data["parent"]
|
||||
remotes["parent"] = parent["clone_url"] if anonymous else parent["ssh_url"]
|
||||
else:
|
||||
# we're cloning someone else's repo, setting remote to our fork (if exists)
|
||||
from . import Fork
|
||||
forks = Fork.list(conn, owner, repo).json()
|
||||
forks = [i for i in forks if i["owner"]["login"] == user["login"]]
|
||||
if forks:
|
||||
assert len(forks) == 1
|
||||
fork = forks[0]
|
||||
remotes["fork"] = fork["clone_url"] if anonymous else fork["ssh_url"]
|
||||
|
||||
env = os.environ.copy()
|
||||
ssh_args = []
|
||||
if ssh_private_key_path:
|
||||
ssh_args += [f"-i {shlex.quote(ssh_private_key_path)}"]
|
||||
if not ssh_strict_host_key_checking:
|
||||
ssh_args += [
|
||||
"-o StrictHostKeyChecking=no",
|
||||
"-o UserKnownHostsFile=/dev/null",
|
||||
"-o LogLevel=ERROR",
|
||||
]
|
||||
if ssh_args:
|
||||
env["GIT_SSH_COMMAND"] = f"ssh {' '.join(ssh_args)}"
|
||||
|
||||
# clone
|
||||
cmd = ["git", "clone", clone_url, directory]
|
||||
|
||||
subprocess.run(cmd, cwd=cwd, env=env, check=True)
|
||||
|
||||
# setup remotes
|
||||
for name, url in remotes.items():
|
||||
cmd = ["git", "-C", directory_abspath, "remote", "add", name, url]
|
||||
subprocess.run(cmd, cwd=cwd, check=True)
|
||||
|
||||
return directory_abspath
|
75
osc/gitea_api/ssh_key.py
Normal file
75
osc/gitea_api/ssh_key.py
Normal file
@ -0,0 +1,75 @@
|
||||
from typing import Optional
|
||||
|
||||
from .connection import Connection
|
||||
from .connection import GiteaHTTPResponse
|
||||
|
||||
|
||||
class SSHKey:
|
||||
@classmethod
|
||||
def get(cls, conn: Connection, id: int) -> GiteaHTTPResponse:
|
||||
"""
|
||||
Get an authenticated user's public key by its ``id``.
|
||||
|
||||
:param conn: Gitea ``Connection`` instance.
|
||||
:param id: key numeric id
|
||||
"""
|
||||
url = conn.makeurl("user", "keys", str(id))
|
||||
return conn.request("GET", url)
|
||||
|
||||
@classmethod
|
||||
def list(cls, conn: Connection) -> GiteaHTTPResponse:
|
||||
"""
|
||||
List the authenticated user's public keys.
|
||||
|
||||
:param conn: Gitea ``Connection`` instance.
|
||||
"""
|
||||
url = conn.makeurl("user", "keys")
|
||||
return conn.request("GET", url)
|
||||
|
||||
@classmethod
|
||||
def _split_key(cls, key):
|
||||
import re
|
||||
return re.split(" +", key, maxsplit=2)
|
||||
|
||||
@classmethod
|
||||
def create(cls, conn: Connection, key: str, title: Optional[str] = None) -> GiteaHTTPResponse:
|
||||
"""
|
||||
Create a public key.
|
||||
|
||||
:param conn: Gitea ``Connection`` instance.
|
||||
:param key: An armored SSH key to add.
|
||||
:param title: Title of the key to add. Derived from the key if not specified.
|
||||
"""
|
||||
url = conn.makeurl("user", "keys")
|
||||
|
||||
# TODO: validate that we're sending a public ssh key
|
||||
|
||||
if not title:
|
||||
title = cls._split_key(key)[2]
|
||||
|
||||
data = {
|
||||
"key": key,
|
||||
"title": title,
|
||||
}
|
||||
return conn.request("POST", url, json_data=data)
|
||||
|
||||
@classmethod
|
||||
def delete(cls, conn: Connection, id: int):
|
||||
"""
|
||||
Delete a public key
|
||||
|
||||
:param conn: Gitea ``Connection`` instance.
|
||||
:param id: Id of key to delete.
|
||||
"""
|
||||
|
||||
url = conn.makeurl("user", "keys", str(id))
|
||||
return conn.request("DELETE", url)
|
||||
|
||||
@classmethod
|
||||
def to_human_readable_string(cls, data):
|
||||
from osc.output import KeyValueTable
|
||||
table = KeyValueTable()
|
||||
table.add("ID", f"{data['id']}", color="bold")
|
||||
table.add("Title", f"{data['title']}")
|
||||
table.add("Key", f"{data['key']}")
|
||||
return str(table)
|
17
osc/gitea_api/user.py
Normal file
17
osc/gitea_api/user.py
Normal file
@ -0,0 +1,17 @@
|
||||
from .connection import Connection
|
||||
from .connection import GiteaHTTPResponse
|
||||
|
||||
|
||||
class User:
|
||||
@classmethod
|
||||
def get(
|
||||
cls,
|
||||
conn: Connection,
|
||||
) -> GiteaHTTPResponse:
|
||||
"""
|
||||
Retrieve details about the current user.
|
||||
|
||||
:param conn: Gitea ``Connection`` instance.
|
||||
"""
|
||||
url = conn.makeurl("user")
|
||||
return conn.request("GET", url)
|
@ -36,6 +36,7 @@ packages =
|
||||
osc._private
|
||||
osc.commands
|
||||
osc.git_scm
|
||||
osc.gitea_api
|
||||
osc.obs_api
|
||||
osc.obs_scm
|
||||
osc.output
|
||||
@ -44,6 +45,7 @@ install_requires =
|
||||
cryptography
|
||||
# rpm is not available on pip, install a matching package manually prior installing osc
|
||||
rpm
|
||||
ruamel.yaml
|
||||
urllib3
|
||||
|
||||
[options.extras_require]
|
||||
|
Loading…
Reference in New Issue
Block a user