git-importer/lib/git.py
Stephan Kulow 7b20c03256 Add force push for the devel branch
As devel branches can change in case of factory reverts, we need to force
push them. The factory branch shouldn't be affected, so it is not force pushed.
2022-12-02 09:12:11 +01:00

232 lines
7.3 KiB
Python

import fnmatch
import logging
import os
import pathlib
import subprocess
import pygit2
import requests
from lib.binary import BINARY
# Attribute list appended after every pattern this module writes to
# .gitattributes, so that files matching the pattern go through the LFS filter.
LFS_SUFFIX = "filter=lfs diff=lfs merge=lfs -text"
class Git:
    """Local git repository.

    Thin wrapper around a pygit2 repository located at ``path``. The
    optional ``committer`` / ``committer_email`` are used as the default
    committer identity by :meth:`commit`.
    """

    def __init__(self, path, committer=None, committer_email=None):
        self.path = pathlib.Path(path)
        self.committer = committer
        self.committer_email = committer_email
        # Set by open(); None means the repository has not been opened yet.
        self.repo = None

    def is_open(self):
        """Return True once open() (or create()) has been called."""
        return self.repo is not None

    def exists(self):
        """Check if the path is a valid git repository"""
        return (self.path / ".git").exists()

    def create(self):
        """Create a local git repository"""
        self.path.mkdir(parents=True, exist_ok=True)
        self.open()

    def open(self):
        """Initialise the repository at ``self.path`` and keep a handle to it."""
        # Convert the path to string, to avoid some limitations in
        # older pygit2
        self.repo = pygit2.init_repository(str(self.path))

    def is_dirty(self):
        """Check if there is something to commit.

        Returns the status mapping reported by pygit2, which is truthy when
        at least one path has a pending change.
        """
        assert self.is_open()
        return self.repo.status()

    def branches(self):
        """Return the names of the branches known to the repository."""
        return list(self.repo.branches)

    def branch(self, branch, commit=None):
        """Create local branch ``branch`` at ``commit`` (default: HEAD)."""
        if not commit:
            commit = self.repo.head
        else:
            commit = self.repo.get(commit)
        self.repo.branches.local.create(branch, commit)

    def checkout(self, branch):
        """Checkout into the branch HEAD.

        Returns True when ``branch`` did not exist yet; in that case HEAD is
        only re-pointed at the (still unborn) branch.
        """
        new_branch = False
        ref = f"refs/heads/{branch}"
        if branch not in self.branches():
            self.repo.references["HEAD"].set_target(ref)
            new_branch = True
        else:
            self.repo.checkout(ref)
        return new_branch

    def _committer_identity(
        self, user, user_email, committer=None, committer_email=None
    ):
        """Return the ``(name, email)`` pair to use as committer.

        An explicit ``committer`` wins (its email is taken as given).
        Otherwise the instance defaults are used, falling back to the
        author's ``user`` / ``user_email``.
        """
        if committer:
            return committer, committer_email
        return (
            self.committer if self.committer else user,
            self.committer_email if self.committer_email else user_email,
        )

    def commit(
        self,
        user,
        user_email,
        user_time,
        message,
        parents=None,
        committer=None,
        committer_email=None,
        committer_time=None,
    ):
        """Add all the files and create a new commit in the current HEAD.

        ``user``, ``user_email`` and ``user_time`` describe the author;
        ``user_time`` / ``committer_time`` must offer ``.timestamp()``.
        When no committer is given, the instance defaults (or the author)
        are used, and ``committer_time`` defaults to ``user_time``.
        Returns whatever ``create_commit`` returns for the new commit.
        """
        committer, committer_email = self._committer_identity(
            user, user_email, committer, committer_email
        )
        committer_time = committer_time if committer_time else user_time
        # Treat a missing parents argument as "no parents" instead of
        # handing None to pygit2.
        if parents is None:
            parents = []

        if self.is_dirty():
            self.repo.index.add_all()

        self.repo.index.write()
        author = pygit2.Signature(user, user_email, int(user_time.timestamp()))
        committer = pygit2.Signature(
            committer, committer_email, int(committer_time.timestamp())
        )
        tree = self.repo.index.write_tree()
        return self.repo.create_commit(
            "HEAD", author, committer, message, tree, parents
        )

    def last_commit(self):
        """Return the target of HEAD, or None when it cannot be resolved."""
        try:
            return self.repo.head.target
        except Exception:
            # Best effort (e.g. a repository without any commit yet), but do
            # not swallow KeyboardInterrupt / SystemExit like a bare except.
            return None

    def branch_head(self, branch):
        """Return the target of the local branch ``branch``."""
        return self.repo.references["refs/heads/" + branch].target

    def set_branch_head(self, branch, commit):
        """Point the local branch ``branch`` at ``commit``."""
        self.repo.references["refs/heads/" + branch].set_target(commit)

    def gc(self):
        """Run ``git gc --auto`` in the repository directory."""
        logging.debug(f"Garbage recollect and repackage {self.path}")
        subprocess.run(
            ["git", "gc", "--auto"],
            cwd=self.path,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )

    def clean(self):
        """Delete every path reported by status and drop it from the index.

        Failures are logged and do not stop the clean-up of other paths.
        """
        for path, _ in self.repo.status().items():
            logging.debug(f"Cleaning {path}")
            try:
                (self.path / path).unlink()
                self.repo.index.remove(path)
            except Exception as e:
                logging.warning(f"Error removing file {path}: {e}")

    def add(self, filename):
        """Add ``filename`` to the index."""
        self.repo.index.add(filename)

    def add_default_lfs_gitattributes(self, force=False):
        """Write the default LFS rules (one per BINARY suffix).

        The file is only written when it is missing or ``force`` is set; it
        is then added to the index.
        """
        if not (self.path / ".gitattributes").exists() or force:
            with (self.path / ".gitattributes").open("w") as f:
                content = ["## Default LFS"]
                content += [f"*{b} {LFS_SUFFIX}" for b in sorted(BINARY)]
                f.write("\n".join(content))
                f.write("\n")
            self.add(".gitattributes")

    def add_specific_lfs_gitattributes(self, binaries):
        """Rewrite .gitattributes with the defaults plus ``binaries``.

        ``binaries`` is an iterable of extra patterns; when empty only the
        default section is (re)written.
        """
        self.add_default_lfs_gitattributes(force=True)
        if binaries:
            with (self.path / ".gitattributes").open("a") as f:
                content = ["## Specific LFS patterns"]
                content += [f"{b} {LFS_SUFFIX}" for b in sorted(binaries)]
                f.write("\n".join(content))
                f.write("\n")
            self.add(".gitattributes")

    def _gitattributes_patterns(self):
        """Return the pattern (first field) of every rule in .gitattributes.

        Blank lines and comment lines are skipped. A missing file yields an
        empty list instead of raising.
        """
        try:
            lines = (self.path / ".gitattributes").read_text().splitlines()
        except FileNotFoundError:
            return []
        fields_per_line = (line.split() for line in lines)
        return [
            fields[0]
            for fields in fields_per_line
            if fields and not fields[0].startswith("#")
        ]

    def get_specific_lfs_gitattributes(self):
        """Return the .gitattributes patterns that are not default ones."""
        binary = {f"*{b}" for b in BINARY}
        return [p for p in self._gitattributes_patterns() if p not in binary]

    def add_lfs(self, filename, sha256, size):
        """Write an LFS pointer file for ``filename`` and add it to the index.

        If no existing .gitattributes pattern matches ``filename``, the file
        name itself is appended as a specific LFS pattern.
        """
        with (self.path / filename).open("w") as f:
            f.write("version https://git-lfs.github.com/spec/v1\n")
            f.write(f"oid sha256:{sha256}\n")
            f.write(f"size {size}\n")
        self.add(filename)

        if not self.is_lfs_tracked(filename):
            logging.debug(f"Add specific LFS file {filename}")
            specific_patterns = self.get_specific_lfs_gitattributes()
            specific_patterns.append(filename)
            self.add_specific_lfs_gitattributes(specific_patterns)

    def is_lfs_tracked(self, filename):
        """Return True if any .gitattributes pattern matches ``filename``."""
        return any(
            fnmatch.fnmatch(filename, pattern)
            for pattern in self._gitattributes_patterns()
        )

    def remove(self, file: pathlib.Path):
        """Remove ``file`` from the index and the working directory.

        Also drops the file's specific LFS pattern, if it has one.
        """
        self.repo.index.remove(file.name)
        (self.path / file).unlink()

        patterns = self.get_specific_lfs_gitattributes()
        if file.name in patterns:
            patterns.remove(file.name)
            self.add_specific_lfs_gitattributes(patterns)

    def add_gitea_remote(self, package):
        """Create the Gitea repository for ``package`` and add it as origin.

        Does nothing (besides a warning) when $GITEA_TOKEN is not set.
        """
        repo_name = package.replace("+", "_")
        org_name = "rpm"

        token = os.getenv("GITEA_TOKEN")
        if not token:
            logging.warning("Not adding a remote due to missing $GITEA_TOKEN")
            return

        url = f"https://gitea.opensuse.org/api/v1/org/{org_name}/repos"
        response = requests.post(
            url,
            data={"name": repo_name},
            headers={"Authorization": f"token {token}"},
            timeout=10,
        )
        # 409 Conflict (Already existing)
        # 201 Created
        if response.status_code not in (201, 409):
            # requests.Response exposes the body as .text (there is no .data)
            logging.warning(
                f"Unexpected response {response.status_code} "
                f"creating {repo_name}: {response.text}"
            )
        url = f"gitea@gitea.opensuse.org:{org_name}/{repo_name}.git"
        self.repo.remotes.create("origin", url)

    def _push_refspecs(self, force=False):
        """Return the refspecs to push: factory, plus devel when it exists.

        ``force`` only affects devel, which gets a leading ``+``.
        """
        refspecs = ["refs/heads/factory"]
        develspec = "refs/heads/devel"
        if develspec in self.repo.references:
            prefix = "+" if force else ""
            refspecs.append(f"{prefix}{develspec}:{develspec}")
        return refspecs

    def push(self, force=False):
        """Push factory (and devel, if present) to the origin remote.

        Authentication uses the SSH agent with user "gitea". ``force``
        force-pushes the devel branch only; factory is never forced.
        """
        remo = self.repo.remotes["origin"]

        keypair = pygit2.KeypairFromAgent("gitea")
        callbacks = pygit2.RemoteCallbacks(credentials=keypair)

        remo.push(self._push_refspecs(force), callbacks=callbacks)