1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-07-11 11:48:48 +02:00
Files
Daniel Mach 9779c6cc4f Merge pull request #1760 from dmach/osc-build-project-from-_ObsPrj
Extend 'osc build' and 'osc fork' to use project.build file from Gitea, 'osc fork' also follows the devel project now
2025-04-02 15:06:27 +02:00

221 lines
7.6 KiB
Python

import os
import re
import subprocess
import urllib
from typing import Iterator
from typing import List
from typing import Optional
from typing import Tuple
class Git:
@staticmethod
def urlparse(url) -> urllib.parse.ParseResult:
"""
Parse git url.
Supported formats:
- https://example.com/owner/repo.git
- https://example.com:1234/owner/repo.git
- example.com/owner/repo.git
- user@example.com:owner/repo.git
- user@example.com:1234:owner/repo.git"
"""
# try ssh clone url first
pattern = r"(?P<netloc>[^@:]+@[^@:]+(:[0-9]+)?):(?P<path>.+)"
match = re.match(pattern, url)
if match:
scheme = ""
netloc = match.groupdict()["netloc"]
path = match.groupdict()["path"]
params = ''
query = ''
fragment = ''
result = urllib.parse.ParseResult(scheme, netloc, path, params, query, fragment)
return result
result = urllib.parse.urlparse(url)
if not result.netloc:
# empty netloc is most likely an error, prepend and then discard scheme to trick urlparse()
result = urllib.parse.urlparse("https://" + url)
result = urllib.parse.ParseResult("", *list(result)[1:])
return result
def __init__(self, workdir):
self.abspath = os.path.abspath(workdir)
def _run_git(self, args: List[str], mute_stderr: bool = False) -> str:
# HACK: having 2 nearly identical commands is stupid, but it muted a mypy error
if mute_stderr:
return subprocess.check_output(["git"] + args, encoding="utf-8", cwd=self.abspath, stderr=subprocess.DEVNULL).strip()
return subprocess.check_output(["git"] + args, encoding="utf-8", cwd=self.abspath).strip()
def init(self, *, initial_branch: Optional[str] = None, quiet: bool = True, mute_stderr: bool = False):
cmd = ["init"]
if initial_branch:
cmd += ["-b", initial_branch]
if quiet:
cmd += ["-q"]
self._run_git(cmd, mute_stderr=mute_stderr)
def clone(self, url, directory: Optional[str] = None, quiet: bool = True):
cmd = ["clone", url]
if directory:
cmd += [directory]
if quiet:
cmd += ["-q"]
self._run_git(cmd)
# BRANCHES
@property
def current_branch(self) -> str:
return self._run_git(["branch", "--show-current"])
def get_branch_head(self, branch: str) -> str:
return self._run_git(["rev-parse", f"refs/heads/{branch}"])
def branch_exists(self, branch: str) -> bool:
try:
self._run_git(["rev-parse", f"refs/heads/{branch}", "--"], mute_stderr=True)
except subprocess.CalledProcessError:
return False
return True
def commit_count(self, branch: str) -> int:
try:
commits = self._run_git(["rev-list", "--count", f"refs/heads/{branch}", "--"], mute_stderr=True)
return int(commits)
except subprocess.CalledProcessError:
return -1
def switch(self, branch: str, orphan: bool = False):
cmd = ["switch"]
if orphan:
cmd += ["--orphan"]
cmd += [branch]
self._run_git(cmd)
def fetch_pull_request(
self,
pull_number: int,
*,
remote: str = "origin",
force: bool = False,
):
"""
Fetch pull/$pull_number/head to pull/$pull_number branch
"""
target_branch = f"pull/{pull_number}"
cmd = ["fetch", remote, f"pull/{pull_number}/head:{target_branch}"]
if force:
cmd += [
"--force",
"--update-head-ok",
]
self._run_git(cmd)
return target_branch
# CONFIG
def set_config(self, key: str, value: str):
self._run_git(["config", key, value])
# REMOTES
def get_remote_url(self, name: str = "origin") -> str:
return self._run_git(["remote", "get-url", name])
def add_remote(self, name: str, url: str):
self._run_git(["remote", "add", name, url])
def fetch(self, name: Optional[str] = None):
if name:
cmd = ["fetch", name]
else:
cmd = ["fetch", "--all"]
self._run_git(cmd)
def get_owner_repo(self, remote: str = "origin") -> Tuple[str, str]:
remote_url = self.get_remote_url(name=remote)
if "@" in remote_url:
# ssh://gitea@example.com:owner/repo.git
# ssh://gitea@example.com:22/owner/repo.git
remote_url = remote_url.rsplit("@", 1)[-1]
parsed_remote_url = urllib.parse.urlparse(remote_url)
path = parsed_remote_url.path
if path.endswith(".git"):
path = path[:-4]
owner, repo = path.strip("/").split("/")[-2:]
return owner, repo
# LFS
def lfs_ls_files(self, ref: str = "HEAD") -> List[Tuple[str, str]]:
# TODO: --size; returns human readable string; can we somehow get the exact value in bytes instead?
out = self._run_git(["lfs", "ls-files", "--long", ref])
regex = re.compile(r"^(?P<checksum>[0-9a-f]+) [\*\-] (?P<filename>.*)$")
result = []
for line in out.splitlines():
match = regex.match(line)
if not match:
continue
result.append((match.group(2), match.group(1)))
return result
def lfs_cat_file(self, filename: str, ref: str = "HEAD"):
"""
A generator function that returns chunks of bytes of the requested file.
"""
with subprocess.Popen(["git", "cat-file", "--filters", f"{ref}:{filename}"], stdout=subprocess.PIPE, cwd=self.abspath) as proc:
assert proc.stdout is not None
while True:
# 1MiB chunks are probably a good balance between memory consumption and performance
data = proc.stdout.read(1024**2)
if not data:
break
yield data
# FILES
def add(self, files: List[str]):
self._run_git(["add", *files])
def commit(self, msg, *, allow_empty: bool = False):
cmd = ["commit", "-m", msg]
if allow_empty:
cmd += ["--allow-empty"]
self._run_git(cmd)
def diff(self, ref_old: str, ref_new: str, src_prefix: Optional[str] = None, dst_prefix: Optional[str] = None) -> Iterator[bytes]:
cmd = ["git", "diff", ref_old, ref_new]
if src_prefix:
src_prefix = src_prefix.rstrip("/") + "/"
cmd += [f"--src-prefix={src_prefix}"]
if dst_prefix:
dst_prefix = dst_prefix.rstrip("/") + "/"
cmd += [f"--dst-prefix={dst_prefix}"]
# 1MiB chunks are probably a good balance between memory consumption and performance
chunk_size = 1024**2
with subprocess.Popen(cmd, stdout=subprocess.PIPE, cwd=self.abspath) as proc:
assert proc.stdout is not None
while True:
# read a chunk of data, make sure it ends with a newline
# so we don't have to deal with split utf-8 characters and incomplete escape sequences later
chunk = proc.stdout.read(chunk_size)
chunk += proc.stdout.readline()
if not chunk:
break
yield chunk
def status(self, *, porcelain: bool = False, untracked_files: bool = False):
cmd = ["status", "--renames"]
if untracked_files:
cmd += ["--untracked-files"]
if porcelain:
cmd += ["--porcelain"]
return self._run_git(cmd)