import functools
import logging
import os
import xml.etree.ElementTree as ET

import yaml

from lib.binary import is_binary_or_large
from lib.db import DB
from lib.db_revision import DBRevision
from lib.git import Git
from lib.obs import OBS
from lib.obs_revision import OBSRevision
from lib.proxy_sha256 import ProxySHA256, md5, sha256
from lib.tree_builder import AbstractWalker, TreeBuilder, TreeNode
from lib.user import User


class Importer:
    """Import the OBS history of one package into a database and a git repo."""

    def __init__(self, projects, package, repodir):
        """Set up the OBS client, the git repository and the project list.

        Args:
            projects: list of ``(project, branch, api_url)`` tuples. The
                first entry must be ``"openSUSE:Factory"`` (asserted).
            package: name of the package to import.
            repodir: directory handed to ``Git`` for the repository.
        """
        # The idea is to create each commit in order, and draw the
        # same graph described by the revisions timeline.  For that we
        # need first to fetch all the revisions and sort them
        # linearly, based on the timestamp.
        #
        # After that we recreate the commits, and if one revision is a
        # request that contains a target inside the projects in the
        # "history", we create a merge commit.
        #
        # Optionally, if a flag is set, we will try to find a common
        # "Initial commit" from a reference branch (the first one in
        # "projects", that is safe to assume to be "openSUSE:Factory").
        # This is not always a good idea.  For example, in a normal
        # situation the "devel" project history is older than
        # "factory", and we can root the tree on it.  But for some
        # other projects we lost partially the "devel" history project
        # (could be moved), and "factory" is not the root.
        self.package = package

        self.obs = OBS()
        self.git = Git(
            repodir,
            committer="Git OBS Bridge",
            committer_email="obsbridge@suse.de",
        ).create()
        # Records the last exported revision id per branch (see commit_flat).
        self.state_file = os.path.join(self.git.path, ".git", "_flat_state.yaml")
        self.proxy_sha256 = ProxySHA256(self.obs, enabled=True)
        # Number of commits between two "git gc" runs; 0 disables the
        # periodic run inside the commit loop of export_as_git.
        self.gc_interval = 200

        # Add the "devel" project
        (project, branch, api_url) = projects[0]
        assert project == "openSUSE:Factory"
        self.obs.change_url(api_url)
        devel_project = self.obs.devel_project(project, package)
        if devel_project:
            self.projects = [(devel_project, "devel", api_url)] + projects
        else:
            self.projects = projects

        # Associate the branch and api_url information per project
        self.projects_info = {
            project: (branch, api_url) for (project, branch, api_url) in self.projects
        }

    def download(self, revision):
        """Synchronise the git working directory with the files of *revision*.

        Binary or large files are registered as LFS entries, every other
        file is downloaded when the local copy differs, and files that are
        no longer listed in OBS are removed from git.

        Raises:
            Exception: when a downloaded file does not match the MD5
                reported by OBS.
        """
        obs_files = self.obs.files(revision.project, revision.package, revision.srcmd5)
        # NOTE: the comparison must be an exact match.  The previous
        # ``name not in (".gitattributes")`` was a substring test against a
        # plain string and silently skipped files such as "git" or "s".
        git_files = {
            (f.name, f.stat().st_size, md5(f))
            for f in self.git.path.iterdir()
            if f.is_file() and f.name != ".gitattributes"
        }

        # Overwrite ".gitattributes" with the default LFS attributes
        self.git.add_default_lfs_gitattributes(force=True)

        # Download each file in OBS if it is not a binary (or large)
        # file
        for name, size, file_md5 in obs_files:
            # this file creates easily 100k commits and is just useless data :(
            # unfortunately it's stored in the same meta package as the project config
            if revision.package == "_project" and name == "_staging_workflow":
                continue

            # have such files been detected as text mimetype before?
            is_text = self.proxy_sha256.is_text(name)
            if not is_text and is_binary_or_large(name, size):
                file_sha256 = self.proxy_sha256.get_or_put(
                    revision.project,
                    revision.package,
                    name,
                    revision.srcmd5,
                    file_md5,
                    size,
                )
                self.git.add_lfs(name, file_sha256["sha256"], size)
                continue

            if (name, size, file_md5) not in git_files:
                logging.debug(f"Download {name}")
                self.obs.download(
                    revision.project,
                    revision.package,
                    name,
                    revision.srcmd5,
                    self.git.path,
                )
                # Validate the MD5 of the downloaded file
                if md5(self.git.path / name) != file_md5:
                    raise Exception(f"Download error in {name}")
            # NOTE(review): the file is staged whether or not it was
            # re-downloaded -- confirm this matches the intended nesting.
            self.git.add(name)

        # Remove extra files
        obs_names = {n for (n, _, _) in obs_files}
        git_names = {n for (n, _, _) in git_files}
        for name in git_names - obs_names:
            logging.debug(f"Remove {name}")
            self.git.remove(name)

    def set_gc_interval(self, gc):
        """Set how many commits are created between two ``git gc`` runs."""
        self.gc_interval = gc

    def update_db_package(self, db, project, package):
        """Import the OBS revisions of *project*/*package* newer than the DB.

        Each imported revision has its link read; revisions whose link
        cannot be parsed are flagged as broken, the others record the
        project/package they link to.
        """
        root = self.obs._history(project, package)
        if root is None:
            return
        latest = DBRevision.latest_revision(db, project, package)
        for r in root.findall("revision"):
            rev = OBSRevision(self.obs, project, package).parse(r)
            if latest and rev.rev <= latest.rev:
                # already known to the database
                continue
            dbrev = DBRevision.import_obs_rev(db, rev)
            try:
                link = rev.read_link()
            except ET.ParseError:
                dbrev.set_broken(db)
                continue
            if link is not None:
                # missing attributes default to the revision's own location
                tprj = link.get("project") or project
                tpkg = link.get("package") or package
                dbrev.links_to(db, tprj, tpkg)

    def find_linked_revs(self, db):
        """Fill ``linked_revs`` for non-broken linking revisions lacking an entry."""
        with db.cursor() as cur:
            cur.execute(
                """SELECT * from revisions WHERE id in (SELECT l.revision_id FROM links l LEFT JOIN linked_revs lrevs ON lrevs.revision_id=l.revision_id WHERE lrevs.id IS NULL) and broken is FALSE;"""
            )
            # fetchall() materialises the rows, so the cursor can be reused
            # for the INSERTs inside the loop.
            for row in cur.fetchall():
                rev = DBRevision(row)
                linked_rev = rev.linked_rev(db)
                if not linked_rev:
                    logging.debug(f"No link {rev}")
                    continue
                cur.execute(
                    """INSERT INTO linked_revs (revision_id, linked_id) VALUES (%s,%s)""",
                    (rev.dbid, linked_rev.dbid),
                )

    def fetch_all_linked_packages(self, db, project, package):
        """Import the history of every package linked from *project*/*package*."""
        with db.cursor() as cur:
            cur.execute(
                """SELECT DISTINCT l.project, l.package from links l JOIN revisions r on r.id=l.revision_id WHERE r.project=%s AND r.package=%s""",
                (project, package),
            )
            for lproject, lpackage in cur.fetchall():
                self.update_db_package(db, lproject, lpackage)

    def find_fake_revisions(self, db):
        """Run ``_find_fake_revision`` on every link target not yet considered."""
        with db.cursor() as cur:
            cur.execute(
                "SELECT * from revisions WHERE id in (SELECT linked_id from linked_revs WHERE considered=FALSE)"
            )
            for row in cur.fetchall():
                self._find_fake_revision(db, DBRevision(row))

    def _find_fake_revision(self, db, rev):
        """Insert a synthetic revision for the package linking to *rev*.

        *rev* is a link target.  The revisions that linked to its previous
        commit are searched for the last one that was still current when
        *rev* was committed; for it a "fake" revision is inserted, carrying
        *rev*'s commit time and linking to *rev*.  In every case *rev* is
        marked as considered in ``linked_revs``.
        """
        prev = rev.previous_commit(db)
        if not prev:
            with db.cursor() as cur:
                cur.execute(
                    "UPDATE linked_revs SET considered=TRUE where linked_id=%s",
                    (rev.dbid,),
                )
            return

        with db.cursor() as cur:
            cur.execute(
                """SELECT * FROM revisions WHERE id IN (SELECT revision_id from linked_revs WHERE linked_id=%s) AND commit_time <= %s ORDER BY commit_time""",
                (prev.dbid, rev.commit_time),
            )
            last_linked = None
            for row in cur.fetchall():
                linked = DBRevision(row)
                nextrev = linked.next_commit(db)
                # superseded before rev was committed: not the current one
                if nextrev and nextrev.commit_time < rev.commit_time:
                    continue
                last_linked = linked
            cur.execute(
                "UPDATE linked_revs SET considered=TRUE where linked_id=%s",
                (rev.dbid,),
            )

        if not last_linked:
            return

        with db.cursor() as cur:
            linked = last_linked
            cur.execute(
                "SELECT 1 FROM fake_revs where revision_id=%s AND linked_id=%s",
                (rev.dbid, linked.dbid),
            )
            if cur.fetchone():
                # a fake revision for this pair already exists
                cur.execute(
                    "UPDATE linked_revs SET considered=TRUE where linked_id=%s",
                    (rev.dbid,),
                )
                return
            # Fractional revision number: the linking revision's number
            # plus the target's revision number divided by 1000.
            fake_rev = linked.rev + rev.rev / 1000.0
            comment = f"Updating link to change in {rev.project}/{rev.package} revision {rev.rev}"
            cur.execute(
                """INSERT INTO revisions (project,package,rev,unexpanded_srcmd5, commit_time, userid, comment) VALUES(%s,%s,%s,%s,%s,%s,%s) RETURNING id""",
                (
                    linked.project,
                    linked.package,
                    fake_rev,
                    linked.unexpanded_srcmd5,
                    rev.commit_time,
                    "buildservice-autocommit",
                    comment,
                ),
            )
            new_id = cur.fetchone()[0]
            cur.execute(
                """INSERT INTO linked_revs (revision_id, linked_id) VALUES (%s,%s)""",
                (new_id, rev.dbid),
            )
            cur.execute(
                """INSERT INTO fake_revs (revision_id, linked_id) VALUES (%s,%s)""",
                (rev.dbid, linked.dbid),
            )

    def revisions_without_files(self, db):
        """Return the non-broken revisions that have no expanded srcmd5 yet."""
        with db.cursor() as cur:
            cur.execute(
                "SELECT * FROM revisions WHERE broken=FALSE AND expanded_srcmd5 IS NULL"
            )
            return [DBRevision(row) for row in cur.fetchall()]

    def export_as_git(self):
        """Replay the revision tree of the package as git commits.

        The tree built by ``TreeBuilder`` is flattened into an ordered list
        of commits, the list is trimmed according to the state file, and
        the remaining entries are committed one by one with
        ``commit_flat``.
        """
        db = DB()
        tree = TreeBuilder(db).build(self.package)

        class FlatNode:
            """One commit to create: branch, revision and up to two parents."""

            def __init__(self, branch, commit, parent1=None, parent2=None) -> None:
                self.branch = branch
                self.commit = commit
                self.parent1 = parent1
                self.parent2 = parent2

            def __str__(self) -> str:
                p1_str = ""
                if self.parent1:
                    p1_str = f" p1:{self.parent1.short_string()}"
                p2_str = ""
                if self.parent2:
                    p2_str = f" p2:{self.parent2.short_string()}"
                return f"{self.branch} c:{self.commit.short_string()}{p1_str}{p2_str}"

        class FlatTreeWalker(AbstractWalker):
            """While walking the tree, record the commits to do one after
            the other. These FlatNodes are in the end in the flats array."""

            def __init__(self, rebase_devel=False) -> None:
                super().__init__()
                self.flats = []
                # the rebase_devel won't work as such as rebasing the branch needs an explicit action
                self.rebase_devel = rebase_devel
                # remember the last merge point so we can know the parent of it for the root of the sources
                self.last_merge = None

            def add(self, branch, commit, parent1=None, parent2=None):
                self.flats.append(FlatNode(branch, commit, parent1, parent2))

            def handle_source_node(self, node) -> None:
                if self.rebase_devel and node.parent and node.parent.merged_into:
                    self.add("devel", node.revision, node.parent.merged_into.revision)
                    return
                if node.parent:
                    self.add("devel", node.revision, node.parent.revision)
                elif self.last_merge:
                    self.add("devel", node.revision, self.last_merge.parent.revision)

            def call(self, node, is_source) -> None:
                if is_source:
                    self.handle_source_node(node)
                    return
                if not node.parent:
                    self.add("factory", node.revision)
                    return
                if not node.merged:
                    self.add("factory", node.revision, node.parent.revision)
                    return
                self.add(
                    "factory", node.revision, node.parent.revision, node.merged.revision
                )
                self.last_merge = node

        ftw = FlatTreeWalker()
        tree.walk(ftw)

        branch_state = {"factory": None, "devel": None}
        state_data = {}
        if os.path.exists(self.state_file):
            with open(self.state_file, "r") as f:
                state_data = yaml.safe_load(f)
            # an empty or malformed state file must not break the lookup below
            if not isinstance(state_data, dict):
                state_data = {}

        # Scan the flat list in reverse.  Whenever a node matches the saved
        # state of a branch, everything collected so far is dropped
        # (presumably those nodes were exported by a previous run).
        left_to_commit = []
        for flat in reversed(ftw.flats):
            found_state = False
            for branch in ["factory", "devel"]:
                if flat.commit.dbid == state_data.get(branch):
                    branch_state[branch] = flat.commit
                    flat.commit.git_commit = self.git.branch_head(branch)
                    logging.debug(
                        f"Found {self.git.path}'s {branch} branch in state {flat}"
                    )
                    left_to_commit = []
                    found_state = True
            if not found_state:
                left_to_commit.append(flat)

        gc_cnt = self.gc_interval
        if left_to_commit:
            self.git.gc()
        for flat in left_to_commit:
            gc_cnt -= 1
            if gc_cnt <= 0 and self.gc_interval:
                self.git.gc()
                gc_cnt = self.gc_interval
            logging.debug(f"Committing {flat}")
            self.commit_flat(db, flat, branch_state)

    def limit_download(self, file: str) -> bool:
        """Return True when *file* is a ``.spec`` or ``.changes`` file."""
        return file.endswith((".spec", ".changes"))

    def commit_flat(self, db, flat, branch_state):
        """Create the git commit for *flat* and persist the branch state.

        Only files accepted by ``limit_download`` are removed or
        downloaded.  After committing, ``branch_state`` is updated in place
        and the state file is rewritten with the revision id of each
        branch that has one.
        """
        parents = []
        self.git.checkout(flat.branch)
        if flat.parent1:
            parents.append(flat.parent1.git_commit)
        if flat.parent2:
            parents.append(flat.parent2.git_commit)

        to_download, to_delete = flat.commit.calc_delta(db, branch_state[flat.branch])
        for file in to_delete:
            if not self.limit_download(file):
                continue
            self.git.remove(file)
        for file in to_download:
            if not self.limit_download(file):
                continue
            self.obs.download(
                flat.commit.project,
                flat.commit.package,
                file,
                flat.commit.expanded_srcmd5,
                self.git.path,
            )
            self.git.add(file)

        commit = self.git.commit(
            f"OBS User {flat.commit.userid}",
            "null@suse.de",
            flat.commit.commit_time,
            # TODO: Normalize better the commit message
            f"{flat.commit.comment}\n\n{flat.commit}",
            allow_empty=True,
            parents=parents,
        )
        flat.commit.git_commit = commit
        branch_state[flat.branch] = flat.commit

        data = {}
        for branch in ["factory", "devel"]:
            head = branch_state[branch]
            if head:
                data[branch] = head.dbid
        with open(self.state_file, "w") as f:
            yaml.dump(data, f)

    def import_into_db(self):
        """Fetch everything needed for the package from OBS into the database.

        Imports the revisions of every project and of the packages they
        link to, resolves links, imports missing users, creates fake
        revisions, fetches the file lists of revisions lacking them and
        finally imports the pending requests before committing.
        """
        db = DB()

        for project, _, api_url in self.projects:
            self.obs.change_url(api_url)
            self.update_db_package(db, project, self.package)
            self.fetch_all_linked_packages(db, project, self.package)

        # all remaining, no filtering here
        self.find_linked_revs(db)

        missing_users = User.missing_users(db)
        for userid in missing_users:
            missing_user = self.obs.user(userid)
            if missing_user:
                missing_user.import_into_db(db)

        self.find_fake_revisions(db)

        for rev in self.revisions_without_files(db):
            with db.cursor() as cur:
                cur.execute(
                    """SELECT unexpanded_srcmd5 from revisions WHERE id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
                    (rev.dbid,),
                )
                linked_rev = cur.fetchone()
            if linked_rev:
                linked_rev = linked_rev[0]
            dir_list = self.obs.list(
                rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
            )
            if dir_list:
                rev.import_dir_list(db, dir_list)
                files_hash = rev.calculate_files_hash(db)
                with db.cursor() as cur:
                    cur.execute(
                        "UPDATE revisions SET files_hash=%s WHERE id=%s",
                        (files_hash, rev.dbid),
                    )
            else:
                rev.set_broken(db)

        for number in DBRevision.requests_to_fetch(db):
            self.obs.request(number).import_into_db(db)

        db.conn.commit()