import fnmatch
import logging
import pathlib
import subprocess

import pygit2

from lib.binary import BINARY

# Attribute string appended to every pattern written to .gitattributes
LFS_SUFFIX = "filter=lfs diff=lfs merge=lfs -text"


class Git:
    """Local git repository"""

    def __init__(self, path, committer=None, committer_email=None):
        """Remember the repository location and the default committer.

        The repository itself is not opened here; ``self.repo`` stays
        ``None`` until ``create()`` is called.
        """
        self.path = pathlib.Path(path)
        self.committer = committer
        self.committer_email = committer_email

        self.repo = None

    def is_open(self):
        """Return True when a pygit2 repository object is attached"""
        return self.repo is not None

    # TODO: Extend it to packages and files
    def exists(self):
        """Check if the path is a valid git repository"""
        return (self.path / ".git").exists()

    def create(self):
        """Create a local git repository"""
        self.path.mkdir(parents=True, exist_ok=True)
        # Convert the path to string, to avoid some limitations in
        # older pygit2
        self.repo = pygit2.init_repository(str(self.path))
        return self

    def is_dirty(self):
        """Check if there is something to commit

        Returns the mapping produced by ``repo.status()``, which is
        empty (falsy) when there is nothing to commit.
        """
        assert self.is_open()

        return self.repo.status()

    def branches(self):
        """Return the branch names of the repository as a list"""
        return list(self.repo.branches)

    def branch(self, branch, commit=None):
        """Create the local branch `branch`

        The branch points to `commit` (an object id resolved through
        ``repo.get``) or, when no commit is given, to the commit
        referenced by HEAD.
        """
        if not commit:
            # ``repo.head`` is a reference; the branch has to be
            # created from the commit object it points to
            commit = self.repo.head.peel()
        else:
            commit = self.repo.get(commit)
        self.repo.branches.local.create(branch, commit)

    def checkout(self, branch):
        """Checkout into the branch HEAD

        If the branch does not exist yet, HEAD is only redirected to
        it (the branch is born with the next commit) and True is
        returned.  Otherwise the branch is checked out and False is
        returned.
        """
        ref = f"refs/heads/{branch}"
        if branch not in self.branches():
            self.repo.references["HEAD"].set_target(ref)
            return True

        self.repo.checkout(ref)
        return False

    def commit(
        self,
        user,
        user_email,
        user_time,
        message,
        parents=None,
        committer=None,
        committer_email=None,
        committer_time=None,
        allow_empty=False,
    ):
        """Add all the files and create a new commit in the current HEAD

        `user`, `user_email` and `user_time` describe the author.
        When no `committer` is given, the committer configured in the
        instance is used, falling back to the author.  When no
        `committer_time` is given, `user_time` is used.  Both time
        values must provide a ``timestamp()`` method.

        If `parents` is empty, the current HEAD target is used as the
        only parent.

        Returns the value of ``repo.create_commit``.
        """
        assert allow_empty or self.is_dirty()

        if not committer:
            # Fall back to the author passed by the caller: the
            # instance does not store any "user" information
            committer = self.committer if self.committer else user
            committer_email = (
                self.committer_email if self.committer_email else user_email
            )
        # The time fallback applies also when the committer is given
        # explicitly, otherwise the signature cannot be built
        committer_time = committer_time if committer_time else user_time

        try:
            self.repo.index.add_all()
        except pygit2.GitError:
            if not allow_empty:
                raise

        self.repo.index.write()
        author = pygit2.Signature(user, user_email, int(user_time.timestamp()))
        committer = pygit2.Signature(
            committer, committer_email, int(committer_time.timestamp())
        )
        if not parents:
            try:
                parents = [self.repo.head.target]
            except pygit2.GitError:
                # HEAD cannot be resolved, so the commit has no
                # parent.  Only accepted when empty commits are
                # allowed.
                parents = []
                if not allow_empty:
                    raise

        tree = self.repo.index.write_tree()
        return self.repo.create_commit(
            "HEAD", author, committer, message, tree, parents
        )

    def merge(
        self,
        user,
        user_email,
        user_time,
        message,
        commit,
        committer=None,
        committer_email=None,
        committer_time=None,
        clean_on_conflict=True,
        merged=False,
        allow_empty=False,
    ):
        """Merge `commit` into the current HEAD and commit the result

        When `merged` is True the merge step is skipped and only the
        merge commit is created.

        Returns the string "CONFLICT" if the index has conflicts
        (optionally cleaning the working directory before), the string
        "EMPTY" if there is nothing to commit and `allow_empty` is
        False, or the result of ``commit()`` otherwise.
        """
        new_branch = False

        if not merged:
            try:
                self.repo.merge(commit)
            except KeyError:
                # If it is the first commit, we will have a missing
                # "HEAD", but the files will be there.  We can proceed
                # to the commit directly.
                new_branch = True

        if not merged and self.repo.index.conflicts:
            for conflict in self.repo.index.conflicts:
                # Drop the empty sides of the conflict entry
                conflict = [c for c in conflict if c]
                if conflict:
                    logging.info(f"CONFLICT {conflict[0].path}")

            if clean_on_conflict:
                self.clean()
            # Now I miss Rust enums
            return "CONFLICT"

        # Some merges are empty in OBS (no changes, not sure
        # why), for now we signal them
        if not allow_empty and not self.is_dirty():
            # I really really do miss Rust enums
            return "EMPTY"

        if new_branch:
            parents = [commit]
        else:
            parents = [
                self.repo.head.target,
                commit,
            ]
        return self.commit(
            user,
            user_email,
            user_time,
            message,
            parents,
            committer,
            committer_email,
            committer_time,
            allow_empty=allow_empty,
        )

    def merge_abort(self):
        """Clean up the repository state left by a merge"""
        self.repo.state_cleanup()

    def last_commit(self):
        """Return the target of HEAD, or None if it cannot be read"""
        try:
            return self.repo.head.target
        except Exception:
            # Best effort: any failure resolving HEAD is reported as
            # "no commit", but KeyboardInterrupt and SystemExit are
            # not swallowed anymore
            return None

    def branch_head(self, branch):
        """Return the target of the local branch `branch`"""
        return self.repo.references["refs/heads/" + branch].target

    def set_branch_head(self, branch, commit):
        """Point the local branch `branch` to `commit`"""
        self.repo.references["refs/heads/" + branch].set_target(commit)

    def gc(self):
        """Run `git gc --auto` in the repository directory

        The output is captured and the exit status is not checked.
        """
        logging.info(f"Garbage recollect and repackage {self.path}")
        subprocess.run(
            ["git", "gc", "--auto"],
            cwd=self.path,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )

    def clean(self):
        """Remove every path reported by the repository status

        Each path is deleted from the working directory and from the
        index.  Errors are logged and do not stop the cleaning.
        """
        for path in self.repo.status():
            logging.debug(f"Cleaning {path}")
            try:
                (self.path / path).unlink()
                self.repo.index.remove(path)
            except Exception as e:
                logging.warning(f"Error removing file {path}: {e}")

    def add(self, filename):
        """Add `filename` to the index"""
        self.repo.index.add(filename)

    def add_default_lfs_gitattributes(self, force=False):
        """Write .gitattributes with the default LFS patterns

        The file is written only if it is missing or `force` is True,
        and in that case it is also added to the index.
        """
        gitattributes = self.path / ".gitattributes"
        if not gitattributes.exists() or force:
            with gitattributes.open("w") as f:
                content = ["## Default LFS"]
                content += [f"*{b} {LFS_SUFFIX}" for b in sorted(BINARY)]
                f.write("\n".join(content))
                f.write("\n")
            self.add(".gitattributes")

    def add_specific_lfs_gitattributes(self, binaries):
        """Rewrite .gitattributes with the default and specific patterns

        The default section is always regenerated, and `binaries` (if
        any) are appended after it as a second section.
        """
        self.add_default_lfs_gitattributes(force=True)
        if binaries:
            with (self.path / ".gitattributes").open("a") as f:
                content = ["## Specific LFS patterns"]
                content += [f"{b} {LFS_SUFFIX}" for b in sorted(binaries)]
                f.write("\n".join(content))
                f.write("\n")
        self.add(".gitattributes")

    def _gitattributes_patterns(self):
        """Return the patterns (first field) listed in .gitattributes

        Blank lines and comment lines are skipped.  A missing file is
        read as an empty list of patterns.
        """
        try:
            with (self.path / ".gitattributes").open() as f:
                # Strip first, so an indented comment is not taken as
                # a pattern
                lines = [line.strip() for line in f]
        except FileNotFoundError:
            return []
        return [
            line.split()[0] for line in lines if line and not line.startswith("#")
        ]

    def get_specific_lfs_gitattributes(self):
        """Return the .gitattributes patterns that are not default ones"""
        binary = {f"*{b}" for b in BINARY}
        return [p for p in self._gitattributes_patterns() if p not in binary]

    def add_lfs(self, filename, sha256, size):
        """Write an LFS pointer file for `filename` and add it

        If no pattern in .gitattributes matches the file, the file
        name is registered as a specific LFS pattern.
        """
        with (self.path / filename).open("w") as f:
            f.write("version https://git-lfs.github.com/spec/v1\n")
            f.write(f"oid sha256:{sha256}\n")
            f.write(f"size {size}\n")
        self.add(filename)

        if not self.is_lfs_tracked(filename):
            logging.debug(f"Add specific LFS file {filename}")
            specific_patterns = self.get_specific_lfs_gitattributes()
            specific_patterns.append(filename)
            self.add_specific_lfs_gitattributes(specific_patterns)

    def is_lfs_tracked(self, filename):
        """Check if `filename` matches any pattern in .gitattributes"""
        return any(
            fnmatch.fnmatch(filename, pattern)
            for pattern in self._gitattributes_patterns()
        )

    def remove(self, file: pathlib.Path):
        """Remove `file` from the index and from the working directory

        If the file name is registered as a specific LFS pattern, the
        pattern is dropped and .gitattributes is rewritten.
        """
        self.repo.index.remove(file.name)
        (self.path / file).unlink()

        patterns = self.get_specific_lfs_gitattributes()
        if file.name in patterns:
            patterns.remove(file.name)
            self.add_specific_lfs_gitattributes(patterns)