mirror of
				https://github.com/openSUSE/osc.git
				synced 2025-10-23 08:42:15 +02:00 
			
		
		
		
	
		
			
				
	
	
		
			433 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			433 lines
		
	
	
		
			15 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import os
 | |
| import re
 | |
| import subprocess
 | |
| import urllib
 | |
| from typing import Dict
 | |
| from typing import Iterator
 | |
| from typing import List
 | |
| from typing import Optional
 | |
| from typing import Tuple
 | |
| 
 | |
| 
 | |
| class SshParseResult(urllib.parse.ParseResult):
 | |
|     """
 | |
|     Class to distinguish parsed SSH URLs
 | |
|     """
 | |
| 
 | |
| 
 | |
| class Git:
 | |
|     @staticmethod
 | |
|     def urlparse(url: str) -> urllib.parse.ParseResult:
 | |
|         """
 | |
|         Parse git url.
 | |
| 
 | |
|         Supported formats:
 | |
|             - https://example.com/owner/repo.git
 | |
|             - https://example.com:1234/owner/repo.git
 | |
|             - example.com/owner/repo.git
 | |
|             - user@example.com:owner/repo.git
 | |
|             - user@example.com:1234:owner/repo.git"
 | |
|         """
 | |
|         # try ssh clone url first
 | |
|         pattern = r"(?P<netloc>[^@:]+@[^@:]+(:[0-9]+)?):(?P<path>.+)"
 | |
|         match = re.match(pattern, url)
 | |
|         if match:
 | |
|             scheme = ""
 | |
|             netloc = match.groupdict()["netloc"]
 | |
|             path = match.groupdict()["path"]
 | |
|             params = ''
 | |
|             query = ''
 | |
|             fragment = ''
 | |
|             result = SshParseResult(scheme, netloc, path, params, query, fragment)
 | |
|             return result
 | |
| 
 | |
|         result = urllib.parse.urlparse(url)
 | |
|         if not result.netloc:
 | |
|             # empty netloc is most likely an error, prepend and then discard scheme to trick urlparse()
 | |
|             result = urllib.parse.urlparse("https://" + url)
 | |
|             result = urllib.parse.ParseResult("", *list(result)[1:])
 | |
|         return result
 | |
| 
 | |
|     @staticmethod
 | |
|     def urljoin(url: str, path: str) -> str:
 | |
|         """
 | |
|         Append ``path`` to ``url``.
 | |
|         """
 | |
|         parts = Git.urlparse(url)
 | |
|         # we're using os.path.normpath() and os.path.join() for working with URL paths, which may not be ideal, but seems to be working fine (on Linux)
 | |
|         # we need to remove leading forward slash from ``parts.path`` because ``os.path.normpath("/../")`` resolves to "/" and we don't want that
 | |
|         new_path = os.path.normpath(os.path.join(parts.path.lstrip("/"), path.lstrip("/")))
 | |
| 
 | |
|         parts = parts._replace(path=new_path)
 | |
| 
 | |
|         if isinstance(parts, SshParseResult):
 | |
|             new_url = f"{parts.netloc}:{parts.path}"
 | |
|         else:
 | |
|             new_url = urllib.parse.urlunparse(parts)
 | |
| 
 | |
|         if new_path.startswith("../") or "/../" in new_path:
 | |
|             raise ValueError(f"URL must not contain relative path: {new_url}")
 | |
| 
 | |
|         return new_url
 | |
| 
 | |
|     def __init__(self, workdir):
 | |
|         self.abspath = os.path.abspath(workdir)
 | |
| 
 | |
|     def _run_git(self, args: List[str], mute_stderr: bool = False) -> str:
 | |
|         # HACK: having 2 nearly identical commands is stupid, but it muted a mypy error
 | |
|         if mute_stderr:
 | |
|             return subprocess.check_output(["git"] + args, encoding="utf-8", cwd=self.abspath, stderr=subprocess.DEVNULL).strip()
 | |
|         return subprocess.check_output(["git"] + args, encoding="utf-8", cwd=self.abspath).strip()
 | |
| 
 | |
|     @property
 | |
|     def topdir(self) -> Optional[str]:
 | |
|         """
 | |
|         A custom implementation to `git rev-parse --show-toplevel` to avoid executing git which is sometimes unnecessary expensive.
 | |
|         """
 | |
|         path = self.abspath
 | |
|         while path:
 | |
|             if os.path.exists(os.path.join(path, ".git")):
 | |
|                 break
 | |
| 
 | |
|             path, dirname = os.path.split(path)
 | |
| 
 | |
|             if (path, dirname) == ("/", ""):
 | |
|                 # no git repo found
 | |
|                 return None
 | |
| 
 | |
|         return path
 | |
| 
 | |
|     def init(self, *, initial_branch: Optional[str] = None, quiet: bool = True, mute_stderr: bool = False):
 | |
|         cmd = ["init"]
 | |
|         if initial_branch:
 | |
|             cmd += ["-b", initial_branch]
 | |
|         if quiet:
 | |
|             cmd += ["-q"]
 | |
|         self._run_git(cmd, mute_stderr=mute_stderr)
 | |
| 
 | |
|     def clone(self,
 | |
|         url: str,
 | |
|         *,
 | |
|         directory: Optional[str] = None,
 | |
|         reference: Optional[str] = None,
 | |
|         reference_if_able: Optional[str] = None,
 | |
|         quiet: bool = True
 | |
|     ):
 | |
|         cmd = ["clone", url]
 | |
|         if directory:
 | |
|             cmd += [directory]
 | |
|         if reference:
 | |
|             cmd += ["--reference", reference]
 | |
|         if reference_if_able:
 | |
|             cmd += ["--reference-if-able", reference_if_able]
 | |
|         if quiet:
 | |
|             cmd += ["-q"]
 | |
|         self._run_git(cmd)
 | |
| 
 | |
|     # BRANCHES
 | |
| 
 | |
|     @property
 | |
|     def current_branch(self) -> Optional[str]:
 | |
|         try:
 | |
|             return self._run_git(["branch", "--show-current"], mute_stderr=True)
 | |
|         except subprocess.CalledProcessError:
 | |
|             return None
 | |
| 
 | |
|     def branch_contains_commit(self, commit: str, branch: Optional[str] = None, remote: Optional[str] = None) -> bool:
 | |
|         if not branch:
 | |
|             branch = self.current_branch
 | |
| 
 | |
|         if remote:
 | |
|             try:
 | |
|                 self._run_git(["merge-base", "--is-ancestor", commit, f"{remote}/{branch}"], mute_stderr=True)
 | |
|                 return True
 | |
|             except subprocess.CalledProcessError:
 | |
|                 return False
 | |
| 
 | |
|         try:
 | |
|             stdout = self._run_git(["branch", branch, "--contains", commit, "--format", "%(objectname) %(objecttype) %(refname)"])
 | |
|             return stdout.strip() == f"{commit} commit refs/heads/{branch}"
 | |
|         except subprocess.CalledProcessError:
 | |
|             return False
 | |
| 
 | |
|     def get_branch_head(self, branch: Optional[str] = None) -> str:
 | |
|         if not branch:
 | |
|             branch = self.current_branch
 | |
| 
 | |
|         return self._run_git(["rev-parse", f"refs/heads/{branch}"])
 | |
| 
 | |
|     def branch_exists(self, branch: str) -> bool:
 | |
|         try:
 | |
|             self._run_git(["rev-parse", f"refs/heads/{branch}", "--"], mute_stderr=True)
 | |
|         except subprocess.CalledProcessError:
 | |
|             return False
 | |
|         return True
 | |
| 
 | |
|     def commit_count(self, branch: str) -> int:
 | |
|         try:
 | |
|             commits = self._run_git(["rev-list", "--count", f"refs/heads/{branch}", "--"], mute_stderr=True)
 | |
|             return int(commits)
 | |
|         except subprocess.CalledProcessError:
 | |
|             return -1
 | |
| 
 | |
|     def reset(self, commit: str, hard: bool = False):
 | |
|         cmd = ["reset", commit]
 | |
|         if hard:
 | |
|             cmd += ["--hard"]
 | |
|         self._run_git(cmd)
 | |
| 
 | |
|     def switch(self, branch: str, orphan: bool = False):
 | |
|         cmd = ["switch"]
 | |
|         if orphan:
 | |
|             cmd += ["--orphan"]
 | |
|         cmd += [branch]
 | |
|         self._run_git(cmd)
 | |
| 
 | |
|     def fetch_pull_request(
 | |
|         self,
 | |
|         pull_number: int,
 | |
|         *,
 | |
|         remote: Optional[str] = None,
 | |
|         commit: Optional[str] = None,
 | |
|         force: bool = False,
 | |
|     ):
 | |
|         """
 | |
|         Fetch pull/$pull_number/head to pull/$pull_number branch
 | |
|         """
 | |
|         target_branch = f"pull/{pull_number}"
 | |
| 
 | |
|         # if the branch exists and the head matches the expected commit, skip running 'git fetch'
 | |
|         if commit and self.branch_exists(target_branch) and self.get_branch_head(target_branch) == commit:
 | |
|             return target_branch
 | |
| 
 | |
|         if not remote:
 | |
|             remote = self.get_current_remote()
 | |
| 
 | |
|         cmd = ["fetch", remote, f"pull/{pull_number}/head:{target_branch}"]
 | |
|         if force:
 | |
|             cmd += [
 | |
|                 "--force",
 | |
|                 "--update-head-ok",
 | |
|             ]
 | |
|         self._run_git(cmd)
 | |
|         return target_branch
 | |
| 
 | |
|     # CONFIG
 | |
| 
 | |
|     def set_config(self, key: str, value: str):
 | |
|         self._run_git(["config", key, value])
 | |
| 
 | |
|     # REMOTES
 | |
| 
 | |
|     def get_remote_url(self, name: Optional[str] = None) -> Optional[str]:
 | |
|         if not name:
 | |
|             name = self.get_current_remote()
 | |
|         if not name:
 | |
|             return None
 | |
|         return self._run_git(["remote", "get-url", name])
 | |
| 
 | |
|     def add_remote(self, name: str, url: str):
 | |
|         self._run_git(["remote", "add", name, url])
 | |
| 
 | |
|     def get_current_remote(self, fallback_to_origin: bool = True) -> Optional[str]:
 | |
|         result = None
 | |
|         try:
 | |
|             result = self._run_git(["rev-parse", "--abbrev-ref", "@{u}"], mute_stderr=True)
 | |
|             if result:
 | |
|                 result = result.split("/")[0]
 | |
|         except subprocess.CalledProcessError:
 | |
|             pass
 | |
| 
 | |
|         # the tracking information isn't sometimes set
 | |
|         # let's fall back to 'origin' if available
 | |
|         if not result and fallback_to_origin:
 | |
|             try:
 | |
|                 self._run_git(["remote", "get-url", "origin"], mute_stderr=True)
 | |
|                 result = "origin"
 | |
|             except subprocess.CalledProcessError:
 | |
|                 pass
 | |
| 
 | |
|         return result
 | |
| 
 | |
|     def fetch(self, name: Optional[str] = None):
 | |
|         if name:
 | |
|             cmd = ["fetch", name]
 | |
|         else:
 | |
|             cmd = ["fetch", "--all"]
 | |
|         self._run_git(cmd)
 | |
| 
 | |
|     def get_owner_repo(self, remote: Optional[str] = None) -> Tuple[str, str]:
 | |
|         remote_url = self.get_remote_url(name=remote)
 | |
|         return self.get_owner_repo_from_url(remote_url)
 | |
| 
 | |
|     @staticmethod
 | |
|     def get_owner_repo_from_url(url: str) -> Tuple[str, str]:
 | |
|         if "@" in url:
 | |
|             # ssh://gitea@example.com:owner/repo.git
 | |
|             # ssh://gitea@example.com:22/owner/repo.git
 | |
|             url = url.rsplit("@", 1)[-1]
 | |
|         parsed_url = urllib.parse.urlparse(url)
 | |
|         path = parsed_url.path
 | |
|         if path.endswith(".git"):
 | |
|             path = path[:-4]
 | |
|         owner, repo = path.strip("/").split("/")[-2:]
 | |
|         return owner, repo
 | |
| 
 | |
|     # LFS
 | |
| 
 | |
|     def lfs_ls_files(self, ref: str = "HEAD", suffixes: Optional[List[str]] = None) -> Dict[str, str]:
 | |
|         # TODO: --size; returns human readable string; can we somehow get the exact value in bytes instead?
 | |
|         out = self._run_git(["lfs", "ls-files", "--long", ref])
 | |
|         regex = re.compile(r"^(?P<checksum>[0-9a-f]+) [\*\-] (?P<path>.*)$")
 | |
|         result = {}
 | |
|         for line in out.splitlines():
 | |
|             match = regex.match(line)
 | |
|             if not match:
 | |
|                 continue
 | |
| 
 | |
|             checksum = match.groupdict()["checksum"]
 | |
|             path = match.groupdict()["path"]
 | |
| 
 | |
|             if suffixes:
 | |
|                 found = False
 | |
|                 for suffix in suffixes:
 | |
|                     if path.endswith(suffix):
 | |
|                         found = True
 | |
|                         break
 | |
|                 if not found:
 | |
|                     continue
 | |
| 
 | |
|             result[path] = checksum
 | |
|         return result
 | |
| 
 | |
|     def lfs_cat_file(self, filename: str, ref: str = "HEAD"):
 | |
|         """
 | |
|         A generator function that returns chunks of bytes of the requested file.
 | |
|         """
 | |
|         with subprocess.Popen(["git", "cat-file", "--filters", f"{ref}:{filename}"], stdout=subprocess.PIPE, cwd=self.abspath) as proc:
 | |
|             assert proc.stdout is not None
 | |
|             while True:
 | |
|                 # 1MiB chunks are probably a good balance between memory consumption and performance
 | |
|                 data = proc.stdout.read(1024**2)
 | |
|                 if not data:
 | |
|                     break
 | |
|                 yield data
 | |
| 
 | |
|     # FILES
 | |
| 
 | |
|     def add(self, files: List[str]):
 | |
|         self._run_git(["add", *files])
 | |
| 
 | |
|     def commit(self, msg, *, allow_empty: bool = False):
 | |
|         cmd = ["commit", "-m", msg]
 | |
|         if allow_empty:
 | |
|             cmd += ["--allow-empty"]
 | |
|         self._run_git(cmd)
 | |
| 
 | |
|     def ls_files(self, ref: str = "HEAD", suffixes: Optional[List[str]] = None) -> Dict[str, str]:
 | |
|         out = self._run_git(["ls-tree", "-r", "--format=%(objectname) %(path)", ref])
 | |
|         regex = re.compile(r"^(?P<checksum>[0-9a-f]+) (?P<path>.*)$")
 | |
|         result = {}
 | |
|         for line in out.splitlines():
 | |
|             match = regex.match(line)
 | |
|             if not match:
 | |
|                 continue
 | |
| 
 | |
|             checksum = match.groupdict()["checksum"]
 | |
|             path = match.groupdict()["path"]
 | |
| 
 | |
|             if suffixes:
 | |
|                 found = False
 | |
|                 for suffix in suffixes:
 | |
|                     if path.endswith(suffix):
 | |
|                         found = True
 | |
|                         break
 | |
|                 if not found:
 | |
|                     continue
 | |
| 
 | |
|             result[path] = checksum
 | |
|         return result
 | |
| 
 | |
|     def diff(self, ref_old: str, ref_new: str, src_prefix: Optional[str] = None, dst_prefix: Optional[str] = None) -> Iterator[bytes]:
 | |
|         cmd = ["git", "diff", ref_old, ref_new]
 | |
| 
 | |
|         if src_prefix:
 | |
|             src_prefix = src_prefix.rstrip("/") + "/"
 | |
|             cmd += [f"--src-prefix={src_prefix}"]
 | |
| 
 | |
|         if dst_prefix:
 | |
|             dst_prefix = dst_prefix.rstrip("/") + "/"
 | |
|             cmd += [f"--dst-prefix={dst_prefix}"]
 | |
| 
 | |
|         # 1MiB chunks are probably a good balance between memory consumption and performance
 | |
|         chunk_size = 1024**2
 | |
|         with subprocess.Popen(cmd, stdout=subprocess.PIPE, cwd=self.abspath) as proc:
 | |
|             assert proc.stdout is not None
 | |
|             while True:
 | |
|                 # read a chunk of data, make sure it ends with a newline
 | |
|                 # so we don't have to deal with split utf-8 characters and incomplete escape sequences later
 | |
|                 chunk = proc.stdout.read(chunk_size)
 | |
|                 chunk += proc.stdout.readline()
 | |
|                 if not chunk:
 | |
|                     break
 | |
|                 yield chunk
 | |
| 
 | |
|     def status(self, *, porcelain: bool = False, untracked_files: bool = False):
 | |
|         cmd = ["status", "--renames"]
 | |
|         if untracked_files:
 | |
|             cmd += ["--untracked-files"]
 | |
|         if porcelain:
 | |
|             cmd += ["--porcelain"]
 | |
|         return self._run_git(cmd)
 | |
| 
 | |
|     # SUBMODULES
 | |
| 
 | |
|     def get_submodules(self) -> dict:
 | |
|         SUBMODULE_RE = re.compile(r"^submodule\.(?P<submodule>[^=]*)\.(?P<key>[^\.=]*)=(?P<value>.*)$")
 | |
|         STATUS_RE = re.compile(r"^.(?P<commit>[a-f0-9]+) (?P<submodule>[^ ]+).*$")
 | |
| 
 | |
|         result = {}
 | |
| 
 | |
|         try:
 | |
|             lines = self._run_git(["config", "--blob", "HEAD:.gitmodules", "--list"], mute_stderr=True).splitlines()
 | |
|         except subprocess.CalledProcessError:
 | |
|             # .gitmodules file is missing
 | |
|             return {}
 | |
| 
 | |
|         for line in lines:
 | |
|             match = SUBMODULE_RE.match(line)
 | |
|             if not match:
 | |
|                 continue
 | |
|             submodule = match.groupdict()["submodule"]
 | |
|             key = match.groupdict()["key"]
 | |
|             value = match.groupdict()["value"]
 | |
|             #if key == "url":
 | |
|             #    assert value.startswith("../../")
 | |
|             submodule_entry = result.setdefault(submodule, {})
 | |
|             submodule_entry[key] = value
 | |
| 
 | |
|         lines = self._run_git(["submodule", "status"]).splitlines()
 | |
| 
 | |
|         for line in lines:
 | |
|             match = STATUS_RE.match(line)
 | |
|             if not match:
 | |
|                 continue
 | |
|             submodule = match.groupdict()["submodule"]
 | |
|             commit = match.groupdict()["commit"]
 | |
|             result[submodule]["commit"] = commit
 | |
| 
 | |
|         remote_url = self.get_remote_url()
 | |
|         for submodule_entry in result.values():
 | |
|             url = submodule_entry["url"]
 | |
|             if not url.startswith("../../"):
 | |
|                 submodule_entry["clone_url"] = url
 | |
|                 continue
 | |
|  
 | |
|             clone_url = self.urljoin(remote_url, submodule_entry["url"])
 | |
|             owner, repo = self.get_owner_repo_from_url(clone_url)
 | |
|             submodule_entry["clone_url"] = clone_url
 | |
|             submodule_entry["owner"] = owner
 | |
|             submodule_entry["repo"] = repo
 | |
| 
 | |
|         return result
 |