import fnmatch import logging import os import pathlib import subprocess import requests from lib.binary import BINARY LFS_SUFFIX = "filter=lfs diff=lfs merge=lfs -text" class Git: """Local git repository""" def __init__(self, path, committer=None, committer_email=None): self.path = pathlib.Path(path) self.committer = committer self.committer_email = committer_email def exists(self): """Check if the path is a valid git repository""" return (self.path / ".git").exists() def create(self): """Create a local git repository""" self.path.mkdir(parents=True, exist_ok=True) self.open() def git_run(self, args, **kwargs): """Run a git command""" if "env" in kwargs: envs = kwargs["env"].copy() del kwargs["env"] else: envs = os.environ.copy() envs["GIT_LFS_SKIP_SMUDGE"] = "1" envs["GIT_CONFIG_GLOBAL"] = "/dev/null" return subprocess.run( ["git"] + args, cwd=self.path, check=True, env=envs, **kwargs, ) def open(self): if not self.exists(): self.git_run(["init", "--object-format=sha256", "-b", "factory"]) def is_dirty(self): """Check if there is something to commit""" status_str = self.git_run( ["status", "--porcelain=2"], stdout=subprocess.PIPE, ).stdout.decode("utf-8") return len(list(filter(None, status_str.split("\n")))) > 0 def branches(self): br = ( self.git_run( ["for-each-ref", "--format=%(refname:short)", "refs/heads/"], stdout=subprocess.PIPE, ) .stdout.decode("utf-8") .split() ) if len(br) == 0: br.append("factory") # unborn branch? return br def branch(self, branch, commit="HEAD"): commit = ( self.git_run( ["rev-parse", "--verify", "--end-of-options", commit + "^{commit}"], stdout=subprocess.PIPE, ) .stdout.decode("utf-8") .strip() ) return self.git_run(["branch", branch, commit]) def checkout(self, branch): """Checkout into the branch HEAD""" new_branch = False if branch not in self.branches(): self.git_run(["branch", "-q", branch, "HEAD"]) new_branch = True else: ref = f"refs/heads/{branch}" if (self.path / ".git" / ref).exists(): self.git_run(["switch", "-q", branch]) return new_branch def commit( self, user, user_email, user_time, message, parents=None, committer=None, committer_email=None, committer_time=None, ): """Add all the files and create a new commit in the current HEAD""" if not committer: committer = self.committer if self.committer else self.user committer_email = ( self.committer_email if self.committer_email else self.user_email ) committer_time = committer_time if committer_time else user_time if self.is_dirty(): self.git_run(["add", "--all", "."]) tree_id = ( self.git_run(["write-tree"], stdout=subprocess.PIPE) .stdout.decode("utf-8") .strip() ) parent_array = [] if isinstance(parents, list): for parent in filter(None, parents): parent_array = parent_array + ["-p", parent] elif isinstance(parents, str): parent_array = ["-p", parents] commit_id = ( self.git_run( ["commit-tree"] + parent_array + [tree_id], env={ "GIT_AUTHOR_NAME": user, "GIT_AUTHOR_EMAIL": user_email, "GIT_AUTHOR_DATE": f"{int(user_time.timestamp())} +0000", "GIT_COMMITTER_NAME": committer, "GIT_COMMITTER_EMAIL": committer_email, "GIT_COMMITTER_DATE": f"{int(committer_time.timestamp())} +0000", }, input=message.encode("utf-8"), stdout=subprocess.PIPE, ) .stdout.decode("utf-8") .rstrip() ) self.git_run(["reset", "--soft", commit_id]) return commit_id def branch_head(self, branch="HEAD"): return ( self.git_run( ["rev-parse", "--verify", "--end-of-options", branch], stdout=subprocess.PIPE, ) .stdout.decode("utf-8") .strip() ) def set_branch_head(self, branch, commit): return self.git_run(["update-ref", f"refs/heads/{branch}", commit]) def gc(self): logging.debug(f"Garbage recollect and repackage {self.path}") self.git_run( ["gc", "--auto"], stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) # def clean(self): # for path, _ in self.repo.status().items(): # logging.debug(f"Cleaning {path}") # try: # (self.path / path).unlink() # self.repo.index.remove(path) # except Exception as e: # logging.warning(f"Error removing file {path}: {e}") def add(self, filename): self.git_run(["add", filename]) def add_default_lfs_gitattributes(self, force=False): if not (self.path / ".gitattributes").exists() or force: with (self.path / ".gitattributes").open("w") as f: content = ["## Default LFS"] content += [f"*{b} {LFS_SUFFIX}" for b in sorted(BINARY)] f.write("\n".join(content)) f.write("\n") self.add(".gitattributes") def add_specific_lfs_gitattributes(self, binaries): self.add_default_lfs_gitattributes(force=True) if binaries: with (self.path / ".gitattributes").open("a") as f: content = ["## Specific LFS patterns"] content += [f"{b} {LFS_SUFFIX}" for b in sorted(binaries)] f.write("\n".join(content)) f.write("\n") self.add(".gitattributes") def get_specific_lfs_gitattributes(self): with (self.path / ".gitattributes").open() as f: patterns = [ line.split()[0] for line in f if line.strip() and not line.startswith("#") ] binary = {f"*{b}" for b in BINARY} return [p for p in patterns if p not in binary] def add_lfs(self, filename, sha256, size): with (self.path / filename).open("w") as f: f.write("version https://git-lfs.github.com/spec/v1\n") f.write(f"oid sha256:{sha256}\n") f.write(f"size {size}\n") self.add(filename) if not self.is_lfs_tracked(filename): logging.debug(f"Add specific LFS file {filename}") specific_patterns = self.get_specific_lfs_gitattributes() specific_patterns.append(filename) self.add_specific_lfs_gitattributes(specific_patterns) def is_lfs_tracked(self, filename): with (self.path / ".gitattributes").open() as f: patterns = ( line.split()[0] for line in f if line.strip() and not line.startswith("#") ) return any(fnmatch.fnmatch(filename, line) for line in patterns) def remove(self, file: pathlib.Path): self.git_run( ["rm", "-q", "-f", "--ignore-unmatch", file.name], ) patterns = self.get_specific_lfs_gitattributes() if file.name in patterns: patterns.remove(file.name) self.add_specific_lfs_gitattributes(patterns) def add_gitea_remote(self, package): repo_name = package.replace("+", "_") org_name = "rpm" if not os.getenv("GITEA_TOKEN"): logging.warning("Not adding a remote due to missing $GITEA_TOKEN") return url = f"https://src.opensuse.org/api/v1/org/{org_name}/repos" response = requests.post( url, data={"name": repo_name}, headers={"Authorization": f"token {os.getenv('GITEA_TOKEN')}"}, timeout=10, ) # 409 Conflict (Already existing) # 201 Created if response.status_code not in (201, 409): print(response.data) url = f"gitea@src.opensuse.org:{org_name}/{repo_name}.git" self.git_run( ["remote", "add", "origin", url], ) def push(self, force=False): if "origin" not in self.git_run(["remote"]).stdout: logger.warning("Not pushing to remote because no 'origin' configured") return cmd = ["push"] if force: cmd.append("-f") cmd.append("origin") cmd.append("refs/heads/factory") cmd.append("refs/heads/devel") self.git_run(cmd)