mirror of
https://github.com/openSUSE/osc.git
synced 2025-07-11 11:48:48 +02:00
Extend 'osc build' and 'osc fork' to use project.build file from Gitea, 'osc fork' also follows the devel project now
221 lines
7.6 KiB
Python
221 lines
7.6 KiB
Python
import os
|
|
import re
|
|
import subprocess
|
|
import urllib
|
|
from typing import Iterator
|
|
from typing import List
|
|
from typing import Optional
|
|
from typing import Tuple
|
|
|
|
|
|
class Git:
|
|
@staticmethod
|
|
def urlparse(url) -> urllib.parse.ParseResult:
|
|
"""
|
|
Parse git url.
|
|
|
|
Supported formats:
|
|
- https://example.com/owner/repo.git
|
|
- https://example.com:1234/owner/repo.git
|
|
- example.com/owner/repo.git
|
|
- user@example.com:owner/repo.git
|
|
- user@example.com:1234:owner/repo.git"
|
|
"""
|
|
# try ssh clone url first
|
|
pattern = r"(?P<netloc>[^@:]+@[^@:]+(:[0-9]+)?):(?P<path>.+)"
|
|
match = re.match(pattern, url)
|
|
if match:
|
|
scheme = ""
|
|
netloc = match.groupdict()["netloc"]
|
|
path = match.groupdict()["path"]
|
|
params = ''
|
|
query = ''
|
|
fragment = ''
|
|
result = urllib.parse.ParseResult(scheme, netloc, path, params, query, fragment)
|
|
return result
|
|
|
|
result = urllib.parse.urlparse(url)
|
|
if not result.netloc:
|
|
# empty netloc is most likely an error, prepend and then discard scheme to trick urlparse()
|
|
result = urllib.parse.urlparse("https://" + url)
|
|
result = urllib.parse.ParseResult("", *list(result)[1:])
|
|
return result
|
|
|
|
def __init__(self, workdir):
|
|
self.abspath = os.path.abspath(workdir)
|
|
|
|
def _run_git(self, args: List[str], mute_stderr: bool = False) -> str:
|
|
# HACK: having 2 nearly identical commands is stupid, but it muted a mypy error
|
|
if mute_stderr:
|
|
return subprocess.check_output(["git"] + args, encoding="utf-8", cwd=self.abspath, stderr=subprocess.DEVNULL).strip()
|
|
return subprocess.check_output(["git"] + args, encoding="utf-8", cwd=self.abspath).strip()
|
|
|
|
def init(self, *, initial_branch: Optional[str] = None, quiet: bool = True, mute_stderr: bool = False):
|
|
cmd = ["init"]
|
|
if initial_branch:
|
|
cmd += ["-b", initial_branch]
|
|
if quiet:
|
|
cmd += ["-q"]
|
|
self._run_git(cmd, mute_stderr=mute_stderr)
|
|
|
|
def clone(self, url, directory: Optional[str] = None, quiet: bool = True):
|
|
cmd = ["clone", url]
|
|
if directory:
|
|
cmd += [directory]
|
|
if quiet:
|
|
cmd += ["-q"]
|
|
self._run_git(cmd)
|
|
|
|
# BRANCHES
|
|
|
|
@property
|
|
def current_branch(self) -> str:
|
|
return self._run_git(["branch", "--show-current"])
|
|
|
|
def get_branch_head(self, branch: str) -> str:
|
|
return self._run_git(["rev-parse", f"refs/heads/{branch}"])
|
|
|
|
def branch_exists(self, branch: str) -> bool:
|
|
try:
|
|
self._run_git(["rev-parse", f"refs/heads/{branch}", "--"], mute_stderr=True)
|
|
except subprocess.CalledProcessError:
|
|
return False
|
|
return True
|
|
|
|
def commit_count(self, branch: str) -> int:
|
|
try:
|
|
commits = self._run_git(["rev-list", "--count", f"refs/heads/{branch}", "--"], mute_stderr=True)
|
|
return int(commits)
|
|
except subprocess.CalledProcessError:
|
|
return -1
|
|
|
|
def switch(self, branch: str, orphan: bool = False):
|
|
cmd = ["switch"]
|
|
if orphan:
|
|
cmd += ["--orphan"]
|
|
cmd += [branch]
|
|
self._run_git(cmd)
|
|
|
|
def fetch_pull_request(
|
|
self,
|
|
pull_number: int,
|
|
*,
|
|
remote: str = "origin",
|
|
force: bool = False,
|
|
):
|
|
"""
|
|
Fetch pull/$pull_number/head to pull/$pull_number branch
|
|
"""
|
|
target_branch = f"pull/{pull_number}"
|
|
cmd = ["fetch", remote, f"pull/{pull_number}/head:{target_branch}"]
|
|
if force:
|
|
cmd += [
|
|
"--force",
|
|
"--update-head-ok",
|
|
]
|
|
self._run_git(cmd)
|
|
return target_branch
|
|
|
|
# CONFIG
|
|
|
|
def set_config(self, key: str, value: str):
|
|
self._run_git(["config", key, value])
|
|
|
|
# REMOTES
|
|
|
|
def get_remote_url(self, name: str = "origin") -> str:
|
|
return self._run_git(["remote", "get-url", name])
|
|
|
|
def add_remote(self, name: str, url: str):
|
|
self._run_git(["remote", "add", name, url])
|
|
|
|
def fetch(self, name: Optional[str] = None):
|
|
if name:
|
|
cmd = ["fetch", name]
|
|
else:
|
|
cmd = ["fetch", "--all"]
|
|
self._run_git(cmd)
|
|
|
|
def get_owner_repo(self, remote: str = "origin") -> Tuple[str, str]:
|
|
remote_url = self.get_remote_url(name=remote)
|
|
if "@" in remote_url:
|
|
# ssh://gitea@example.com:owner/repo.git
|
|
# ssh://gitea@example.com:22/owner/repo.git
|
|
remote_url = remote_url.rsplit("@", 1)[-1]
|
|
parsed_remote_url = urllib.parse.urlparse(remote_url)
|
|
path = parsed_remote_url.path
|
|
if path.endswith(".git"):
|
|
path = path[:-4]
|
|
owner, repo = path.strip("/").split("/")[-2:]
|
|
return owner, repo
|
|
|
|
# LFS
|
|
|
|
def lfs_ls_files(self, ref: str = "HEAD") -> List[Tuple[str, str]]:
|
|
# TODO: --size; returns human readable string; can we somehow get the exact value in bytes instead?
|
|
out = self._run_git(["lfs", "ls-files", "--long", ref])
|
|
regex = re.compile(r"^(?P<checksum>[0-9a-f]+) [\*\-] (?P<filename>.*)$")
|
|
result = []
|
|
for line in out.splitlines():
|
|
match = regex.match(line)
|
|
if not match:
|
|
continue
|
|
result.append((match.group(2), match.group(1)))
|
|
return result
|
|
|
|
def lfs_cat_file(self, filename: str, ref: str = "HEAD"):
|
|
"""
|
|
A generator function that returns chunks of bytes of the requested file.
|
|
"""
|
|
with subprocess.Popen(["git", "cat-file", "--filters", f"{ref}:{filename}"], stdout=subprocess.PIPE, cwd=self.abspath) as proc:
|
|
assert proc.stdout is not None
|
|
while True:
|
|
# 1MiB chunks are probably a good balance between memory consumption and performance
|
|
data = proc.stdout.read(1024**2)
|
|
if not data:
|
|
break
|
|
yield data
|
|
|
|
# FILES
|
|
|
|
def add(self, files: List[str]):
|
|
self._run_git(["add", *files])
|
|
|
|
def commit(self, msg, *, allow_empty: bool = False):
|
|
cmd = ["commit", "-m", msg]
|
|
if allow_empty:
|
|
cmd += ["--allow-empty"]
|
|
self._run_git(cmd)
|
|
|
|
def diff(self, ref_old: str, ref_new: str, src_prefix: Optional[str] = None, dst_prefix: Optional[str] = None) -> Iterator[bytes]:
|
|
cmd = ["git", "diff", ref_old, ref_new]
|
|
|
|
if src_prefix:
|
|
src_prefix = src_prefix.rstrip("/") + "/"
|
|
cmd += [f"--src-prefix={src_prefix}"]
|
|
|
|
if dst_prefix:
|
|
dst_prefix = dst_prefix.rstrip("/") + "/"
|
|
cmd += [f"--dst-prefix={dst_prefix}"]
|
|
|
|
# 1MiB chunks are probably a good balance between memory consumption and performance
|
|
chunk_size = 1024**2
|
|
with subprocess.Popen(cmd, stdout=subprocess.PIPE, cwd=self.abspath) as proc:
|
|
assert proc.stdout is not None
|
|
while True:
|
|
# read a chunk of data, make sure it ends with a newline
|
|
# so we don't have to deal with split utf-8 characters and incomplete escape sequences later
|
|
chunk = proc.stdout.read(chunk_size)
|
|
chunk += proc.stdout.readline()
|
|
if not chunk:
|
|
break
|
|
yield chunk
|
|
|
|
def status(self, *, porcelain: bool = False, untracked_files: bool = False):
|
|
cmd = ["status", "--renames"]
|
|
if untracked_files:
|
|
cmd += ["--untracked-files"]
|
|
if porcelain:
|
|
cmd += ["--porcelain"]
|
|
return self._run_git(cmd)
|