git-importer/lib/git.py
Stephan Kulow ed4b7367eb Reset branch if the devel branch is based on Factory
This happens in packages that change their devel project over time. In that
case the commit in the devel project no longer has its parent in the devel
branch, but is based on Factory instead.
2022-11-03 15:12:07 +01:00

270 lines
8.2 KiB
Python

import fnmatch
import logging
import pathlib
import subprocess
import pygit2
from lib.binary import BINARY
# Attribute list appended to every pattern line this module writes to
# .gitattributes (both the default "*<suffix>" entries and the specific
# per-file entries).
LFS_SUFFIX = "filter=lfs diff=lfs merge=lfs -text"
class Git:
    """Local git repository, driven through pygit2.

    The instance starts closed: ``self.repo`` is ``None`` until
    ``create()`` initializes the repository.
    """

    def __init__(self, path, committer=None, committer_email=None):
        """Remember the repository location and the default committer.

        ``committer`` / ``committer_email`` are the fallback identity
        used by ``commit()`` when no explicit committer is passed.
        """
        self.path = pathlib.Path(path)
        self.committer = committer
        self.committer_email = committer_email

        self.repo = None

    def is_open(self):
        """Return True once ``self.repo`` has been set."""
        return self.repo is not None

    # TODO: Extend it to packages and files
    def exists(self):
        """Check if the path is a valid git repository"""
        return (self.path / ".git").exists()

    def create(self):
        """Create a local git repository and return ``self``"""
        self.path.mkdir(parents=True, exist_ok=True)
        # Convert the path to string, to avoid some limitations in
        # older pygit2
        self.repo = pygit2.init_repository(str(self.path))
        return self

    def is_dirty(self):
        """Check if there is something to commit

        Returns the status mapping of the repository as is; callers in
        this class only use it for its truthiness.
        """
        assert self.is_open()

        return self.repo.status()

    def branches(self):
        """Return the branch names of the repository as a list"""
        return list(self.repo.branches)

    def branch(self, branch, commit=None):
        """Create the local branch ``branch``.

        The branch points to ``commit`` (an object id) or, when no
        commit is given, to the commit that HEAD currently targets.
        """
        # Resolve to the commit object in both cases: the branch is
        # created from an object, not from the HEAD reference itself.
        if not commit:
            commit = self.repo.get(self.repo.head.target)
        else:
            commit = self.repo.get(commit)
        self.repo.branches.local.create(branch, commit)

    def checkout(self, branch):
        """Checkout into the branch HEAD

        Returns True when the branch did not exist yet (HEAD is only
        re-pointed to the new reference), False when an existing
        branch was checked out.
        """
        new_branch = False
        ref = f"refs/heads/{branch}"
        if branch not in self.branches():
            self.repo.references["HEAD"].set_target(ref)
            new_branch = True
        else:
            self.repo.checkout(ref)
        return new_branch

    def commit(
        self,
        user,
        user_email,
        user_time,
        message,
        parents=None,
        committer=None,
        committer_email=None,
        committer_time=None,
        allow_empty=False,
    ):
        """Add all the files and create a new commit in the current HEAD

        ``user``, ``user_email`` and ``user_time`` (a datetime) describe
        the author.  When ``committer`` is not given, the committer
        configured on the instance is used, falling back to the author.
        ``committer_time`` defaults to ``user_time``.  When ``parents``
        is empty the current HEAD target is used as the only parent.

        Returns whatever ``create_commit`` returns for the new commit.
        Re-raises ``pygit2.GitError`` unless ``allow_empty`` is set.
        """
        assert allow_empty or self.is_dirty()

        if not committer:
            # Fall back to the author identity passed in; the instance
            # has no ``user`` / ``user_email`` attributes.
            committer = self.committer if self.committer else user
            committer_email = (
                self.committer_email if self.committer_email else user_email
            )
        committer_time = committer_time if committer_time else user_time

        try:
            self.repo.index.add_all()
        except pygit2.GitError:
            if not allow_empty:
                raise

        self.repo.index.write()
        author = pygit2.Signature(user, user_email, int(user_time.timestamp()))
        committer_signature = pygit2.Signature(
            committer, committer_email, int(committer_time.timestamp())
        )
        if not parents:
            try:
                parents = [self.repo.head.target]
            except pygit2.GitError:
                # No HEAD target to use as parent
                parents = []
                if not allow_empty:
                    raise

        tree = self.repo.index.write_tree()
        return self.repo.create_commit(
            "HEAD", author, committer_signature, message, tree, parents
        )

    def merge(
        self,
        user,
        user_email,
        user_time,
        message,
        commit,
        committer=None,
        committer_email=None,
        committer_time=None,
        clean_on_conflict=True,
        merged=False,
        allow_empty=False,
    ):
        """Merge ``commit`` into HEAD and record a merge commit.

        With ``merged=True`` the merge itself is skipped (the working
        tree is taken as already merged) and only the commit is made.

        Returns the string "CONFLICT" when the index has conflicts
        (optionally cleaning the working tree first), the string
        "EMPTY" when there is nothing to commit and ``allow_empty`` is
        not set, and otherwise the result of ``commit()``.
        """
        new_branch = False

        if not merged:
            try:
                self.repo.merge(commit)
            except KeyError:
                # If it is the first commit, we will have a missing
                # "HEAD", but the files will be there.  We can proceed
                # to the commit directly.
                new_branch = True

        if not merged and self.repo.index.conflicts:
            for conflict in self.repo.index.conflicts:
                # Some members of a conflict entry can be empty; log
                # the path of the first one that is present.
                entries = [c for c in conflict if c]
                if entries:
                    logging.info(f"CONFLICT {entries[0].path}")

            if clean_on_conflict:
                self.clean()
            # Now I miss Rust enums
            return "CONFLICT"

        # Some merges are empty in OBS (no changes, not sure
        # why), for now we signal them
        if not allow_empty and not self.is_dirty():
            # I really really do miss Rust enums
            return "EMPTY"

        if new_branch:
            parents = [commit]
        else:
            parents = [
                self.repo.head.target,
                commit,
            ]

        return self.commit(
            user,
            user_email,
            user_time,
            message,
            parents,
            committer,
            committer_email,
            committer_time,
            allow_empty=allow_empty,
        )

    def merge_abort(self):
        """Clean up the in-progress repository state"""
        self.repo.state_cleanup()

    def last_commit(self):
        """Return the target of HEAD, or None if it cannot be read"""
        try:
            return self.repo.head.target
        except Exception:
            # Best effort: any failure reading HEAD is reported as
            # "no commit", but do not swallow KeyboardInterrupt or
            # SystemExit like a bare ``except`` would.
            return None

    def branch_head(self, branch):
        """Return the target of the local branch ``branch``"""
        return self.repo.references["refs/heads/" + branch].target

    def set_branch_head(self, branch, commit):
        """Point the local branch ``branch`` to ``commit``"""
        self.repo.references["refs/heads/" + branch].set_target(commit)

    def gc(self):
        """Run ``git gc --auto`` in the repository directory.

        The output is captured and discarded, and the exit status is
        not checked.
        """
        logging.info(f"Garbage recollect and repackage {self.path}")
        subprocess.run(
            ["git", "gc", "--auto"],
            cwd=self.path,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )

    def clean(self):
        """Delete every path reported by the status and unstage it.

        Failures are logged as warnings and do not stop the cleaning.
        """
        for path in self.repo.status():
            logging.debug(f"Cleaning {path}")
            try:
                (self.path / path).unlink()
                self.repo.index.remove(path)
            except Exception as e:
                logging.warning(f"Error removing file {path}: {e}")

    def add(self, filename):
        """Add ``filename`` to the index"""
        self.repo.index.add(filename)

    def add_default_lfs_gitattributes(self, force=False):
        """Write the default .gitattributes and add it to the index.

        One "*<suffix>" line is written per entry of BINARY.  Nothing
        is done if the file already exists, unless ``force`` is set.
        """
        if not (self.path / ".gitattributes").exists() or force:
            with (self.path / ".gitattributes").open("w") as f:
                content = ["## Default LFS"]
                content += [f"*{b} {LFS_SUFFIX}" for b in sorted(BINARY)]
                f.write("\n".join(content))
                f.write("\n")
            self.add(".gitattributes")

    def add_specific_lfs_gitattributes(self, binaries):
        """Rewrite .gitattributes with the defaults plus ``binaries``.

        The default section is always regenerated; the specific
        section is appended only when ``binaries`` is not empty.
        """
        self.add_default_lfs_gitattributes(force=True)
        if binaries:
            with (self.path / ".gitattributes").open("a") as f:
                content = ["## Specific LFS patterns"]
                content += [f"{b} {LFS_SUFFIX}" for b in sorted(binaries)]
                f.write("\n".join(content))
                f.write("\n")
            self.add(".gitattributes")

    def _gitattributes_patterns(self):
        """Return the patterns (first field) listed in .gitattributes.

        Blank lines and comment lines are skipped.  Leading whitespace
        is ignored before looking for the comment marker, so an
        indented comment is not mistaken for a pattern.
        """
        with (self.path / ".gitattributes").open() as f:
            stripped_lines = (line.strip() for line in f)
            return [
                line.split()[0]
                for line in stripped_lines
                if line and not line.startswith("#")
            ]

    def get_specific_lfs_gitattributes(self):
        """Return the .gitattributes patterns that are not defaults"""
        binary = {f"*{b}" for b in BINARY}
        return [p for p in self._gitattributes_patterns() if p not in binary]

    def add_lfs(self, filename, sha256, size):
        """Write an LFS pointer file for ``filename`` and stage it.

        If no pattern in .gitattributes matches the file name, the
        name itself is registered as a specific LFS pattern.
        """
        with (self.path / filename).open("w") as f:
            f.write("version https://git-lfs.github.com/spec/v1\n")
            f.write(f"oid sha256:{sha256}\n")
            f.write(f"size {size}\n")
        self.add(filename)

        if not self.is_lfs_tracked(filename):
            logging.debug(f"Add specific LFS file {filename}")
            specific_patterns = self.get_specific_lfs_gitattributes()
            specific_patterns.append(filename)
            self.add_specific_lfs_gitattributes(specific_patterns)

    def is_lfs_tracked(self, filename):
        """Return True if a .gitattributes pattern matches ``filename``"""
        return any(
            fnmatch.fnmatch(filename, pattern)
            for pattern in self._gitattributes_patterns()
        )

    def remove(self, file: pathlib.Path):
        """Unstage and delete ``file``, dropping its specific LFS pattern.

        The index entry and the LFS pattern are looked up by
        ``file.name``; the file deleted is ``self.path / file``.
        """
        self.repo.index.remove(file.name)
        (self.path / file).unlink()

        patterns = self.get_specific_lfs_gitattributes()
        if file.name in patterns:
            patterns.remove(file.name)
            self.add_specific_lfs_gitattributes(patterns)