forked from importers/git-importer
Adam Majer
eba3fece91
pathspec in git has special characters that we should not trigger. Treat every filespec as literal
290 lines
9.4 KiB
Python
290 lines
9.4 KiB
Python
import fnmatch
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import subprocess
|
|
|
|
import requests
|
|
|
|
from lib.binary import BINARY
|
|
|
|
# Attribute list written after each pattern in .gitattributes (see the
# add_*_lfs_gitattributes methods) so that matching files go through Git LFS.
LFS_SUFFIX = "filter=lfs diff=lfs merge=lfs -text"
|
|
|
|
|
|
class Git:
    """Local git repository driven through the ``git`` command line tool.

    Every command is executed with ``self.path`` as working directory via
    :meth:`git_run`.
    """

    def __init__(self, path, committer=None, committer_email=None):
        """Remember the repository location and the default committer.

        Args:
            path: Directory of the working tree (converted to ``pathlib.Path``).
            committer: Default committer name used by :meth:`commit`.
            committer_email: Default committer e-mail used by :meth:`commit`.
        """
        self.path = pathlib.Path(path)
        self.committer = committer
        self.committer_email = committer_email

    def exists(self):
        """Check if the path is a valid git repository"""
        return (self.path / ".git").exists()

    def create(self):
        """Create a local git repository"""
        self.path.mkdir(parents=True, exist_ok=True)
        self.open()

    def git_run(self, args, **kwargs):
        """Run a git command inside the repository directory.

        Args:
            args: List of arguments placed after ``git``.
            **kwargs: Passed through to ``subprocess.run``.  An ``env``
                mapping, when given, is used *instead of* the inherited
                process environment (it is copied, not modified).

        Returns:
            The ``subprocess.CompletedProcess`` of the command.

        Raises:
            subprocess.CalledProcessError: If git exits with a non-zero
                status (``check=True``).
        """
        if "env" in kwargs:
            envs = kwargs.pop("env").copy()
        else:
            envs = os.environ.copy()
        # Always forced, whatever environment was selected above.
        envs["GIT_LFS_SKIP_SMUDGE"] = "1"
        envs["GIT_CONFIG_GLOBAL"] = "/dev/null"
        return subprocess.run(
            ["git"] + args,
            cwd=self.path,
            check=True,
            env=envs,
            **kwargs,
        )

    def open(self):
        """Initialise the repository (sha256, branch ``factory``) if missing."""
        if not self.exists():
            self.git_run(["init", "--object-format=sha256", "-b", "factory"])

    def is_dirty(self):
        """Check if there is something to commit"""
        status_str = self.git_run(
            ["status", "--porcelain=2"],
            stdout=subprocess.PIPE,
        ).stdout.decode("utf-8")
        # Any non-empty status line means there is a pending change.
        return any(status_str.split("\n"))

    def branches(self):
        """Return the short names of all local branches.

        When git reports no branch at all, ``["factory"]`` is returned so
        that the initial (unborn) branch is still listed.
        """
        br = (
            self.git_run(
                ["for-each-ref", "--format=%(refname:short)", "refs/heads/"],
                stdout=subprocess.PIPE,
            )
            .stdout.decode("utf-8")
            .split()
        )
        if not br:
            br.append("factory")  # unborn branch?
        return br

    def branch(self, branch, commit="HEAD"):
        """Create ``branch`` pointing at ``commit`` (default ``HEAD``).

        ``commit`` is first resolved to a commit id with ``git rev-parse``.
        Returns the ``CompletedProcess`` of ``git branch``.
        """
        commit_id = (
            self.git_run(
                ["rev-parse", "--verify", "--end-of-options", commit + "^{commit}"],
                stdout=subprocess.PIPE,
            )
            .stdout.decode("utf-8")
            .strip()
        )
        return self.git_run(["branch", branch, commit_id])

    def checkout(self, branch):
        """Checkout into the branch HEAD

        Returns:
            True if ``branch`` did not exist and was created as an orphan
            branch, False otherwise.
        """
        if branch not in self.branches():
            self.git_run(["switch", "-q", "--orphan", branch])
            return True

        # branches() may list "factory" while it is still unborn; only switch
        # when the ref file is really there.
        ref = f"refs/heads/{branch}"
        if (self.path / ".git" / ref).exists():
            self.git_run(["switch", "--no-guess", "-q", branch])
        return False

    def commit(
        self,
        user,
        user_email,
        user_time,
        message,
        parents=None,
        committer=None,
        committer_email=None,
        committer_time=None,
    ):
        """Add all the files and create a new commit in the current HEAD

        Args:
            user: Author name.
            user_email: Author e-mail.
            user_time: Author date; must provide ``timestamp()``.
            message: Commit message (sent to git on stdin as UTF-8).
            parents: A parent commit id, a list of ids (falsy entries are
                skipped) or None for a root commit.
            committer: Committer name.  When not given, the instance default
                is used, falling back to the author.
            committer_email: Committer e-mail; only honoured together with
                ``committer``.
            committer_time: Committer date; defaults to ``user_time``.

        Returns:
            The id of the new commit; HEAD is moved to it with a soft reset.
        """
        if not committer:
            # The author is the last-resort committer.
            committer = self.committer if self.committer else user
            committer_email = (
                self.committer_email if self.committer_email else user_email
            )
        if not committer_time:
            committer_time = user_time

        if self.is_dirty():
            self.git_run(["add", "--all", "."])

        tree_id = (
            self.git_run(["write-tree"], stdout=subprocess.PIPE)
            .stdout.decode("utf-8")
            .strip()
        )

        parent_array = []
        if isinstance(parents, list):
            for parent in filter(None, parents):
                parent_array += ["-p", parent]
        elif isinstance(parents, str):
            parent_array = ["-p", parents]

        commit_id = (
            self.git_run(
                ["commit-tree"] + parent_array + [tree_id],
                env={
                    "GIT_AUTHOR_NAME": user,
                    "GIT_AUTHOR_EMAIL": user_email,
                    "GIT_AUTHOR_DATE": f"{int(user_time.timestamp())} +0000",
                    "GIT_COMMITTER_NAME": committer,
                    "GIT_COMMITTER_EMAIL": committer_email,
                    "GIT_COMMITTER_DATE": f"{int(committer_time.timestamp())} +0000",
                },
                input=message.encode("utf-8"),
                stdout=subprocess.PIPE,
            )
            .stdout.decode("utf-8")
            .rstrip()
        )
        self.git_run(["reset", "--soft", commit_id])
        return commit_id

    def branch_head(self, branch="HEAD"):
        """Return the object id that ``branch`` (default ``HEAD``) resolves to."""
        return (
            self.git_run(
                ["rev-parse", "--verify", "--end-of-options", branch],
                stdout=subprocess.PIPE,
            )
            .stdout.decode("utf-8")
            .strip()
        )

    def set_branch_head(self, branch, commit):
        """Point ``refs/heads/<branch>`` at ``commit`` via ``git update-ref``."""
        return self.git_run(["update-ref", f"refs/heads/{branch}", commit])

    def gc(self):
        """Run ``git gc --auto``; its output is captured, not printed."""
        logging.debug(f"Garbage recollect and repackage {self.path}")
        self.git_run(
            ["gc", "--auto"],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        )

    # NOTE: disabled code kept for reference; it relies on a ``self.repo``
    # attribute that this class does not define.
    # def clean(self):
    #     for path, _ in self.repo.status().items():
    #         logging.debug(f"Cleaning {path}")
    #         try:
    #             (self.path / path).unlink()
    #             self.repo.index.remove(path)
    #         except Exception as e:
    #             logging.warning(f"Error removing file {path}: {e}")

    def add(self, filename):
        """Stage ``filename``, treating it as a literal path (no pathspec magic)."""
        self.git_run(["add", ":(literal)" + filename])

    def add_default_gitignore(self):
        """Create and stage a ``.gitignore`` ignoring ``.osc`` if none exists."""
        if not (self.path / ".gitignore").exists():
            with (self.path / ".gitignore").open("w") as f:
                f.write(".osc\n")
            self.add(".gitignore")

    def add_default_lfs_gitattributes(self, force=False):
        """Write and stage the default LFS ``.gitattributes``.

        One ``*<suffix>`` rule is written per entry of ``BINARY``.  Nothing
        happens when the file already exists, unless ``force`` is true, in
        which case it is overwritten.
        """
        if not (self.path / ".gitattributes").exists() or force:
            with (self.path / ".gitattributes").open("w") as f:
                content = ["## Default LFS"]
                content += [f"*{b} {LFS_SUFFIX}" for b in sorted(BINARY)]
                f.write("\n".join(content))
                f.write("\n")
            self.add(".gitattributes")

    def add_specific_lfs_gitattributes(self, binaries):
        """Rewrite ``.gitattributes`` with the defaults plus ``binaries``.

        The default section is always regenerated; when ``binaries`` is not
        empty, one rule per entry is appended and the file is staged again.
        """
        self.add_default_lfs_gitattributes(force=True)
        if binaries:
            with (self.path / ".gitattributes").open("a") as f:
                content = ["## Specific LFS patterns"]
                content += [f"{b} {LFS_SUFFIX}" for b in sorted(binaries)]
                f.write("\n".join(content))
                f.write("\n")
            self.add(".gitattributes")

    def get_specific_lfs_gitattributes(self):
        """Return the ``.gitattributes`` patterns that are not default ones.

        Blank lines and lines starting with ``#`` are ignored; a pattern is
        the first whitespace-separated field of a line.
        """
        with (self.path / ".gitattributes").open() as f:
            patterns = [
                line.split()[0]
                for line in f
                if line.strip() and not line.startswith("#")
            ]
        binary = {f"*{b}" for b in BINARY}
        return [p for p in patterns if p not in binary]

    def add_lfs(self, filename, sha256, size):
        """Write and stage an LFS pointer file for ``filename``.

        If no pattern in ``.gitattributes`` matches the file yet, the file
        name itself is added as a specific LFS pattern.
        """
        with (self.path / filename).open("w") as f:
            f.write("version https://git-lfs.github.com/spec/v1\n")
            f.write(f"oid sha256:{sha256}\n")
            f.write(f"size {size}\n")
        self.add(filename)

        if not self.is_lfs_tracked(filename):
            logging.debug(f"Add specific LFS file {filename}")
            specific_patterns = self.get_specific_lfs_gitattributes()
            specific_patterns.append(filename)
            self.add_specific_lfs_gitattributes(specific_patterns)

    def is_lfs_tracked(self, filename):
        """Tell whether any ``.gitattributes`` pattern matches ``filename``.

        Matching is done with ``fnmatch`` against the first field of every
        non-blank line that does not start with ``#``.
        """
        with (self.path / ".gitattributes").open() as f:
            patterns = (
                line.split()[0]
                for line in f
                if line.strip() and not line.startswith("#")
            )
            # The generator is lazy, so it must be consumed while f is open.
            return any(fnmatch.fnmatch(filename, line) for line in patterns)

    def remove(self, file: pathlib.Path):
        """Remove ``file.name`` from the index and working tree.

        A specific LFS pattern equal to the file name is dropped from
        ``.gitattributes`` as well.
        """
        self.git_run(
            ["rm", "-q", "-f", "--ignore-unmatch", ":(literal)" + file.name],
        )
        patterns = self.get_specific_lfs_gitattributes()
        if file.name in patterns:
            patterns.remove(file.name)
            self.add_specific_lfs_gitattributes(patterns)

    def add_gitea_remote(self, package):
        """Create the package repository on src.opensuse.org and add ``origin``.

        The repository name is ``package`` with ``+`` replaced by ``_`` and is
        created in the ``rpm`` organisation.  Without ``$GITEA_TOKEN`` nothing
        is done.  An unexpected API answer is logged, but the remote is added
        regardless.
        """
        repo_name = package.replace("+", "_")
        org_name = "rpm"

        token = os.getenv("GITEA_TOKEN")
        if not token:
            logging.warning("Not adding a remote due to missing $GITEA_TOKEN")
            return

        url = f"https://src.opensuse.org/api/v1/org/{org_name}/repos"
        response = requests.post(
            url,
            data={"name": repo_name},
            headers={"Authorization": f"token {token}"},
            timeout=10,
        )
        # 409 Conflict (Already existing)
        # 201 Created
        if response.status_code not in (201, 409):
            logging.error(
                "Unexpected response creating %s/%s: %s %s",
                org_name,
                repo_name,
                response.status_code,
                response.text,
            )
        url = f"gitea@src.opensuse.org:{org_name}/{repo_name}.git"
        self.git_run(
            ["remote", "add", "origin", url],
        )

    def push(self, force=False):
        """Push the ``factory`` and ``devel`` branches to ``origin``.

        Does nothing (besides a warning) when no remote named exactly
        ``origin`` is configured.  ``force`` adds ``-f`` to the push.
        """
        remotes = (
            self.git_run(
                ["remote"],
                stdout=subprocess.PIPE,
            )
            .stdout.decode("utf-8")
            .split()
        )
        if "origin" not in remotes:
            logging.warning("Not pushing to remote because no 'origin' configured")
            return

        cmd = ["push"]
        if force:
            cmd.append("-f")
        cmd.append("origin")
        cmd.append("refs/heads/factory")
        cmd.append("refs/heads/devel")
        self.git_run(cmd)