import concurrent.futures import logging import xml.etree.ElementTree as ET from lib.db import DB from lib.db_revision import DBRevision from lib.obs import OBS from lib.obs_revision import OBSRevision from lib.user import User def refresh_package(importer, project, package): importer.refresh_package(project, package) def import_request(importer, number): importer.import_request(number) def import_rev(importer, rev): importer.import_rev(rev) class Importer: def __init__(self, api_url, project, packages): # Import multiple Factory packages into the database self.packages = packages self.project = project self.db = DB() self.obs = OBS() assert project == "openSUSE:Factory" self.obs.change_url(api_url) self.refreshed_packages = set() def import_request(self, number): self.obs.request(number).import_into_db(self.db) def update_db_package(self, project, package): root = self.obs._history(project, package) if root is None: return latest = DBRevision.max_rev(self.db, project, package) for r in root.findall("revision"): rev = OBSRevision(self.obs, project, package).parse(r) if not latest or rev.rev > latest: dbrev = DBRevision.import_obs_rev(self.db, rev) try: root = rev.read_link() except ET.ParseError: dbrev.set_broken() continue if root is not None: tprj = root.get("project") or project tpkg = root.get("package") or package dbrev.links_to(tprj, tpkg) def find_linked_revs(self): with self.db.cursor() as cur: cur.execute( """SELECT * from revisions WHERE id in (SELECT l.revision_id FROM links l LEFT JOIN linked_revs lrevs ON lrevs.revision_id=l.revision_id WHERE lrevs.id IS NULL) and broken is FALSE;""" ) for row in cur.fetchall(): rev = DBRevision(self.db, row) linked_rev = rev.linked_rev() if not linked_rev: logging.debug(f"No link {rev}") continue cur.execute( """INSERT INTO linked_revs (revision_id, linked_id) VALUES (%s,%s)""", (rev.dbid, linked_rev.dbid), ) def fetch_all_linked_packages(self, project, package): with self.db.cursor() as cur: cur.execute( """SELECT DISTINCT l.project, l.package from links l JOIN revisions r on r.id=l.revision_id WHERE r.project=%s AND r.package=%s""", (project, package), ) for row in cur.fetchall(): (lproject, lpackage) = row # recurse self.refresh_package(lproject, lpackage) def find_fake_revisions(self): with self.db.cursor() as cur: cur.execute( "SELECT * from revisions WHERE id in (SELECT linked_id from linked_revs WHERE considered=FALSE)" ) for row in cur.fetchall(): self._find_fake_revision(DBRevision(self.db, row)) def _find_fake_revision(self, rev): prev = rev.previous_commit() if not prev: with self.db.cursor() as cur: cur.execute( "UPDATE linked_revs SET considered=TRUE where linked_id=%s", (rev.dbid,), ) return with self.db.cursor() as cur: cur.execute( """SELECT * FROM revisions WHERE id IN (SELECT revision_id from linked_revs WHERE linked_id=%s) AND commit_time <= %s ORDER BY commit_time""", (prev.dbid, rev.commit_time), ) last_linked = None for linked in cur.fetchall(): linked = DBRevision(self.db, linked) nextrev = linked.next_commit() if nextrev and nextrev.commit_time < rev.commit_time: continue last_linked = linked cur.execute( "UPDATE linked_revs SET considered=TRUE where linked_id=%s", (rev.dbid,), ) if not last_linked: return with self.db.cursor() as cur: linked = last_linked cur.execute( "SELECT 1 FROM fake_revs where revision_id=%s AND linked_id=%s", (rev.dbid, linked.dbid), ) if cur.fetchone(): cur.execute( "UPDATE linked_revs SET considered=TRUE where linked_id=%s", (rev.dbid,), ) return fake_rev = linked.rev + rev.rev / 1000.0 comment = f"Updating link to change in {rev.project}/{rev.package} revision {rev.rev}" cur.execute( """INSERT INTO revisions (project,package,rev,unexpanded_srcmd5, commit_time, userid, comment) VALUES(%s,%s,%s,%s,%s,%s,%s) RETURNING id""", ( linked.project, linked.package, fake_rev, linked.unexpanded_srcmd5, rev.commit_time, "buildservice-autocommit", comment, ), ) new_id = cur.fetchone()[0] cur.execute( """INSERT INTO linked_revs (revision_id, linked_id) VALUES (%s,%s)""", (new_id, rev.dbid), ) cur.execute( """INSERT INTO fake_revs (revision_id, linked_id) VALUES (%s,%s)""", (rev.dbid, linked.dbid), ) def revisions_without_files(self): with self.db.cursor() as cur: cur.execute( "SELECT * FROM revisions WHERE broken=FALSE AND expanded_srcmd5 IS NULL" ) return [DBRevision(self.db, row) for row in cur.fetchall()] def import_rev(self, rev): with self.db.cursor() as cur: cur.execute( """SELECT unexpanded_srcmd5 from revisions WHERE id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""", (rev.dbid,), ) linked_rev = cur.fetchone() if linked_rev: linked_rev = linked_rev[0] list = self.obs.list( rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev ) if list: rev.import_dir_list(list) md5 = rev.calculate_files_hash() with self.db.cursor() as cur: cur.execute( "UPDATE revisions SET files_hash=%s WHERE id=%s", (md5, rev.dbid), ) else: rev.set_broken() def fill_file_lists(self): self.find_linked_revs() self.find_fake_revisions() with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: fs = [ executor.submit(import_rev, self, rev) for rev in self.revisions_without_files() ] concurrent.futures.wait(fs) def refresh_package(self, project, package): key = f"{project}/{package}" if key in self.refreshed_packages: # refreshing once is good enough return logging.info(f"Refresh {project}/{package}") self.refreshed_packages.add(key) self.update_db_package(project, package) self.fetch_all_linked_packages(project, package) def import_into_db(self): with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: fs = [ executor.submit(refresh_package, self, self.project, package) for package in self.packages ] concurrent.futures.wait(fs) self.db.conn.commit() fs = [ executor.submit(import_request, self, number) for number in DBRevision.requests_to_fetch(self.db) ] concurrent.futures.wait(fs) self.db.conn.commit() with self.db.cursor() as cur: cur.execute( """SELECT DISTINCT source_project,source_package FROM requests WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""", (self.project, self.packages), ) fs = [ executor.submit(refresh_package, self, project, package) for project, package in cur.fetchall() ] concurrent.futures.wait(fs) self.db.conn.commit() missing_users = User.missing_users(self.db) for userid in missing_users: missing_user = self.obs.user(userid) if missing_user: missing_user.import_into_db(self.db) self.db.conn.commit() self.fill_file_lists() self.db.conn.commit()