import fnmatch
import logging
import os
import pathlib
import subprocess

import requests

from lib.binary import BINARY

# Attribute string appended to every pattern in .gitattributes that should be
# handled by git-lfs.
LFS_SUFFIX = "filter=lfs diff=lfs merge=lfs -text"


class Git:
    """Local git repository, driven through the `git` command line tool."""

    def __init__(self, path, committer=None, committer_email=None):
        """Remember the repository location and the default committer.

        `committer` / `committer_email` are used by `commit()` when the
        caller does not pass an explicit committer.
        """
        self.path = pathlib.Path(path)
        self.committer = committer
        self.committer_email = committer_email

    def exists(self):
        """Check if the path is a valid git repository"""
        return (self.path / ".git").exists()

    def create(self):
        """Create a local git repository"""
        self.path.mkdir(parents=True, exist_ok=True)
        self.open()

    def open(self):
        """Run `git init` (sha256 object format, initial branch `factory`)."""
        subprocess.run(
            ['git', 'init', '--object-format=sha256', '-b', 'factory'],
            cwd=self.path,
            check=True,
        )

    def is_dirty(self):
        """Check if there is something to commit"""
        status_str = subprocess.run(
            ['git', 'status', '--porcelain=2'],
            cwd=self.path,
            stdout=subprocess.PIPE,
            check=True,
        ).stdout.decode('utf-8')
        # Any non-empty status line means there is a pending change.
        return any(status_str.split('\n'))

    def branches(self):
        """Return the short names of all local branches.

        When no branch ref exists yet, `['factory']` is returned so callers
        see the initial branch that `open()` configures.
        """
        br = subprocess.run(
            ['git', 'for-each-ref', '--format=%(refname:short)', 'refs/heads/'],
            cwd=self.path,
            check=True,
            stdout=subprocess.PIPE,
        ).stdout.decode('utf-8').split()
        if not br:
            br.append('factory')  # unborn branch?
        return br

    def branch(self, branch, commit='HEAD'):
        """Create `branch` pointing at `commit` (resolved to a commit id).

        Returns the `subprocess.CompletedProcess` of the `git branch` call.
        Raises `subprocess.CalledProcessError` if either git command fails.
        """
        commit = subprocess.run(
            ['git', 'rev-parse', '--verify', '--end-of-options', commit + '^{commit}'],
            cwd=self.path,
            check=True,
            stdout=subprocess.PIPE,
        ).stdout.decode('utf-8').strip()
        # Must run inside the repository, like every other git invocation of
        # this class; otherwise the branch is created wherever the process
        # happens to be.
        return subprocess.run(
            ['git', 'branch', branch, commit],
            cwd=self.path,
            check=True,
        )

    def checkout(self, branch):
        """Checkout into the branch HEAD

        Returns True when the branch did not exist and was created from
        HEAD (in that case no checkout is performed), False otherwise.
        """
        if branch not in self.branches():
            subprocess.run(
                ['git', 'branch', '-q', branch, 'HEAD'],
                cwd=self.path,
                check=True,
            )
            return True

        # Only check out when the loose ref file is present (this is the
        # case that is false for the unborn branch reported by branches()).
        ref = f"refs/heads/{branch}"
        if (self.path / '.git' / ref).exists():
            subprocess.run(
                ['git', 'checkout', '-q', branch],
                cwd=self.path,
                check=True,
            )
        return False

    def commit(
        self,
        user,
        user_email,
        user_time,
        message,
        parents=None,
        committer=None,
        committer_email=None,
        committer_time=None,
    ):
        """Add all the files and create a new commit in the current HEAD

        `user`, `user_email` and `user_time` describe the author;
        `user_time` (and `committer_time`) must provide `.timestamp()`.
        `parents` may be a single commit id, a list of commit ids (empty
        entries are skipped) or None for a root commit. When no committer
        is given, the instance default is used, falling back to the author.

        Returns the id of the new commit.
        """
        if not committer:
            # Fall back to the instance default, then to the author.
            committer = self.committer if self.committer else user
            committer_email = (
                self.committer_email if self.committer_email else user_email
            )
        if not committer_email:
            # An explicit committer without an e-mail would otherwise put
            # None into the environment below.
            committer_email = (
                self.committer_email if self.committer_email else user_email
            )
        if not committer_time:
            committer_time = user_time

        if self.is_dirty():
            subprocess.run(
                ["git", "add", "--all", "."],
                cwd=self.path,
                check=True,
            )

        tree_id = subprocess.run(
            ['git', 'write-tree'],
            cwd=self.path,
            check=True,
            stdout=subprocess.PIPE,
        ).stdout.decode('utf-8').strip()

        # Build the "-p <parent>" arguments for commit-tree.
        parent_array = []
        if isinstance(parents, list):
            for parent in filter(None, parents):
                parent_array.extend(['-p', parent])
        elif isinstance(parents, str):
            parent_array = ['-p', parents]

        # NOTE(review): `env` replaces the whole environment of the child
        # process (nothing is inherited from os.environ) - confirm that
        # this isolation is intended.
        commit_id = subprocess.run(
            ['git', 'commit-tree'] + parent_array + [tree_id],
            cwd=self.path,
            env={
                "GIT_AUTHOR_NAME": user,
                "GIT_AUTHOR_EMAIL": user_email,
                "GIT_AUTHOR_DATE": f"{int(user_time.timestamp())} +0000",
                "GIT_COMMITTER_NAME": committer,
                "GIT_COMMITTER_EMAIL": committer_email,
                "GIT_COMMITTER_DATE": f"{int(committer_time.timestamp())} +0000",
            },
            input=message.encode('utf-8'),
            check=True,
            stdout=subprocess.PIPE,
        ).stdout.decode('utf-8').rstrip()

        # Move the current branch to the freshly created commit without
        # touching the index or the working tree.
        subprocess.run(
            ['git', 'reset', '--soft', commit_id],
            cwd=self.path,
            check=True,
        )
        return commit_id

    def branch_head(self, branch='HEAD'):
        """Return the object id that `branch` resolves to."""
        return subprocess.run(
            ['git', 'rev-parse', '--verify', '--end-of-options', branch],
            cwd=self.path,
            check=True,
            stdout=subprocess.PIPE,
        ).stdout.decode('utf-8').strip()

    def set_branch_head(self, branch, commit):
        """Force `branch` to point at `commit` (`git branch -f`)."""
        return subprocess.run(
            ['git', 'branch', '-f', branch, commit],
            cwd=self.path,
            check=True,
        )

    def gc(self):
        """Run `git gc --auto`; output is captured and failures are ignored."""
        logging.debug("Garbage recollect and repackage %s", self.path)
        subprocess.run(
            ["git", "gc", "--auto"],
            cwd=self.path,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )

    # Disabled: depends on a `self.repo` attribute that this class does not
    # define.
    # def clean(self):
    #     for path, _ in self.repo.status().items():
    #         logging.debug(f"Cleaning {path}")
    #         try:
    #             (self.path / path).unlink()
    #             self.repo.index.remove(path)
    #         except Exception as e:
    #             logging.warning(f"Error removing file {path}: {e}")

    def add(self, filename):
        """Stage `filename` (`git add`)."""
        subprocess.run(
            ['git', 'add', filename],
            cwd=self.path,
            check=True,
        )

    def add_default_lfs_gitattributes(self, force=False):
        """Write the default LFS patterns to .gitattributes and stage it.

        The file is only (re)written when it is missing or `force` is set;
        one `*<suffix>` pattern is emitted per entry of BINARY.
        """
        if not (self.path / ".gitattributes").exists() or force:
            with (self.path / ".gitattributes").open("w") as f:
                content = ["## Default LFS"]
                content += [f"*{b} {LFS_SUFFIX}" for b in sorted(BINARY)]
                f.write("\n".join(content))
                f.write("\n")
            self.add(".gitattributes")

    def add_specific_lfs_gitattributes(self, binaries):
        """Rewrite .gitattributes with the defaults plus `binaries`.

        The default section is always regenerated first; `binaries` (if
        any) are appended as an extra section of explicit patterns.
        """
        self.add_default_lfs_gitattributes(force=True)
        if binaries:
            with (self.path / ".gitattributes").open("a") as f:
                content = ["## Specific LFS patterns"]
                content += [f"{b} {LFS_SUFFIX}" for b in sorted(binaries)]
                f.write("\n".join(content))
                f.write("\n")
            self.add(".gitattributes")

    def get_specific_lfs_gitattributes(self):
        """Return the .gitattributes patterns that are not default ones."""
        with (self.path / ".gitattributes").open() as f:
            patterns = [
                line.split()[0]
                for line in f
                if line.strip() and not line.startswith("#")
            ]
        binary = {f"*{b}" for b in BINARY}
        return [p for p in patterns if p not in binary]

    def add_lfs(self, filename, sha256, size):
        """Write an LFS pointer file for `filename` and stage it.

        If no pattern in .gitattributes matches the file name, the name is
        added as a specific LFS pattern.
        """
        with (self.path / filename).open("w") as f:
            f.write("version https://git-lfs.github.com/spec/v1\n")
            f.write(f"oid sha256:{sha256}\n")
            f.write(f"size {size}\n")
        self.add(filename)

        if not self.is_lfs_tracked(filename):
            logging.debug(f"Add specific LFS file {filename}")
            specific_patterns = self.get_specific_lfs_gitattributes()
            specific_patterns.append(filename)
            self.add_specific_lfs_gitattributes(specific_patterns)

    def is_lfs_tracked(self, filename):
        """Return True if a pattern in .gitattributes matches `filename`."""
        with (self.path / ".gitattributes").open() as f:
            patterns = (
                line.split()[0]
                for line in f
                if line.strip() and not line.startswith("#")
            )
            # The generator reads lazily, so it has to be consumed while
            # the file is still open.
            return any(fnmatch.fnmatch(filename, line) for line in patterns)

    def remove(self, file: pathlib.Path):
        """Remove `file` from git and drop its specific LFS pattern, if any.

        Only `file.name` (the last path component) is passed to git.
        """
        subprocess.run(
            ['git', 'rm', '-q', '--ignore-unmatch', file.name],
            cwd=self.path,
            check=True,
        )
        patterns = self.get_specific_lfs_gitattributes()
        if file.name in patterns:
            patterns.remove(file.name)
            self.add_specific_lfs_gitattributes(patterns)

    def add_gitea_remote(self, package):
        """Create the repository on src.opensuse.org and add it as `origin`.

        Nothing is done (besides a warning) when $GITEA_TOKEN is not set.
        """
        repo_name = package.replace("+", "_")
        org_name = "rpm"

        if not os.getenv("GITEA_TOKEN"):
            logging.warning("Not adding a remote due to missing $GITEA_TOKEN")
            return

        url = f"https://src.opensuse.org/api/v1/org/{org_name}/repos"
        response = requests.post(
            url,
            data={"name": repo_name},
            headers={"Authorization": f"token {os.getenv('GITEA_TOKEN')}"},
            timeout=10,
        )
        # 409 Conflict (Already existing)
        # 201 Created
        if response.status_code not in (201, 409):
            # requests.Response exposes the body as `.text`; it has no
            # `.data` attribute.
            logging.warning(
                "Unexpected response %s from %s: %s",
                response.status_code,
                url,
                response.text,
            )
        url = f"gitea@src.opensuse.org:{org_name}/{repo_name}.git"
        subprocess.run(
            ['git', 'remote', 'add', 'origin', url],
            cwd=self.path,
            check=True,
        )

    def push(self, force=False):
        """Push the `factory` and `devel` branches to `origin`.

        With `force` set, `-f` is passed to `git push`.
        """
        cmd = ['git', 'push']
        if force:
            cmd.append('-f')
        cmd += ['origin', 'refs/heads/factory', 'refs/heads/devel']
        subprocess.run(
            cmd,
            cwd=self.path,
            check=True,
        )