Files
git-importer/lib/git.py
Adam Majer 5a28f62fb9 reconstruct state data
If the state file is missing, we can reconstruct which parts were
exported based on revision ids

Also, packages could have branches in Git, but not be in Git. We
need to check (project,package) tuple for this and not just abort
based on the package name alone.
2025-08-09 18:09:35 +02:00

295 lines
9.6 KiB
Python

import fnmatch
import logging
import os
import pathlib
import subprocess
import requests
from lib.binary import BINARY
# Attribute list appended to every pattern this module writes to
# .gitattributes, so that matching files are handled by git-lfs.
LFS_SUFFIX = "filter=lfs diff=lfs merge=lfs -text"
class Git:
    """Local git repository driven through the ``git`` command line.

    Every command is executed with ``self.path`` as working directory.
    ``committer`` / ``committer_email`` are the default committer identity
    used by :meth:`commit` when the caller does not pass one.
    """

    def __init__(self, path, committer=None, committer_email=None):
        self.path = pathlib.Path(path)
        self.committer = committer
        self.committer_email = committer_email

    def exists(self):
        """Check if the path is a valid git repository"""
        return (self.path / ".git").exists()

    def create(self):
        """Create a local git repository"""
        self.path.mkdir(parents=True, exist_ok=True)
        self.open()

    def git_run(self, args, **kwargs):
        """Run a git command inside the repository.

        ``args`` is the argument list following ``git``.  Remaining keyword
        arguments are forwarded to :func:`subprocess.run`.  An ``env``
        keyword, when given, is used *instead of* the process environment;
        otherwise the process environment is inherited.  In both cases LFS
        smudging is disabled and the global git configuration is ignored.

        Returns the ``CompletedProcess``; raises
        ``subprocess.CalledProcessError`` when git exits non-zero.
        """
        env = kwargs.pop("env", None)
        # Always work on a copy so neither the caller's mapping nor
        # os.environ is mutated.
        envs = dict(os.environ) if env is None else dict(env)
        envs["GIT_LFS_SKIP_SMUDGE"] = "1"
        envs["GIT_CONFIG_GLOBAL"] = "/dev/null"
        return subprocess.run(
            ["git"] + args,
            cwd=self.path,
            check=True,
            env=envs,
            **kwargs,
        )

    def open(self):
        """Initialise the repository if needed and apply local settings."""
        if not self.exists():
            self.git_run(["init", "--object-format=sha256", "-b", "factory"])
        # Setting the option is idempotent, so it is safe on every open.
        self.git_run(["config", "lfs.allowincompletepush", "true"])

    def is_dirty(self):
        """Check if there is something to commit"""
        status_str = self.git_run(
            ["status", "--porcelain=2"],
            stdout=subprocess.PIPE,
        ).stdout.decode("utf-8")
        # Any non-empty status line means there is a pending change.
        return any(status_str.split("\n"))

    def branches(self):
        """Return the short names of all local branches.

        When the repository has no branch yet, ``["factory"]`` is returned
        to stand for the unborn initial branch.
        """
        br = (
            self.git_run(
                ["for-each-ref", "--format=%(refname:short)", "refs/heads/"],
                stdout=subprocess.PIPE,
            )
            .stdout.decode("utf-8")
            .split()
        )
        if not br:
            br.append("factory")  # unborn branch?
        return br

    def branch(self, branch, commit="HEAD"):
        """Create ``branch`` pointing at the commit that ``commit`` names."""
        # Peel to a commit id first so tags and other refs are accepted.
        commit = (
            self.git_run(
                ["rev-parse", "--verify", "--end-of-options", commit + "^{commit}"],
                stdout=subprocess.PIPE,
            )
            .stdout.decode("utf-8")
            .strip()
        )
        return self.git_run(["branch", branch, commit])

    def _ref_exists(self, ref):
        """Return True when the fully qualified ``ref`` exists (loose or packed)."""
        try:
            self.git_run(["show-ref", "--verify", "--quiet", ref])
        except subprocess.CalledProcessError:
            return False
        return True

    def checkout(self, branch):
        """Checkout into the branch HEAD

        Returns True when ``branch`` did not exist and was created as an
        orphan branch, False otherwise.
        """
        if branch not in self.branches():
            self.git_run(["switch", "-q", "--orphan", branch])
            return True
        # branches() reports "factory" for an unborn branch; in that case
        # there is no ref to switch to.  Ask git rather than looking for a
        # file under .git/refs, which is absent once refs are packed.
        if self._ref_exists(f"refs/heads/{branch}"):
            self.git_run(["switch", "--no-guess", "-q", branch])
        return False

    def commit(
        self,
        user,
        user_email,
        user_time,
        message,
        parents=None,
        committer=None,
        committer_email=None,
        committer_time=None,
    ):
        """Add all the files and create a new commit in the current HEAD

        ``user``, ``user_email`` and ``user_time`` (a ``datetime``) describe
        the author.  ``parents`` is a commit id, a list of commit ids (falsy
        entries are skipped) or None.  When ``committer`` is not given, the
        identity configured on the instance is used, falling back to the
        author; ``committer_time`` defaults to ``user_time``.

        Returns the id of the new commit.
        """
        if not committer:
            committer = self.committer or user
            committer_email = self.committer_email or user_email
        elif not committer_email:
            committer_email = self.committer_email or user_email
        if not committer_time:
            committer_time = user_time

        if self.is_dirty():
            self.git_run(["add", "--all", "."])

        tree_id = (
            self.git_run(["write-tree"], stdout=subprocess.PIPE)
            .stdout.decode("utf-8")
            .strip()
        )

        if isinstance(parents, str):
            parents = [parents]
        parent_array = []
        if isinstance(parents, list):
            for parent in filter(None, parents):
                parent_array += ["-p", parent]

        # Layer the identity over the inherited environment: passing only
        # the GIT_* variables would drop PATH and friends for this command.
        commit_env = os.environ.copy()
        commit_env.update(
            {
                "GIT_AUTHOR_NAME": user,
                "GIT_AUTHOR_EMAIL": user_email,
                "GIT_AUTHOR_DATE": f"{int(user_time.timestamp())} +0000",
                "GIT_COMMITTER_NAME": committer,
                "GIT_COMMITTER_EMAIL": committer_email,
                "GIT_COMMITTER_DATE": f"{int(committer_time.timestamp())} +0000",
            }
        )
        commit_id = (
            self.git_run(
                ["commit-tree"] + parent_array + [tree_id],
                env=commit_env,
                input=message.encode("utf-8"),
                stdout=subprocess.PIPE,
            )
            .stdout.decode("utf-8")
            .rstrip()
        )
        # Move the current branch to the new commit, keeping index and tree.
        self.git_run(["reset", "--soft", commit_id])
        return commit_id

    def branch_head(self, branch="HEAD"):
        """Return the object id that ``branch`` resolves to."""
        return (
            self.git_run(
                ["rev-parse", "--verify", "--end-of-options", branch],
                stdout=subprocess.PIPE,
            )
            .stdout.decode("utf-8")
            .strip()
        )

    def branch_commit(self, branch="HEAD"):
        """Return the raw commit object of ``branch``, or '' if git fails."""
        try:
            return (
                self.git_run(["cat-file", "commit", branch], stdout=subprocess.PIPE)
                .stdout.decode("utf-8")
                .strip()
            )
        except subprocess.CalledProcessError:
            return ""

    def set_branch_head(self, branch, commit):
        """Point the local branch ``branch`` at ``commit``."""
        return self.git_run(["update-ref", f"refs/heads/{branch}", commit])

    def gc(self):
        """Run ``git gc --auto``; its output is captured, not printed."""
        logging.debug("Garbage recollect and repackage %s", self.path)
        self.git_run(
            ["gc", "--auto"],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )

    # NOTE: disabled code, kept for reference.  It relies on a `self.repo`
    # attribute that this class does not define.
    # def clean(self):
    #     for path, _ in self.repo.status().items():
    #         logging.debug(f"Cleaning {path}")
    #         try:
    #             (self.path / path).unlink()
    #             self.repo.index.remove(path)
    #         except Exception as e:
    #             logging.warning(f"Error removing file {path}: {e}")

    def add(self, filename):
        """Stage ``filename``, taken literally (no pathspec magic)."""
        self.git_run(["add", ":(literal)" + str(filename)])

    def add_default_gitignore(self):
        """Create and stage a default .gitignore unless one already exists."""
        gitignore = self.path / ".gitignore"
        if not gitignore.exists():
            with gitignore.open("w") as f:
                f.write(".osc\n")
            self.add(".gitignore")

    def add_default_lfs_gitattributes(self, force=False):
        """Write and stage the default LFS patterns.

        An existing .gitattributes is only overwritten when ``force`` is set.
        """
        gitattributes = self.path / ".gitattributes"
        if force or not gitattributes.exists():
            content = ["## Default LFS"]
            content += [f"*{b} {LFS_SUFFIX}" for b in sorted(BINARY)]
            with gitattributes.open("w") as f:
                f.write("\n".join(content))
                f.write("\n")
            self.add(".gitattributes")

    def add_specific_lfs_gitattributes(self, binaries):
        """Rewrite .gitattributes as the defaults followed by ``binaries``."""
        self.add_default_lfs_gitattributes(force=True)
        if binaries:
            content = ["## Specific LFS patterns"]
            content += [f"{b} {LFS_SUFFIX}" for b in sorted(binaries)]
            with (self.path / ".gitattributes").open("a") as f:
                f.write("\n".join(content))
                f.write("\n")
            self.add(".gitattributes")

    def _gitattributes_patterns(self):
        """Return the pattern column of .gitattributes.

        Blank lines and lines starting with ``#`` are skipped.  A missing
        file yields an empty list.
        """
        try:
            with (self.path / ".gitattributes").open() as f:
                return [
                    line.split()[0]
                    for line in f
                    if line.strip() and not line.startswith("#")
                ]
        except FileNotFoundError:
            return []

    def get_specific_lfs_gitattributes(self):
        """Return the .gitattributes patterns that are not default ones."""
        binary = {f"*{b}" for b in BINARY}
        return [p for p in self._gitattributes_patterns() if p not in binary]

    def add_lfs(self, filename, sha256, size):
        """Write and stage an LFS pointer file for ``filename``.

        If no pattern in .gitattributes matches the file, the file name
        itself is added as a specific LFS pattern.
        """
        with (self.path / filename).open("w") as f:
            f.write("version https://git-lfs.github.com/spec/v1\n")
            f.write(f"oid sha256:{sha256}\n")
            f.write(f"size {size}\n")
        self.add(filename)

        if not self.is_lfs_tracked(filename):
            logging.debug(f"Add specific LFS file {filename}")
            specific_patterns = self.get_specific_lfs_gitattributes()
            specific_patterns.append(filename)
            self.add_specific_lfs_gitattributes(specific_patterns)

    def is_lfs_tracked(self, filename):
        """Return True if any .gitattributes pattern matches ``filename``.

        Matching uses :func:`fnmatch.fnmatch`; a missing .gitattributes
        means nothing is tracked.
        """
        return any(
            fnmatch.fnmatch(filename, pattern)
            for pattern in self._gitattributes_patterns()
        )

    def remove(self, file: pathlib.Path):
        """Remove ``file.name`` from the index and working tree.

        A specific LFS pattern equal to the file name is dropped as well.
        """
        self.git_run(
            ["rm", "-q", "-f", "--ignore-unmatch", ":(literal)" + file.name],
        )
        patterns = self.get_specific_lfs_gitattributes()
        if file.name in patterns:
            patterns.remove(file.name)
            self.add_specific_lfs_gitattributes(patterns)

    def add_gitea_remote(self, package):
        """Create the repository for ``package`` on Gitea and add it as origin.

        Nothing is done (beyond a warning) when $GITEA_TOKEN is not set.
        """
        repo_name = package.replace("+", "_")
        org_name = "pool"

        token = os.getenv("GITEA_TOKEN")
        if not token:
            logging.warning("Not adding a remote due to missing $GITEA_TOKEN")
            return

        url = f"https://src.opensuse.org/api/v1/org/{org_name}/repos"
        response = requests.post(
            url,
            data={"name": repo_name, "object_format_name": "sha256"},
            headers={"Authorization": f"token {token}"},
            timeout=10,
        )
        # 409 Conflict (Already existing)
        # 201 Created
        if response.status_code not in (201, 409):
            # requests.Response exposes the body as `.text`, not `.data`.
            logging.warning(
                "Unexpected HTTP %s while creating %s/%s: %s",
                response.status_code,
                org_name,
                repo_name,
                response.text,
            )
        url = f"gitea@src.opensuse.org:{org_name}/{repo_name}.git"
        self.git_run(
            ["remote", "add", "origin", url],
        )

    def push(self, force=False):
        """Push all branches to ``origin``; a no-op without such a remote."""
        remotes = (
            self.git_run(["remote"], stdout=subprocess.PIPE)
            .stdout.decode("utf-8")
            .split()
        )
        # Compare whole names: a substring test would accept e.g. "origin2".
        if "origin" not in remotes:
            logging.warning("Not pushing to remote because no 'origin' configured")
            return

        cmd = ["push"]
        if force:
            cmd.append("-f")
        cmd += ["origin", "--all"]
        self.git_run(cmd)