1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-10-20 07:32:52 +02:00
Files

444 lines
15 KiB
Python

import os
import re
import subprocess
import urllib
from typing import Dict
from typing import Iterator
from typing import List
from typing import Optional
from typing import Tuple
from . import exceptions
class SshParseResult(urllib.parse.ParseResult):
"""
Class to distinguish parsed SSH URLs
"""
class Git:
@staticmethod
def urlparse(url: str) -> urllib.parse.ParseResult:
"""
Parse git url.
Supported formats:
- https://example.com/owner/repo.git
- https://example.com:1234/owner/repo.git
- example.com/owner/repo.git
- user@example.com:owner/repo.git
- user@example.com:1234:owner/repo.git"
"""
# try ssh clone url first
pattern = r"(?P<netloc>[^@:]+@[^@:]+(:[0-9]+)?):(?P<path>.+)"
match = re.match(pattern, url)
if match:
scheme = ""
netloc = match.groupdict()["netloc"]
path = match.groupdict()["path"]
params = ''
query = ''
fragment = ''
result = SshParseResult(scheme, netloc, path, params, query, fragment)
return result
result = urllib.parse.urlparse(url)
if not result.netloc:
# empty netloc is most likely an error, prepend and then discard scheme to trick urlparse()
result = urllib.parse.urlparse("https://" + url)
result = urllib.parse.ParseResult("", *list(result)[1:])
return result
@staticmethod
def urljoin(url: str, path: str) -> str:
"""
Append ``path`` to ``url``.
"""
parts = Git.urlparse(url)
# we're using os.path.normpath() and os.path.join() for working with URL paths, which may not be ideal, but seems to be working fine (on Linux)
# we need to remove leading forward slash from ``parts.path`` because ``os.path.normpath("/../")`` resolves to "/" and we don't want that
new_path = os.path.normpath(os.path.join(parts.path.lstrip("/"), path.lstrip("/")))
parts = parts._replace(path=new_path)
if isinstance(parts, SshParseResult):
new_url = f"{parts.netloc}:{parts.path}"
else:
new_url = urllib.parse.urlunparse(parts)
if new_path.startswith("../") or "/../" in new_path:
raise ValueError(f"URL must not contain relative path: {new_url}")
return new_url
def __init__(self, workdir):
self.abspath = os.path.abspath(workdir)
def _run_git(self, args: List[str], mute_stderr: bool = False) -> str:
# HACK: having 2 nearly identical commands is stupid, but it muted a mypy error
if mute_stderr:
return subprocess.check_output(["git"] + args, encoding="utf-8", cwd=self.abspath, stderr=subprocess.DEVNULL).strip()
return subprocess.check_output(["git"] + args, encoding="utf-8", cwd=self.abspath).strip()
@property
def topdir(self) -> Optional[str]:
"""
A custom implementation to `git rev-parse --show-toplevel` to avoid executing git which is sometimes unnecessary expensive.
"""
path = self.abspath
while path:
if os.path.exists(os.path.join(path, ".git")):
break
path, dirname = os.path.split(path)
if (path, dirname) == ("/", ""):
# no git repo found
return None
return path
def init(self, *, initial_branch: Optional[str] = None, quiet: bool = True, mute_stderr: bool = False):
cmd = ["init"]
if initial_branch:
cmd += ["-b", initial_branch]
if quiet:
cmd += ["-q"]
self._run_git(cmd, mute_stderr=mute_stderr)
def clone(self,
url: str,
*,
directory: Optional[str] = None,
reference: Optional[str] = None,
reference_if_able: Optional[str] = None,
quiet: bool = True
):
cmd = ["clone", url]
if directory:
cmd += [directory]
if reference:
cmd += ["--reference", reference]
if reference_if_able:
cmd += ["--reference-if-able", reference_if_able]
if quiet:
cmd += ["-q"]
self._run_git(cmd)
# BRANCHES
@property
def current_branch(self) -> Optional[str]:
try:
return self._run_git(["branch", "--show-current"], mute_stderr=True)
except subprocess.CalledProcessError:
return None
def branch_contains_commit(self, commit: str, branch: Optional[str] = None, remote: Optional[str] = None) -> bool:
if not branch:
branch = self.current_branch
if remote:
try:
self._run_git(["merge-base", "--is-ancestor", commit, f"{remote}/{branch}"], mute_stderr=True)
return True
except subprocess.CalledProcessError:
return False
try:
stdout = self._run_git(["branch", branch, "--contains", commit, "--format", "%(objectname) %(objecttype) %(refname)"])
return stdout.strip() == f"{commit} commit refs/heads/{branch}"
except subprocess.CalledProcessError:
return False
def get_branch_head(self, branch: Optional[str] = None) -> str:
if not branch:
branch = self.current_branch
try:
return self._run_git(["rev-parse", f"refs/heads/{branch}"], mute_stderr=True)
except subprocess.CalledProcessError:
raise exceptions.GitObsRuntimeError(f"Unable to retrieve HEAD from branch '{branch}'. Does the branch exist?")
def branch_exists(self, branch: str) -> bool:
try:
self._run_git(["rev-parse", f"refs/heads/{branch}", "--"], mute_stderr=True)
except subprocess.CalledProcessError:
return False
return True
def commit_count(self, branch: str) -> int:
try:
commits = self._run_git(["rev-list", "--count", f"refs/heads/{branch}", "--"], mute_stderr=True)
return int(commits)
except subprocess.CalledProcessError:
return -1
def reset(self, commit: Optional[str] = None, *, hard: bool = False):
cmd = ["reset"]
if commit:
cmd += [commit]
if hard:
cmd += ["--hard"]
self._run_git(cmd)
def switch(self, branch: str, *, orphan: bool = False, quiet: bool = False):
cmd = ["switch"]
if quiet:
cmd += ["--quiet"]
if orphan:
cmd += ["--orphan"]
cmd += [branch]
self._run_git(cmd)
def fetch_pull_request(
self,
pull_number: int,
*,
remote: Optional[str] = None,
commit: Optional[str] = None,
force: bool = False,
):
"""
Fetch pull/$pull_number/head to pull/$pull_number branch
"""
target_branch = f"pull/{pull_number}"
# if the branch exists and the head matches the expected commit, skip running 'git fetch'
if commit and self.branch_exists(target_branch) and self.get_branch_head(target_branch) == commit:
return target_branch
if not remote:
remote = self.get_current_remote()
cmd = ["fetch", remote, f"pull/{pull_number}/head:{target_branch}"]
if force:
cmd += [
"--force",
"--update-head-ok",
]
self._run_git(cmd)
return target_branch
# CONFIG
def set_config(self, key: str, value: str):
self._run_git(["config", key, value])
# REMOTES
def get_remote_url(self, name: Optional[str] = None) -> Optional[str]:
if not name:
name = self.get_current_remote()
if not name:
return None
return self._run_git(["remote", "get-url", name])
def add_remote(self, name: str, url: str):
self._run_git(["remote", "add", name, url])
def get_current_remote(self, fallback_to_origin: bool = True) -> Optional[str]:
result = None
try:
result = self._run_git(["rev-parse", "--abbrev-ref", "@{u}"], mute_stderr=True)
if result:
result = result.split("/")[0]
except subprocess.CalledProcessError:
pass
# the tracking information isn't sometimes set
# let's fall back to 'origin' if available
if not result and fallback_to_origin:
try:
self._run_git(["remote", "get-url", "origin"], mute_stderr=True)
result = "origin"
except subprocess.CalledProcessError:
pass
return result
def fetch(self, name: Optional[str] = None):
if name:
cmd = ["fetch", name]
else:
cmd = ["fetch", "--all"]
self._run_git(cmd)
def get_owner_repo(self, remote: Optional[str] = None) -> Tuple[str, str]:
remote_url = self.get_remote_url(name=remote)
if not remote_url:
raise exceptions.GitObsRuntimeError("Couldn't determine owner and repo due to a missing remote")
return self.get_owner_repo_from_url(remote_url)
@staticmethod
def get_owner_repo_from_url(url: str) -> Tuple[str, str]:
if "@" in url:
# ssh://gitea@example.com:owner/repo.git
# ssh://gitea@example.com:22/owner/repo.git
url = url.rsplit("@", 1)[-1]
parsed_url = urllib.parse.urlparse(url)
path = parsed_url.path
if path.endswith(".git"):
path = path[:-4]
owner, repo = path.strip("/").split("/")[-2:]
return owner, repo
# LFS
def lfs_ls_files(self, ref: str = "HEAD", suffixes: Optional[List[str]] = None) -> Dict[str, str]:
# TODO: --size; returns human readable string; can we somehow get the exact value in bytes instead?
out = self._run_git(["lfs", "ls-files", "--long", ref])
regex = re.compile(r"^(?P<checksum>[0-9a-f]+) [\*\-] (?P<path>.*)$")
result = {}
for line in out.splitlines():
match = regex.match(line)
if not match:
continue
checksum = match.groupdict()["checksum"]
path = match.groupdict()["path"]
if suffixes:
found = False
for suffix in suffixes:
if path.endswith(suffix):
found = True
break
if not found:
continue
result[path] = checksum
return result
def lfs_cat_file(self, filename: str, ref: str = "HEAD"):
"""
A generator function that returns chunks of bytes of the requested file.
"""
with subprocess.Popen(["git", "cat-file", "--filters", f"{ref}:{filename}"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, cwd=self.abspath) as proc:
assert proc.stdout is not None
while True:
# 1MiB chunks are probably a good balance between memory consumption and performance
data = proc.stdout.read(1024**2)
if not data:
break
yield data
# FILES
def add(self, files: List[str]):
self._run_git(["add", *files])
def commit(self, msg, *, allow_empty: bool = False):
cmd = ["commit", "-m", msg]
if allow_empty:
cmd += ["--allow-empty"]
self._run_git(cmd)
def ls_files(self, ref: str = "HEAD", suffixes: Optional[List[str]] = None) -> Dict[str, str]:
out = self._run_git(["ls-tree", "-r", "--format=%(objectname) %(path)", ref])
regex = re.compile(r"^(?P<checksum>[0-9a-f]+) (?P<path>.*)$")
result = {}
for line in out.splitlines():
match = regex.match(line)
if not match:
continue
checksum = match.groupdict()["checksum"]
path = match.groupdict()["path"]
if suffixes:
found = False
for suffix in suffixes:
if path.endswith(suffix):
found = True
break
if not found:
continue
result[path] = checksum
return result
def diff(self, ref_old: str, ref_new: str, src_prefix: Optional[str] = None, dst_prefix: Optional[str] = None) -> Iterator[bytes]:
cmd = ["git", "diff", ref_old, ref_new]
if src_prefix:
src_prefix = src_prefix.rstrip("/") + "/"
cmd += [f"--src-prefix={src_prefix}"]
if dst_prefix:
dst_prefix = dst_prefix.rstrip("/") + "/"
cmd += [f"--dst-prefix={dst_prefix}"]
# 1MiB chunks are probably a good balance between memory consumption and performance
chunk_size = 1024**2
with subprocess.Popen(cmd, stdout=subprocess.PIPE, cwd=self.abspath) as proc:
assert proc.stdout is not None
while True:
# read a chunk of data, make sure it ends with a newline
# so we don't have to deal with split utf-8 characters and incomplete escape sequences later
chunk = proc.stdout.read(chunk_size)
chunk += proc.stdout.readline()
if not chunk:
break
yield chunk
def status(self, *, porcelain: bool = False, untracked_files: bool = False):
cmd = ["status", "--renames"]
if untracked_files:
cmd += ["--untracked-files"]
if porcelain:
cmd += ["--porcelain"]
return self._run_git(cmd)
# SUBMODULES
def get_submodules(self) -> dict:
SUBMODULE_RE = re.compile(r"^submodule\.(?P<submodule>[^=]*)\.(?P<key>[^\.=]*)=(?P<value>.*)$")
STATUS_RE = re.compile(r"^.(?P<commit>[a-f0-9]+) (?P<submodule>[^ ]+).*$")
result = {}
try:
lines = self._run_git(["config", "--blob", "HEAD:.gitmodules", "--list"], mute_stderr=True).splitlines()
except subprocess.CalledProcessError:
# .gitmodules file is missing
return {}
for line in lines:
match = SUBMODULE_RE.match(line)
if not match:
continue
submodule = match.groupdict()["submodule"]
key = match.groupdict()["key"]
value = match.groupdict()["value"]
#if key == "url":
# assert value.startswith("../../")
submodule_entry = result.setdefault(submodule, {})
submodule_entry[key] = value
lines = self._run_git(["submodule", "status"]).splitlines()
for line in lines:
match = STATUS_RE.match(line)
if not match:
continue
submodule = match.groupdict()["submodule"]
commit = match.groupdict()["commit"]
result[submodule]["commit"] = commit
remote_url = self.get_remote_url()
for submodule_entry in result.values():
url = submodule_entry["url"]
if not url.startswith("../../"):
submodule_entry["clone_url"] = url
continue
clone_url = self.urljoin(remote_url, submodule_entry["url"])
owner, repo = self.get_owner_repo_from_url(clone_url)
submodule_entry["clone_url"] = clone_url
submodule_entry["owner"] = owner
submodule_entry["repo"] = repo
return result