import fnmatch import logging import os import pathlib import subprocess import pygit2 import requests from lib.binary import BINARY LFS_SUFFIX = "filter=lfs diff=lfs merge=lfs -text" class Git: """Local git repository""" def __init__(self, path, committer=None, committer_email=None): self.path = pathlib.Path(path) self.committer = committer self.committer_email = committer_email self.repo = None def is_open(self): return self.repo is not None def exists(self): """Check if the path is a valid git repository""" return (self.path / ".git").exists() def create(self): """Create a local git repository""" self.path.mkdir(parents=True, exist_ok=True) self.open() def open(self): # Convert the path to string, to avoid some limitations in # older pygit2 self.repo = pygit2.init_repository(str(self.path)) def is_dirty(self): """Check if there is something to commit""" assert self.is_open() return self.repo.status() def branches(self): return list(self.repo.branches) def branch(self, branch, commit=None): if not commit: commit = self.repo.head else: commit = self.repo.get(commit) self.repo.branches.local.create(branch, commit) def checkout(self, branch): """Checkout into the branch HEAD""" new_branch = False ref = f"refs/heads/{branch}" if branch not in self.branches(): self.repo.references["HEAD"].set_target(ref) new_branch = True else: self.repo.checkout(ref) return new_branch def commit( self, user, user_email, user_time, message, parents=None, committer=None, committer_email=None, committer_time=None, ): """Add all the files and create a new commit in the current HEAD""" if not committer: committer = self.committer if self.committer else self.user committer_email = ( self.committer_email if self.committer_email else self.user_email ) committer_time = committer_time if committer_time else user_time if self.is_dirty(): self.repo.index.add_all() self.repo.index.write() author = pygit2.Signature(user, user_email, int(user_time.timestamp())) committer = pygit2.Signature( committer, committer_email, int(committer_time.timestamp()) ) tree = self.repo.index.write_tree() return self.repo.create_commit( "HEAD", author, committer, message, tree, parents ) def last_commit(self): try: return self.repo.head.target except: return None def branch_head(self, branch): return self.repo.references["refs/heads/" + branch].target def set_branch_head(self, branch, commit): self.repo.references["refs/heads/" + branch].set_target(commit) def gc(self): logging.debug(f"Garbage recollect and repackage {self.path}") subprocess.run( ["git", "gc", "--auto"], cwd=self.path, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ) def clean(self): for path, _ in self.repo.status().items(): logging.debug(f"Cleaning {path}") try: (self.path / path).unlink() self.repo.index.remove(path) except Exception as e: logging.warning(f"Error removing file {path}: {e}") def add(self, filename): self.repo.index.add(filename) def add_default_lfs_gitattributes(self, force=False): if not (self.path / ".gitattributes").exists() or force: with (self.path / ".gitattributes").open("w") as f: content = ["## Default LFS"] content += [f"*{b} {LFS_SUFFIX}" for b in sorted(BINARY)] f.write("\n".join(content)) f.write("\n") self.add(".gitattributes") def add_specific_lfs_gitattributes(self, binaries): self.add_default_lfs_gitattributes(force=True) if binaries: with (self.path / ".gitattributes").open("a") as f: content = ["## Specific LFS patterns"] content += [f"{b} {LFS_SUFFIX}" for b in sorted(binaries)] f.write("\n".join(content)) f.write("\n") self.add(".gitattributes") def get_specific_lfs_gitattributes(self): with (self.path / ".gitattributes").open() as f: patterns = [ line.split()[0] for line in f if line.strip() and not line.startswith("#") ] binary = {f"*{b}" for b in BINARY} return [p for p in patterns if p not in binary] def add_lfs(self, filename, sha256, size): with (self.path / filename).open("w") as f: f.write("version https://git-lfs.github.com/spec/v1\n") f.write(f"oid sha256:{sha256}\n") f.write(f"size {size}\n") self.add(filename) if not self.is_lfs_tracked(filename): logging.debug(f"Add specific LFS file {filename}") specific_patterns = self.get_specific_lfs_gitattributes() specific_patterns.append(filename) self.add_specific_lfs_gitattributes(specific_patterns) def is_lfs_tracked(self, filename): with (self.path / ".gitattributes").open() as f: patterns = ( line.split()[0] for line in f if line.strip() and not line.startswith("#") ) return any(fnmatch.fnmatch(filename, line) for line in patterns) def remove(self, file: pathlib.Path): self.repo.index.remove(file.name) (self.path / file).unlink() patterns = self.get_specific_lfs_gitattributes() if file.name in patterns: patterns.remove(file.name) self.add_specific_lfs_gitattributes(patterns) def add_gitea_remote(self, package): repo_name = package.replace("+", "_") org_name = "rpm" if not os.getenv("GITEA_TOKEN"): logging.warning("Not adding a remote due to missing $GITEA_TOKEN") return url = f"https://gitea.opensuse.org/api/v1/org/{org_name}/repos" response = requests.post( url, data={"name": repo_name}, headers={"Authorization": f"token {os.getenv('GITEA_TOKEN')}"}, timeout=10, ) # 409 Conflict (Already existing) # 201 Created if response.status_code not in (201, 409): print(response.data) url = f"gitea@gitea.opensuse.org:{org_name}/{repo_name}.git" self.repo.remotes.create("origin", url) def push(self, force=False): remo = self.repo.remotes["origin"] keypair = pygit2.KeypairFromAgent("gitea") callbacks = pygit2.RemoteCallbacks(credentials=keypair) refspecs = ["refs/heads/factory"] develspec = "refs/heads/devel" if develspec in self.repo.references: if force: refspecs.append(f"+{develspec}:{develspec}") else: refspecs.append("{develspec}:{develspec}") remo.push(refspecs, callbacks=callbacks)