1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-08-23 22:58:53 +02:00

Implement 'git-obs pr review' command

This commit is contained in:
2025-02-18 20:58:54 +01:00
parent 104700230f
commit ea24281859
7 changed files with 517 additions and 11 deletions

View File

@@ -1,6 +1,9 @@
import os
import re
import subprocess
import urllib
from typing import Iterator
from typing import List
from typing import Optional
from typing import Tuple
@@ -9,11 +12,24 @@ class Git:
def __init__(self, workdir):
self.abspath = os.path.abspath(workdir)
def _run_git(self, args: list) -> str:
def _run_git(self, args: List[str], mute_stderr: bool = False) -> str:
# HACK: having 2 nearly identical commands is stupid, but it muted a mypy error
if mute_stderr:
return subprocess.check_output(["git"] + args, encoding="utf-8", cwd=self.abspath, stderr=subprocess.DEVNULL).strip()
return subprocess.check_output(["git"] + args, encoding="utf-8", cwd=self.abspath).strip()
def init(self, *, quiet=True):
def init(self, *, initial_branch: Optional[str] = None, quiet: bool = True, mute_stderr: bool = False):
cmd = ["init"]
if initial_branch:
cmd += ["-b", initial_branch]
if quiet:
cmd += ["-q"]
self._run_git(cmd, mute_stderr=mute_stderr)
def clone(self, url, directory: Optional[str] = None, quiet: bool = True):
cmd = ["clone", url]
if directory:
cmd += [directory]
if quiet:
cmd += ["-q"]
self._run_git(cmd)
@@ -25,10 +41,28 @@ class Git:
return self._run_git(["branch", "--show-current"])
def get_branch_head(self, branch: str) -> str:
return self._run_git(["rev-parse", branch])
return self._run_git(["rev-parse", f"refs/heads/{branch}"])
def switch(self, branch: str):
self._run_git(["switch", branch])
def branch_exists(self, branch: str) -> bool:
try:
self._run_git(["rev-parse", f"refs/heads/{branch}", "--"], mute_stderr=True)
except subprocess.CalledProcessError:
return False
return True
def commit_count(self, branch: str) -> int:
try:
commits = self._run_git(["rev-list", "--count", f"refs/heads/{branch}", "--"], mute_stderr=True)
return int(commits)
except subprocess.CalledProcessError:
return -1
def switch(self, branch: str, orphan: bool = False):
cmd = ["switch"]
if orphan:
cmd += ["--orphan"]
cmd += [branch]
self._run_git(cmd)
def fetch_pull_request(
self,
@@ -82,3 +116,65 @@ class Git:
path = path[:-4]
owner, repo = path.strip("/").split("/")[-2:]
return owner, repo
# LFS
def lfs_ls_files(self, ref: str = "HEAD") -> List[Tuple[str, str]]:
# TODO: --size; returns human readable string; can we somehow get the exact value in bytes instead?
out = self._run_git(["lfs", "ls-files", "--long", ref])
regex = re.compile(r"^(?P<checksum>[0-9a-f]+) [\*\-] (?P<filename>.*)$")
result = []
for line in out.splitlines():
match = regex.match(line)
if not match:
continue
result.append((match.group(2), match.group(1)))
return result
def lfs_cat_file(self, filename: str, ref: str = "HEAD"):
"""
A generator function that returns chunks of bytes of the requested file.
"""
with subprocess.Popen(["git", "cat-file", "--filters", f"{ref}:{filename}"], stdout=subprocess.PIPE, cwd=self.abspath) as proc:
assert proc.stdout is not None
while True:
# 1MiB chunks are probably a good balance between memory consumption and performance
data = proc.stdout.read(1024**2)
if not data:
break
yield data
# FILES
def add(self, files: List[str]):
self._run_git(["add", *files])
def commit(self, msg, *, allow_empty: bool = False):
cmd = ["commit", "-m", msg]
if allow_empty:
cmd += ["--allow-empty"]
self._run_git(cmd)
def diff(self, ref_old: str, ref_new: str, src_prefix: Optional[str] = None, dst_prefix: Optional[str] = None) -> Iterator[bytes]:
cmd = ["git", "diff", ref_old, ref_new]
if src_prefix:
src_prefix = src_prefix.rstrip("/") + "/"
cmd += [f"--src-prefix={src_prefix}"]
if dst_prefix:
dst_prefix = dst_prefix.rstrip("/") + "/"
cmd += [f"--dst-prefix={dst_prefix}"]
# 1MiB chunks are probably a good balance between memory consumption and performance
chunk_size = 1024**2
with subprocess.Popen(cmd, stdout=subprocess.PIPE, cwd=self.abspath) as proc:
assert proc.stdout is not None
while True:
# read a chunk of data, make sure it ends with a newline
# so we don't have to deal with split utf-8 characters and incomplete escape sequences later
chunk = proc.stdout.read(chunk_size)
chunk += proc.stdout.readline()
if not chunk:
break
yield chunk