From ab38332642701f8b885dd9d95f5ce382985c582e Mon Sep 17 00:00:00 2001
From: Stephan Kulow
Date: Thu, 3 Nov 2022 20:14:56 +0100
Subject: [PATCH 1/5] Allow importing multiple packages in one go

This way we avoid duplicating all startup and SQL queries.

---
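Note on the SQL below: the importer query switches from "package=%s" to
"package = ANY(%s)", which is what lets one prepared statement cover the
whole package list; psycopg2 adapts a Python list to a PostgreSQL array.
A minimal standalone sketch of the pattern (the DSN and package names
are placeholders; the table and column names are the ones this patch
actually queries):

    import psycopg2

    conn = psycopg2.connect("dbname=imports")  # placeholder DSN
    packages = ["vim", "emacs", "zsh"]  # placeholder package names
    with conn.cursor() as cur:
        # The list is passed as a single ARRAY parameter;
        # "= ANY(...)" then matches each element, like an IN list.
        cur.execute(
            "SELECT DISTINCT project, package FROM revisions"
            " WHERE package = ANY(%s)",
            (packages,),
        )
        rows = cur.fetchall()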
 git-importer.py     | 24 +++++++++--------
 lib/git.py          | 65 +--------------------------------------------
 lib/git_exporter.py |  2 +-
 lib/importer.py     | 19 ++++++++-----
 4 files changed, 28 insertions(+), 82 deletions(-)

diff --git a/git-importer.py b/git-importer.py
index d5c17f6..0d7d98f 100755
--- a/git-importer.py
+++ b/git-importer.py
@@ -44,11 +44,12 @@ PROJECTS = [
 ]
 
 
 def main():
     parser = argparse.ArgumentParser(description="OBS history importer into git")
-    parser.add_argument("package", help="OBS package name")
+    parser.add_argument("packages", help="OBS package names", nargs="*")
     parser.add_argument(
         "-r",
         "--repodir",
         required=False,
+        default=pathlib.Path("repos"),
         type=pathlib.Path,
         help="Local git repository directory",
     )
@@ -94,22 +95,23 @@ def main():
         requests_log.propagate = True
 
     if args.export:
-        TestExporter(args.package).run()
+        if len(args.packages) != 0:
+            print("Can only export one package")
+            sys.exit(1)
+        TestExporter(args.packages[0]).run()
         return
 
-    if not args.repodir:
-        args.repodir = pathlib.Path("repos") / args.package
-
     if not args.cachedir:
         args.cachedir = pathlib.Path("~/.cache/git-import/").expanduser()
 
-    importer = Importer(URL_OBS, "openSUSE:Factory", args.package)
+    importer = Importer(URL_OBS, "openSUSE:Factory", args.packages)
     importer.import_into_db()
-    exporter = GitExporter(
-        URL_OBS, "openSUSE:Factory", args.package, args.repodir, args.cachedir
-    )
-    exporter.set_gc_interval(args.gc)
-    exporter.export_as_git()
+    for package in args.packages:
+        exporter = GitExporter(
+            URL_OBS, "openSUSE:Factory", package, args.repodir, args.cachedir
+        )
+        exporter.set_gc_interval(args.gc)
+        exporter.export_as_git()
 
 
 if __name__ == "__main__":
diff --git a/lib/git.py b/lib/git.py
index a02b3ec..56b1ee3 100644
--- a/lib/git.py
+++ b/lib/git.py
@@ -109,69 +109,6 @@ class Git:
             "HEAD", author, committer, message, tree, parents
         )
 
-    def merge(
-        self,
-        user,
-        user_email,
-        user_time,
-        message,
-        commit,
-        committer=None,
-        committer_email=None,
-        committer_time=None,
-        clean_on_conflict=True,
-        merged=False,
-        allow_empty=False,
-    ):
-        new_branch = False
-
-        if not merged:
-            try:
-                self.repo.merge(commit)
-            except KeyError:
-                # If it is the first commit, we will have a missing
-                # "HEAD", but the files will be there. We can proceed
-                # to the commit directly.
-                new_branch = True
-
-        if not merged and self.repo.index.conflicts:
-            for conflict in self.repo.index.conflicts:
-                conflict = [c for c in conflict if c]
-                if conflict:
-                    logging.info(f"CONFLICT {conflict[0].path}")
-
-            if clean_on_conflict:
-                self.clean()
-            # Now I miss Rust enums
-            return "CONFLICT"
-
-        # Some merges are empty in OBS (no changes, not sure
-        # why), for now we signal them
-        if not allow_empty and not self.is_dirty():
-            # I really really do miss Rust enums
-            return "EMPTY"
-
-        if new_branch:
-            parents = [commit]
-        else:
-            parents = [
-                self.repo.head.target,
-                commit,
-            ]
-        commit = self.commit(
-            user,
-            user_email,
-            user_time,
-            message,
-            parents,
-            committer,
-            committer_email,
-            committer_time,
-            allow_empty=allow_empty,
-        )
-
-        return commit
-
     def merge_abort(self):
         self.repo.state_cleanup()
 
@@ -188,7 +125,7 @@ class Git:
         self.repo.references["refs/heads/" + branch].set_target(commit)
 
     def gc(self):
-        logging.info(f"Garbage recollect and repackage {self.path}")
+        logging.debug(f"Garbage recollect and repackage {self.path}")
         subprocess.run(
             ["git", "gc", "--auto"],
             cwd=self.path,
diff --git a/lib/git_exporter.py b/lib/git_exporter.py
index f606946..d1b7da8 100644
--- a/lib/git_exporter.py
+++ b/lib/git_exporter.py
@@ -20,7 +20,7 @@ class GitExporter:
         self.obs.change_url(api_url)
         self.proxy_sha256 = ProxySHA256(self.obs, enabled=True)
         self.git = Git(
-            repodir,
+            repodir / package,
             committer="Git OBS Bridge",
             committer_email="obsbridge@suse.de",
         ).create()
diff --git a/lib/importer.py b/lib/importer.py
index b5431b4..0b77c04 100644
--- a/lib/importer.py
+++ b/lib/importer.py
@@ -9,9 +9,9 @@ from lib.user import User
 
 
 class Importer:
-    def __init__(self, api_url, project, package):
-        # Import a Factory package into the database
-        self.package = package
+    def __init__(self, api_url, project, packages):
+        # Import multiple Factory packages into the database
+        self.packages = packages
 
         self.project = project
 
         self.obs = OBS()
@@ -191,23 +191,30 @@ class Importer:
 
     def import_into_db(self):
         db = DB()
-        self.refresh_package(db, self.project, self.package)
+        for package in self.packages:
+            self.refresh_package(db, self.project, package)
+        db.conn.commit()
+
         for number in DBRevision.requests_to_fetch(db):
             self.obs.request(number).import_into_db(db)
+        db.conn.commit()
+
         with db.cursor() as cur:
             cur.execute(
                 """SELECT DISTINCT source_project,source_package FROM requests
-                WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package=%s);""",
-                (self.project, self.package),
+                WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""",
+                (self.project, self.packages),
             )
             for project, package in cur.fetchall():
                 self.refresh_package(db, project, package)
+        db.conn.commit()
 
         missing_users = User.missing_users(db)
         for userid in missing_users:
             missing_user = self.obs.user(userid)
             if missing_user:
                 missing_user.import_into_db(db)
+        db.conn.commit()
 
         self.fill_file_lists(db)
         db.conn.commit()
-- 
2.45.2


From d21ce571f57d4ce666ea7cdcc39eb86dc236ed4b Mon Sep 17 00:00:00 2001
From: Stephan Kulow
Date: Thu, 3 Nov 2022 22:04:45 +0100
Subject: [PATCH 2/5] Refresh the packages in multiple threads

---
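Note: the module-level refresh_package/import_request/import_rev
wrappers below exist so the executor is handed plain callables. The
shape of the fan-out, condensed into a sketch (names follow the patch,
error handling left out):

    import concurrent.futures

    def refresh_package(importer, project, package):
        # plain module-level callable, straightforward to submit
        importer.refresh_package(project, package)

    def refresh_all(importer, project, packages):
        with concurrent.futures.ThreadPoolExecutor() as executor:
            fs = [
                executor.submit(refresh_package, importer, project, p)
                for p in packages
            ]
            # wait() blocks until every future is done; an exception
            # raised in a worker stays stored in its future and is only
            # re-raised when .result() is called on it.
            concurrent.futures.wait(fs)

One detail this relies on: all threads share the importer's single
psycopg2 connection. Connections are safe to share across threads,
cursors are not, so each helper opens its own short-lived cursor via
self.db.cursor().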
 lib/db_revision.py  |   9 ++-
 lib/git_exporter.py |   1 +
 lib/importer.py     | 178 ++++++++++++++++++++++++++------------------
 3 files changed, 115 insertions(+), 73 deletions(-)

diff --git a/lib/db_revision.py b/lib/db_revision.py
index 627e57f..ddbe283 100644
--- a/lib/db_revision.py
+++ b/lib/db_revision.py
@@ -111,13 +111,18 @@ class DBRevision:
         return DBRevision(db, row)
 
     @staticmethod
-    def latest_revision(db, project, package):
+    def max_rev(db, project, package):
         with db.cursor() as cur:
             cur.execute(
                 "SELECT MAX(rev) FROM revisions where project=%s and package=%s",
                 (project, package),
             )
-            max = cur.fetchone()[0]
+            return cur.fetchone()[0]
+        return None
+
+    @staticmethod
+    def latest_revision(db, project, package):
+        max = DBRevision.max_rev(db, project, package)
         if max:
             return DBRevision.fetch_revision(db, project, package, max)
         return None
diff --git a/lib/git_exporter.py b/lib/git_exporter.py
index d1b7da8..fdfc77f 100644
--- a/lib/git_exporter.py
+++ b/lib/git_exporter.py
@@ -60,6 +60,7 @@ class GitExporter:
         gc_cnt = self.gc_interval
 
         if len(left_to_commit) > 0:
+            logging.info(f"Committing into {self.git.path}")
             self.git.gc()
         for flat in left_to_commit:
             gc_cnt -= 1
diff --git a/lib/importer.py b/lib/importer.py
index 0b77c04..031d1eb 100644
--- a/lib/importer.py
+++ b/lib/importer.py
@@ -1,3 +1,4 @@
+import concurrent.futures
 import logging
 import xml.etree.ElementTree as ET
 
@@ -8,26 +9,42 @@ from lib.obs_revision import OBSRevision
 from lib.user import User
 
 
+def refresh_package(importer, project, package):
+    importer.refresh_package(project, package)
+
+
+def import_request(importer, number):
+    importer.import_request(number)
+
+
+def import_rev(importer, rev):
+    importer.import_rev(rev)
+
+
 class Importer:
     def __init__(self, api_url, project, packages):
         # Import multiple Factory packages into the database
         self.packages = packages
 
         self.project = project
 
+        self.db = DB()
         self.obs = OBS()
         assert project == "openSUSE:Factory"
         self.obs.change_url(api_url)
         self.refreshed_packages = set()
 
-    def update_db_package(self, db, project, package):
+    def import_request(self, number):
+        self.obs.request(number).import_into_db(self.db)
+
+    def update_db_package(self, project, package):
         root = self.obs._history(project, package)
         if root is None:
             return
 
-        latest = DBRevision.latest_revision(db, project, package)
+        latest = DBRevision.max_rev(self.db, project, package)
         for r in root.findall("revision"):
             rev = OBSRevision(self.obs, project, package).parse(r)
-            if not latest or rev.rev > latest.rev:
-                dbrev = DBRevision.import_obs_rev(db, rev)
+            if not latest or rev.rev > latest:
+                dbrev = DBRevision.import_obs_rev(self.db, rev)
                 try:
                     root = rev.read_link()
                 except ET.ParseError:
@@ -38,15 +55,15 @@ class Importer:
                 tpkg = root.get("package") or package
                 dbrev.links_to(tprj, tpkg)
 
-    def find_linked_revs(self, db):
-        with db.cursor() as cur:
+    def find_linked_revs(self):
+        with self.db.cursor() as cur:
             cur.execute(
                 """SELECT * from revisions WHERE id in (SELECT l.revision_id FROM links l
                    LEFT JOIN linked_revs lrevs ON lrevs.revision_id=l.revision_id
                    WHERE lrevs.id IS NULL) and broken is FALSE;"""
             )
             for row in cur.fetchall():
-                rev = DBRevision(db, row)
+                rev = DBRevision(self.db, row)
                 linked_rev = rev.linked_rev()
                 if not linked_rev:
                     logging.debug(f"No link {rev}")
@@ -57,8 +74,8 @@ class Importer:
                 (rev.dbid, linked_rev.dbid),
             )
 
-    def fetch_all_linked_packages(self, db, project, package):
-        with db.cursor() as cur:
+    def fetch_all_linked_packages(self, project, package):
+        with self.db.cursor() as cur:
             cur.execute(
                 """SELECT DISTINCT l.project, l.package from links l JOIN revisions r
                    ON r.id=l.revision_id WHERE r.project=%s AND r.package=%s""",
@@ -67,26 +84,26 @@ class Importer:
             for row in cur.fetchall():
                 (lproject, lpackage) = row
                 # recurse
-                self.refresh_package(db, lproject, lpackage)
+                self.refresh_package(lproject, lpackage)
 
-    def find_fake_revisions(self, db):
-        with db.cursor() as cur:
+    def find_fake_revisions(self):
+        with self.db.cursor() as cur:
             cur.execute(
                 "SELECT * from revisions WHERE id in (SELECT linked_id from linked_revs WHERE considered=FALSE)"
             )
             for row in cur.fetchall():
-                self._find_fake_revision(db, DBRevision(db, row))
+                self._find_fake_revision(DBRevision(self.db, row))
 
-    def _find_fake_revision(self, db, rev):
+    def _find_fake_revision(self, rev):
         prev = rev.previous_commit()
         if not prev:
-            with db.cursor() as cur:
+            with self.db.cursor() as cur:
                 cur.execute(
                     "UPDATE linked_revs SET considered=TRUE where linked_id=%s",
                     (rev.dbid,),
                 )
             return
-        with db.cursor() as cur:
+        with self.db.cursor() as cur:
             cur.execute(
                 """SELECT * FROM revisions WHERE id IN
                     (SELECT revision_id from linked_revs WHERE linked_id=%s)
@@ -95,7 +112,7 @@ class Importer:
             )
             last_linked = None
             for linked in cur.fetchall():
-                linked = DBRevision(db, linked)
+                linked = DBRevision(self.db, linked)
                 nextrev = linked.next_commit()
                 if nextrev and nextrev.commit_time < rev.commit_time:
                     continue
@@ -107,7 +124,7 @@ class Importer:
         if not last_linked:
             return
 
-        with db.cursor() as cur:
+        with self.db.cursor() as cur:
             linked = last_linked
             cur.execute(
                 "SELECT 1 FROM fake_revs where revision_id=%s AND linked_id=%s",
@@ -144,77 +161,96 @@ class Importer:
                 (rev.dbid, linked.dbid),
             )
 
-    def revisions_without_files(self, db):
-        with db.cursor() as cur:
+    def revisions_without_files(self):
+        with self.db.cursor() as cur:
             cur.execute(
                 "SELECT * FROM revisions WHERE broken=FALSE AND expanded_srcmd5 IS NULL"
             )
-            return [DBRevision(db, row) for row in cur.fetchall()]
+            return [DBRevision(self.db, row) for row in cur.fetchall()]
 
-    def fill_file_lists(self, db):
-        self.find_linked_revs(db)
-
-        self.find_fake_revisions(db)
-        for rev in self.revisions_without_files(db):
-            with db.cursor() as cur:
-                cur.execute(
-                    """SELECT unexpanded_srcmd5 from revisions WHERE
-                       id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
-                    (rev.dbid,),
-                )
-                linked_rev = cur.fetchone()
-                if linked_rev:
-                    linked_rev = linked_rev[0]
-                list = self.obs.list(
-                    rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
-                )
-            if list:
-                rev.import_dir_list(list)
-                md5 = rev.calculate_files_hash()
-                with db.cursor() as cur:
-                    cur.execute(
-                        "UPDATE revisions SET files_hash=%s WHERE id=%s",
-                        (md5, rev.dbid),
-                    )
-            else:
-                rev.set_broken()
+    def import_rev(self, rev):
+        with self.db.cursor() as cur:
+            cur.execute(
+                """SELECT unexpanded_srcmd5 from revisions WHERE
+                   id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
+                (rev.dbid,),
+            )
+            linked_rev = cur.fetchone()
+            if linked_rev:
+                linked_rev = linked_rev[0]
+        list = self.obs.list(
+            rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
+        )
+        if list:
+            rev.import_dir_list(list)
+            md5 = rev.calculate_files_hash()
+            with self.db.cursor() as cur:
+                cur.execute(
+                    "UPDATE revisions SET files_hash=%s WHERE id=%s",
+                    (md5, rev.dbid),
+                )
+        else:
+            rev.set_broken()
 
-    def refresh_package(self, db, project, package):
+    def fill_file_lists(self):
+        self.find_linked_revs()
+
+        self.find_fake_revisions()
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            fs = [
+                executor.submit(import_rev, self, rev)
+                for rev in self.revisions_without_files()
+            ]
+            concurrent.futures.wait(fs)
+
+    def refresh_package(self, project, package):
         key = f"{project}/{package}"
         if key in self.refreshed_packages:
             # refreshing once is good enough
             return
+        logging.info(f"Refresh {project}/{package}")
         self.refreshed_packages.add(key)
-        self.update_db_package(db, project, package)
-        self.fetch_all_linked_packages(db, project, package)
+        self.update_db_package(project, package)
+        self.fetch_all_linked_packages(project, package)
 
     def import_into_db(self):
-        db = DB()
-        for package in self.packages:
-            self.refresh_package(db, self.project, package)
-        db.conn.commit()
+        with concurrent.futures.ThreadPoolExecutor() as executor:
+            fs = [
+                executor.submit(refresh_package, self, self.project, package)
+                for package in self.packages
+            ]
+            concurrent.futures.wait(fs)
 
-        for number in DBRevision.requests_to_fetch(db):
-            self.obs.request(number).import_into_db(db)
-        db.conn.commit()
+            self.db.conn.commit()
 
-        with db.cursor() as cur:
-            cur.execute(
-                """SELECT DISTINCT source_project,source_package FROM requests
-                WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""",
-                (self.project, self.packages),
-            )
-            for project, package in cur.fetchall():
-                self.refresh_package(db, project, package)
-        db.conn.commit()
+            fs = [
+                executor.submit(import_request, self, number)
+                for number in DBRevision.requests_to_fetch(self.db)
+            ]
+            concurrent.futures.wait(fs)
 
-        missing_users = User.missing_users(db)
-        for userid in missing_users:
-            missing_user = self.obs.user(userid)
-            if missing_user:
-                missing_user.import_into_db(db)
-        db.conn.commit()
+            self.db.conn.commit()
 
-        self.fill_file_lists(db)
-        db.conn.commit()
+            with self.db.cursor() as cur:
+                cur.execute(
+                    """SELECT DISTINCT source_project,source_package FROM requests
+                    WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""",
+                    (self.project, self.packages),
+                )
+                fs = [
+                    executor.submit(refresh_package, self, project, package)
+                    for project, package in cur.fetchall()
+                ]
+                concurrent.futures.wait(fs)
+            self.db.conn.commit()
+
+        missing_users = User.missing_users(self.db)
+        for userid in missing_users:
+            missing_user = self.obs.user(userid)
+            if missing_user:
+                missing_user.import_into_db(self.db)
+        self.db.conn.commit()
+
+        self.fill_file_lists()
+        self.db.conn.commit()
-- 
2.45.2


From 33a5733cb98eba090a8d231de9120a7ada4be745 Mon Sep 17 00:00:00 2001
From: Stephan Kulow
Date: Fri, 4 Nov 2022 07:48:17 +0100
Subject: [PATCH 3/5] Create the git repos in multiple processes

Threads appear to be too dangerous for this.

---
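Two mechanics worth noting. ProcessPoolExecutor pickles the submitted
callable and its arguments to ship them to the worker processes, which
is why export_package below is a plain module-level function taking
only picklable arguments (strings and paths). And the new timeout=
arguments matter because requests waits indefinitely by default, so a
stalled server could wedge a worker forever. A minimal sketch of the
failure handling this enables (URL taken from the patch; the package
name and the except branch are placeholders):

    import requests

    try:
        response = requests.get(
            "http://source.dyn.cloud.suse.de/package/vim", timeout=5
        )
        hashes = response.json()["shas"] if response.status_code == 200 else {}
    except requests.Timeout:
        hashes = {}  # placeholder policy: retry, or treat the proxy as down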
 Makefile            |  4 ++--
 git-importer.py     | 21 +++++++++++++------
 lib/proxy_sha256.py |  5 ++++-
 3 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/Makefile b/Makefile
index 3171f83..050b045 100644
--- a/Makefile
+++ b/Makefile
@@ -1,7 +1,7 @@
 all:
 	isort *.py lib/*py tests/*py
-	autoflake -r --in-place --remove-unused-variables .
-	black .
+	autoflake --in-place --remove-unused-variables *.py lib/*py tests/*py
+	black *.py lib/*py tests/*py
 
 test:
 	python3 -m unittest -v tests/*.py
diff --git a/git-importer.py b/git-importer.py
index 0d7d98f..a40d04e 100755
--- a/git-importer.py
+++ b/git-importer.py
@@ -1,6 +1,7 @@
 #!/usr/bin/python3
 
 import argparse
+import concurrent.futures
 import logging
 import pathlib
 import sys
@@ -42,6 +43,12 @@ PROJECTS = [
 ]
 
 
+def export_package(package, repodir, cachedir, gc):
+    exporter = GitExporter(URL_OBS, "openSUSE:Factory", package, repodir, cachedir)
+    exporter.set_gc_interval(gc)
+    exporter.export_as_git()
+
+
 def main():
     parser = argparse.ArgumentParser(description="OBS history importer into git")
     parser.add_argument("packages", help="OBS package names", nargs="*")
@@ -106,12 +113,14 @@ def main():
     importer = Importer(URL_OBS, "openSUSE:Factory", args.packages)
     importer.import_into_db()
-    for package in args.packages:
-        exporter = GitExporter(
-            URL_OBS, "openSUSE:Factory", package, args.repodir, args.cachedir
-        )
-        exporter.set_gc_interval(args.gc)
-        exporter.export_as_git()
+    with concurrent.futures.ProcessPoolExecutor() as executor:
+        fs = [
+            executor.submit(
+                export_package, package, args.repodir, args.cachedir, args.gc
+            )
+            for package in args.packages
+        ]
+        concurrent.futures.wait(fs)
 
 
 if __name__ == "__main__":
diff --git a/lib/proxy_sha256.py b/lib/proxy_sha256.py
index 2864351..8513b04 100644
--- a/lib/proxy_sha256.py
+++ b/lib/proxy_sha256.py
@@ -41,7 +41,9 @@ class ProxySHA256:
             self.hashes = dict()
             return
         logging.debug("Retrieve all previously defined SHA256")
-        response = requests.get(f"http://source.dyn.cloud.suse.de/package/{package}")
+        response = requests.get(
+            f"http://source.dyn.cloud.suse.de/package/{package}", timeout=5
+        )
         if response.status_code == 200:
             json = response.json()
             self.hashes = json["shas"]
@@ -67,6 +69,7 @@ class ProxySHA256:
                 "url": url,
                 "package": package,
             },
+            timeout=10,
         )
         if response.status_code != 200:
             raise Exception(f"Redirector error on {self.url} for {url}")
-- 
2.45.2


From 60ced896a3615b99cfdf1ccf6f6094ef87bd0346 Mon Sep 17 00:00:00 2001
From: Stephan Kulow
Date: Fri, 4 Nov 2022 09:58:36 +0100
Subject: [PATCH 4/5] Fix condition for export

---
 git-importer.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/git-importer.py b/git-importer.py
index a40d04e..6b030d6 100755
--- a/git-importer.py
+++ b/git-importer.py
@@ -102,7 +102,7 @@ def main():
         requests_log.propagate = True
 
     if args.export:
-        if len(args.packages) != 0:
+        if len(args.packages) != 1:
             print("Can only export one package")
             sys.exit(1)
         TestExporter(args.packages[0]).run()
-- 
2.45.2


From a457a16a50475a64da4bc689f5382fb42033bff0 Mon Sep 17 00:00:00 2001
From: Stephan Kulow
Date: Fri, 4 Nov 2022 10:00:28 +0100
Subject: [PATCH 5/5] Limit the workers to 8

This hard-codes the limit; we may want to make it configurable, but for
now the machines supposed to run this code are very similar.

---
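If the limit ever needs to vary per machine, a command line option
would be the natural follow-up. A possible sketch (the --workers flag
is hypothetical and not part of this series):

    parser.add_argument(
        "--workers",
        type=int,
        default=8,  # hypothetical flag; matches the hard-coded limit below
        help="Number of parallel import/export workers",
    )

    # ...and later, instead of the literal 8:
    with concurrent.futures.ProcessPoolExecutor(max_workers=args.workers) as executor:
        ...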
 git-importer.py | 2 +-
 lib/importer.py | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/git-importer.py b/git-importer.py
index 6b030d6..2948efb 100755
--- a/git-importer.py
+++ b/git-importer.py
@@ -113,7 +113,7 @@ def main():
     importer = Importer(URL_OBS, "openSUSE:Factory", args.packages)
     importer.import_into_db()
-    with concurrent.futures.ProcessPoolExecutor() as executor:
+    with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
         fs = [
             executor.submit(
                 export_package, package, args.repodir, args.cachedir, args.gc
             )
             for package in args.packages
diff --git a/lib/importer.py b/lib/importer.py
index 031d1eb..5cd3062 100644
--- a/lib/importer.py
+++ b/lib/importer.py
@@ -196,7 +196,7 @@ class Importer:
         self.find_linked_revs()
 
         self.find_fake_revisions()
-        with concurrent.futures.ThreadPoolExecutor() as executor:
+        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
             fs = [
                 executor.submit(import_rev, self, rev)
                 for rev in self.revisions_without_files()
@@ -215,7 +215,7 @@ class Importer:
 
     def import_into_db(self):
-        with concurrent.futures.ThreadPoolExecutor() as executor:
+        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
             fs = [
                 executor.submit(refresh_package, self, self.project, package)
                 for package in self.packages
-- 
2.45.2
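With the whole series applied, one run imports any number of packages
into the database and then exports each into its own git repository
under repos/ (the default --repodir), capped at 8 parallel jobs. A
typical invocation, with placeholder package names:

    ./git-importer.py vim emacs zsh

The export path guarded by args.export still takes exactly one package,
which is the condition PATCH 4 tightened.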