git-importer/lib/importer.py
2022-11-02 07:20:53 +01:00

381 lines
14 KiB
Python

import logging
import os
import xml.etree.ElementTree as ET
import yaml
from lib.binary import is_binary_or_large
from lib.db import DB
from lib.db_revision import DBRevision
from lib.git import Git
from lib.obs import OBS
from lib.obs_revision import OBSRevision
from lib.proxy_sha256 import ProxySHA256, md5
from lib.tree_builder import AbstractWalker, TreeBuilder
from lib.user import User
class Importer:
def __init__(self, projects, package, repodir):
# The idea is to create each commit in order, and draw the
# same graph described by the revisions timeline. For that we
# need first to fetch all the revisions and sort them
# linearly, based on the timestamp.
#
# After that we recreate the commits, and if one revision is a
# request that contains a target inside the projects in the
# "history", we create a merge commit.
#
# Optionally, if a flag is set, we will try to find a common
# "Initial commit" from a reference branch (the first one in
# "projects", that is safe to assume to be "openSUSE:Factory".
# This is not always a good idea. For example, in a normal
# situation the "devel" project history is older than
# "factory", and we can root the tree on it. But for some
# other projects we lost partially the "devel" history project
# (could be moved), and "factory" is not the root.
self.package = package
self.obs = OBS()
self.git = Git(
repodir,
committer="Git OBS Bridge",
committer_email="obsbridge@suse.de",
).create()
self.state_file = os.path.join(self.git.path, ".git", "_flat_state.yaml")
self.proxy_sha256 = ProxySHA256(self.obs, enabled=True)
self.gc_interval = 200
# Add the "devel" project
(project, branch, api_url) = projects[0]
assert project == "openSUSE:Factory"
self.obs.change_url(api_url)
devel_project = self.obs.devel_project(project, package)
if devel_project:
self.projects = [(devel_project, "devel", api_url)] + projects
else:
self.projects = projects
# Associate the branch and api_url information per project
self.projects_info = {
project: (branch, api_url) for (project, branch, api_url) in self.projects
}
def download(self, revision):
obs_files = self.obs.files(revision.project, revision.package, revision.srcmd5)
git_files = {
(f.name, f.stat().st_size, md5(f))
for f in self.git.path.iterdir()
if f.is_file() and f.name not in (".gitattributes")
}
# Overwrite ".gitattributes" with the
self.git.add_default_lfs_gitattributes(force=True)
# Download each file in OBS if it is not a binary (or large)
# file
for (name, size, file_md5) in obs_files:
# this file creates easily 100k commits and is just useless data :(
# unfortunately it's stored in the same meta package as the project config
if revision.package == "_project" and name == "_staging_workflow":
continue
# have such files been detected as text mimetype before?
is_text = self.proxy_sha256.is_text(name)
if not is_text and is_binary_or_large(name, size):
file_sha256 = self.proxy_sha256.get_or_put(
revision.project,
revision.package,
name,
revision.srcmd5,
file_md5,
size,
)
self.git.add_lfs(name, file_sha256["sha256"], size)
else:
if (name, size, file_md5) not in git_files:
logging.debug(f"Download {name}")
self.obs.download(
revision.project,
revision.package,
name,
revision.srcmd5,
self.git.path,
)
# Validate the MD5 of the downloaded file
if md5(self.git.path / name) != file_md5:
raise Exception(f"Download error in {name}")
self.git.add(name)
# Remove extra files
obs_names = {n for (n, _, _) in obs_files}
git_names = {n for (n, _, _) in git_files}
for name in git_names - obs_names:
logging.debug(f"Remove {name}")
self.git.remove(name)
def set_gc_interval(self, gc):
self.gc_interval = gc
def update_db_package(self, db, project, package):
root = self.obs._history(project, package)
if root is None:
return
latest = DBRevision.latest_revision(db, project, package)
for r in root.findall("revision"):
rev = OBSRevision(self.obs, project, package).parse(r)
if not latest or rev.rev > latest.rev:
dbrev = DBRevision.import_obs_rev(db, rev)
try:
root = rev.read_link()
except ET.ParseError:
dbrev.set_broken(db)
continue
if root is not None:
tprj = root.get("project") or project
tpkg = root.get("package") or package
dbrev.links_to(db, tprj, tpkg)
def find_linked_revs(self, db):
with db.cursor() as cur:
cur.execute(
"""SELECT * from revisions WHERE id in (SELECT l.revision_id FROM links l
LEFT JOIN linked_revs lrevs ON lrevs.revision_id=l.revision_id
WHERE lrevs.id IS NULL) and broken is FALSE;"""
)
for row in cur.fetchall():
rev = DBRevision(row)
linked_rev = rev.linked_rev(db)
if not linked_rev:
logging.debug(f"No link {rev}")
continue
cur.execute(
"""INSERT INTO linked_revs (revision_id, linked_id)
VALUES (%s,%s)""",
(rev.dbid, linked_rev.dbid),
)
def fetch_all_linked_packages(self, db, project, package):
with db.cursor() as cur:
cur.execute(
"""SELECT DISTINCT l.project, l.package from links l JOIN revisions r
on r.id=l.revision_id WHERE r.project=%s AND r.package=%s""",
(project, package),
)
for row in cur.fetchall():
(lproject, lpackage) = row
self.update_db_package(db, lproject, lpackage)
def find_fake_revisions(self, db):
with db.cursor() as cur:
cur.execute(
"SELECT * from revisions WHERE id in (SELECT linked_id from linked_revs WHERE considered=FALSE)"
)
for row in cur.fetchall():
self._find_fake_revision(db, DBRevision(row))
def _find_fake_revision(self, db, rev):
prev = rev.previous_commit(db)
if not prev:
with db.cursor() as cur:
cur.execute(
"UPDATE linked_revs SET considered=TRUE where linked_id=%s",
(rev.dbid,),
)
return
with db.cursor() as cur:
cur.execute(
"""SELECT * FROM revisions WHERE id IN
(SELECT revision_id from linked_revs WHERE linked_id=%s)
AND commit_time <= %s ORDER BY commit_time""",
(prev.dbid, rev.commit_time),
)
last_linked = None
for linked in cur.fetchall():
linked = DBRevision(linked)
nextrev = linked.next_commit(db)
if nextrev and nextrev.commit_time < rev.commit_time:
continue
last_linked = linked
cur.execute(
"UPDATE linked_revs SET considered=TRUE where linked_id=%s",
(rev.dbid,),
)
if not last_linked:
return
with db.cursor() as cur:
linked = last_linked
cur.execute(
"SELECT 1 FROM fake_revs where revision_id=%s AND linked_id=%s",
(rev.dbid, linked.dbid),
)
if cur.fetchone():
cur.execute(
"UPDATE linked_revs SET considered=TRUE where linked_id=%s",
(rev.dbid,),
)
return
fake_rev = linked.rev + rev.rev / 1000.0
comment = f"Updating link to change in {rev.project}/{rev.package} revision {rev.rev}"
cur.execute(
"""INSERT INTO revisions (project,package,rev,unexpanded_srcmd5,
commit_time, userid, comment) VALUES(%s,%s,%s,%s,%s,%s,%s) RETURNING id""",
(
linked.project,
linked.package,
fake_rev,
linked.unexpanded_srcmd5,
rev.commit_time,
"buildservice-autocommit",
comment,
),
)
new_id = cur.fetchone()[0]
cur.execute(
"""INSERT INTO linked_revs (revision_id, linked_id) VALUES (%s,%s)""",
(new_id, rev.dbid),
)
cur.execute(
"""INSERT INTO fake_revs (revision_id, linked_id) VALUES (%s,%s)""",
(rev.dbid, linked.dbid),
)
def revisions_without_files(self, db):
with db.cursor() as cur:
cur.execute(
"SELECT * FROM revisions WHERE broken=FALSE AND expanded_srcmd5 IS NULL"
)
return [DBRevision(row) for row in cur.fetchall()]
def export_as_git(self):
db = DB()
tree = TreeBuilder(db).build(self.package)
flats = tree.as_flat_list()
branch_state = {"factory": None, "devel": None}
state_data = dict()
if os.path.exists(self.state_file):
with open(self.state_file, "r") as f:
state_data = yaml.safe_load(f)
if type(state_data) != dict:
state_data = {}
left_to_commit = []
for flat in reversed(flats):
found_state = False
for branch in ["factory", "devel"]:
if flat.commit.dbid == state_data.get(branch):
branch_state[branch] = flat.commit
flat.commit.git_commit = self.git.branch_head(branch)
logging.debug(
f"Found {self.git.path}'s {branch} branch in state {flat}"
)
left_to_commit = []
found_state = True
if not found_state:
left_to_commit.append(flat)
gc_cnt = self.gc_interval
if len(left_to_commit) > 0:
self.git.gc()
for flat in left_to_commit:
gc_cnt -= 1
if gc_cnt <= 0 and self.gc_interval:
self.git.gc()
gc_cnt = self.gc_interval
logging.debug(f"Committing {flat}")
self.commit_flat(db, flat, branch_state)
def limit_download(self, file):
if file.endswith(".spec") or file.endswith(".changes"):
return True
return False
def commit_flat(self, db, flat, branch_state):
parents = []
self.git.checkout(flat.branch)
if flat.parent1:
parents.append(flat.parent1.git_commit)
if flat.parent2:
parents.append(flat.parent2.git_commit)
to_download, to_delete = flat.commit.calc_delta(db, branch_state[flat.branch])
for file in to_delete:
if not self.limit_download(file):
continue
self.git.remove(file)
for file in to_download:
if not self.limit_download(file):
continue
self.obs.download(
flat.commit.project,
flat.commit.package,
file,
flat.commit.expanded_srcmd5,
self.git.path,
)
self.git.add(file)
commit = self.git.commit(
f"OBS User {flat.commit.userid}",
"null@suse.de",
flat.commit.commit_time,
# TODO: Normalize better the commit message
f"{flat.commit.comment}\n\n{flat.commit}",
allow_empty=True,
parents=parents,
)
flat.commit.git_commit = commit
branch_state[flat.branch] = flat.commit
with open(self.state_file, "w") as f:
data = {}
for branch in ["factory", "devel"]:
commit = branch_state[branch]
if commit:
data[branch] = commit.dbid
yaml.dump(data, f)
def import_into_db(self):
db = DB()
for project, _, api_url in self.projects:
self.obs.change_url(api_url)
self.update_db_package(db, project, self.package)
self.fetch_all_linked_packages(db, project, self.package)
# all remaining, no filtering here
self.find_linked_revs(db)
missing_users = User.missing_users(db)
for userid in missing_users:
missing_user = self.obs.user(userid)
if missing_user:
missing_user.import_into_db(db)
self.find_fake_revisions(db)
for rev in self.revisions_without_files(db):
with db.cursor() as cur:
cur.execute(
"""SELECT unexpanded_srcmd5 from revisions WHERE
id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
(rev.dbid,),
)
linked_rev = cur.fetchone()
if linked_rev:
linked_rev = linked_rev[0]
list = self.obs.list(
rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
)
if list:
rev.import_dir_list(db, list)
md5 = rev.calculate_files_hash(db)
with db.cursor() as cur:
cur.execute(
"UPDATE revisions SET files_hash=%s WHERE id=%s",
(md5, rev.dbid),
)
else:
rev.set_broken(db)
for number in DBRevision.requests_to_fetch(db):
self.obs.request(number).import_into_db(db)
db.conn.commit()