diff --git a/lib/importer.py b/lib/importer.py index be96eb1..c086f85 100644 --- a/lib/importer.py +++ b/lib/importer.py @@ -1,4 +1,3 @@ -import concurrent.futures import logging import pathlib import xml.etree.ElementTree as ET @@ -107,7 +106,7 @@ class Importer: with self.db.cursor() as cur: cur.execute( """SELECT * FROM revisions WHERE id IN - (SELECT revision_id from linked_revs WHERE linked_id=%s) + (SELECT revision_id from linked_revs WHERE linked_id=%s) AND commit_time <= %s ORDER BY commit_time""", (prev.dbid, rev.commit_time), ) @@ -140,7 +139,7 @@ class Importer: fake_rev = linked.rev + rev.rev / 1000.0 comment = f"Updating link to change in {rev.project}/{rev.package} revision {int(rev.rev)}" cur.execute( - """INSERT INTO revisions (project,package,rev,unexpanded_srcmd5, + """INSERT INTO revisions (project,package,rev,unexpanded_srcmd5, commit_time, userid, comment, api_url) VALUES(%s,%s,%s,%s,%s,%s,%s,%s) RETURNING id""", ( linked.project, @@ -163,10 +162,12 @@ class Importer: (rev.dbid, linked.dbid), ) - def revisions_without_files(self): + def revisions_without_files(self, package): + logging.debug(f"revisions_without_files({package})") with self.db.cursor() as cur: cur.execute( - "SELECT * FROM revisions WHERE broken=FALSE AND expanded_srcmd5 IS NULL" + "SELECT * FROM revisions WHERE package=%s AND broken=FALSE AND expanded_srcmd5 IS NULL", + (package, ) ) return [DBRevision(self.db, row) for row in cur.fetchall()] @@ -180,11 +181,11 @@ class Importer: linked_rev = cur.fetchone() if linked_rev: linked_rev = linked_rev[0] - list = self.obs.list( + obs_dir_list = self.obs.list( rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev ) - if list: - rev.import_dir_list(list) + if obs_dir_list: + rev.import_dir_list(obs_dir_list) md5 = rev.calculate_files_hash() with self.db.cursor() as cur: cur.execute( @@ -198,12 +199,10 @@ class Importer: self.find_linked_revs() self.find_fake_revisions() - with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: - fs = [ - executor.submit(import_rev, self, rev) - for rev in self.revisions_without_files() - ] - concurrent.futures.wait(fs) + for package in self.packages: + for rev in self.revisions_without_files(package): + print(f"rev {rev} is without files") + self.import_rev(rev) def refresh_package(self, project, package): key = f"{project}/{package}" @@ -218,31 +217,25 @@ class Importer: self.fetch_all_linked_packages(project, package) def import_into_db(self): + for package in self.packages: + refresh_package(self, self.project, package) - with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: - for package in self.packages: refresh_package(self, self.project, package) + self.db.conn.commit() - self.db.conn.commit() + for number in DBRevision.requests_to_fetch(self.db): + self.import_request(number) - fs = [ - executor.submit(import_request, self, number) - for number in DBRevision.requests_to_fetch(self.db) - ] - concurrent.futures.wait(fs) + self.db.conn.commit() - self.db.conn.commit() + with self.db.cursor() as cur: + cur.execute( + """SELECT DISTINCT source_project,source_package FROM requests + WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""", + (self.project, self.packages), + ) + for project, package in cur.fetchall(): + self.refresh_package(project, package) - with self.db.cursor() as cur: - cur.execute( - """SELECT DISTINCT source_project,source_package FROM requests - WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""", - (self.project, self.packages), - ) - fs = [ - executor.submit(refresh_package, self, project, package) - for project, package in cur.fetchall() - ] - concurrent.futures.wait(fs) self.db.conn.commit() missing_users = User.missing_users(self.db)