forked from importers/git-importer
Refresh the packages in multiple threads
This commit is contained in:
parent
ab38332642
commit
d21ce571f5
@ -111,13 +111,18 @@ class DBRevision:
|
||||
return DBRevision(db, row)
|
||||
|
||||
@staticmethod
|
||||
def latest_revision(db, project, package):
|
||||
def max_rev(db, project, package):
|
||||
with db.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT MAX(rev) FROM revisions where project=%s and package=%s",
|
||||
(project, package),
|
||||
)
|
||||
max = cur.fetchone()[0]
|
||||
return cur.fetchone()[0]
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def latest_revision(db, project, package):
|
||||
max = DBRevision.max_rev(db, project, package)
|
||||
if max:
|
||||
return DBRevision.fetch_revision(db, project, package, max)
|
||||
return None
|
||||
|
@ -60,6 +60,7 @@ class GitExporter:
|
||||
|
||||
gc_cnt = self.gc_interval
|
||||
if len(left_to_commit) > 0:
|
||||
logging.info(f"Commiting into {self.git.path}")
|
||||
self.git.gc()
|
||||
for flat in left_to_commit:
|
||||
gc_cnt -= 1
|
||||
|
178
lib/importer.py
178
lib/importer.py
@ -1,3 +1,4 @@
|
||||
import concurrent.futures
|
||||
import logging
|
||||
import xml.etree.ElementTree as ET
|
||||
|
||||
@ -8,26 +9,42 @@ from lib.obs_revision import OBSRevision
|
||||
from lib.user import User
|
||||
|
||||
|
||||
def refresh_package(importer, project, package):
|
||||
importer.refresh_package(project, package)
|
||||
|
||||
|
||||
def import_request(importer, number):
|
||||
importer.import_request(number)
|
||||
|
||||
|
||||
def import_rev(importer, rev):
|
||||
importer.import_rev(rev)
|
||||
|
||||
|
||||
class Importer:
|
||||
def __init__(self, api_url, project, packages):
|
||||
# Import multiple Factory packages into the database
|
||||
self.packages = packages
|
||||
self.project = project
|
||||
|
||||
self.db = DB()
|
||||
self.obs = OBS()
|
||||
assert project == "openSUSE:Factory"
|
||||
self.obs.change_url(api_url)
|
||||
self.refreshed_packages = set()
|
||||
|
||||
def update_db_package(self, db, project, package):
|
||||
def import_request(self, number):
|
||||
self.obs.request(number).import_into_db(self.db)
|
||||
|
||||
def update_db_package(self, project, package):
|
||||
root = self.obs._history(project, package)
|
||||
if root is None:
|
||||
return
|
||||
latest = DBRevision.latest_revision(db, project, package)
|
||||
latest = DBRevision.max_rev(self.db, project, package)
|
||||
for r in root.findall("revision"):
|
||||
rev = OBSRevision(self.obs, project, package).parse(r)
|
||||
if not latest or rev.rev > latest.rev:
|
||||
dbrev = DBRevision.import_obs_rev(db, rev)
|
||||
if not latest or rev.rev > latest:
|
||||
dbrev = DBRevision.import_obs_rev(self.db, rev)
|
||||
try:
|
||||
root = rev.read_link()
|
||||
except ET.ParseError:
|
||||
@ -38,15 +55,15 @@ class Importer:
|
||||
tpkg = root.get("package") or package
|
||||
dbrev.links_to(tprj, tpkg)
|
||||
|
||||
def find_linked_revs(self, db):
|
||||
with db.cursor() as cur:
|
||||
def find_linked_revs(self):
|
||||
with self.db.cursor() as cur:
|
||||
cur.execute(
|
||||
"""SELECT * from revisions WHERE id in (SELECT l.revision_id FROM links l
|
||||
LEFT JOIN linked_revs lrevs ON lrevs.revision_id=l.revision_id
|
||||
WHERE lrevs.id IS NULL) and broken is FALSE;"""
|
||||
)
|
||||
for row in cur.fetchall():
|
||||
rev = DBRevision(db, row)
|
||||
rev = DBRevision(self.db, row)
|
||||
linked_rev = rev.linked_rev()
|
||||
if not linked_rev:
|
||||
logging.debug(f"No link {rev}")
|
||||
@ -57,8 +74,8 @@ class Importer:
|
||||
(rev.dbid, linked_rev.dbid),
|
||||
)
|
||||
|
||||
def fetch_all_linked_packages(self, db, project, package):
|
||||
with db.cursor() as cur:
|
||||
def fetch_all_linked_packages(self, project, package):
|
||||
with self.db.cursor() as cur:
|
||||
cur.execute(
|
||||
"""SELECT DISTINCT l.project, l.package from links l JOIN revisions r
|
||||
on r.id=l.revision_id WHERE r.project=%s AND r.package=%s""",
|
||||
@ -67,26 +84,26 @@ class Importer:
|
||||
for row in cur.fetchall():
|
||||
(lproject, lpackage) = row
|
||||
# recurse
|
||||
self.refresh_package(db, lproject, lpackage)
|
||||
self.refresh_package(lproject, lpackage)
|
||||
|
||||
def find_fake_revisions(self, db):
|
||||
with db.cursor() as cur:
|
||||
def find_fake_revisions(self):
|
||||
with self.db.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT * from revisions WHERE id in (SELECT linked_id from linked_revs WHERE considered=FALSE)"
|
||||
)
|
||||
for row in cur.fetchall():
|
||||
self._find_fake_revision(db, DBRevision(db, row))
|
||||
self._find_fake_revision(DBRevision(self.db, row))
|
||||
|
||||
def _find_fake_revision(self, db, rev):
|
||||
def _find_fake_revision(self, rev):
|
||||
prev = rev.previous_commit()
|
||||
if not prev:
|
||||
with db.cursor() as cur:
|
||||
with self.db.cursor() as cur:
|
||||
cur.execute(
|
||||
"UPDATE linked_revs SET considered=TRUE where linked_id=%s",
|
||||
(rev.dbid,),
|
||||
)
|
||||
return
|
||||
with db.cursor() as cur:
|
||||
with self.db.cursor() as cur:
|
||||
cur.execute(
|
||||
"""SELECT * FROM revisions WHERE id IN
|
||||
(SELECT revision_id from linked_revs WHERE linked_id=%s)
|
||||
@ -95,7 +112,7 @@ class Importer:
|
||||
)
|
||||
last_linked = None
|
||||
for linked in cur.fetchall():
|
||||
linked = DBRevision(db, linked)
|
||||
linked = DBRevision(self.db, linked)
|
||||
nextrev = linked.next_commit()
|
||||
if nextrev and nextrev.commit_time < rev.commit_time:
|
||||
continue
|
||||
@ -107,7 +124,7 @@ class Importer:
|
||||
if not last_linked:
|
||||
return
|
||||
|
||||
with db.cursor() as cur:
|
||||
with self.db.cursor() as cur:
|
||||
linked = last_linked
|
||||
cur.execute(
|
||||
"SELECT 1 FROM fake_revs where revision_id=%s AND linked_id=%s",
|
||||
@ -144,77 +161,96 @@ class Importer:
|
||||
(rev.dbid, linked.dbid),
|
||||
)
|
||||
|
||||
def revisions_without_files(self, db):
|
||||
with db.cursor() as cur:
|
||||
def revisions_without_files(self):
|
||||
with self.db.cursor() as cur:
|
||||
cur.execute(
|
||||
"SELECT * FROM revisions WHERE broken=FALSE AND expanded_srcmd5 IS NULL"
|
||||
)
|
||||
return [DBRevision(db, row) for row in cur.fetchall()]
|
||||
return [DBRevision(self.db, row) for row in cur.fetchall()]
|
||||
|
||||
def fill_file_lists(self, db):
|
||||
self.find_linked_revs(db)
|
||||
|
||||
self.find_fake_revisions(db)
|
||||
for rev in self.revisions_without_files(db):
|
||||
with db.cursor() as cur:
|
||||
cur.execute(
|
||||
"""SELECT unexpanded_srcmd5 from revisions WHERE
|
||||
id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
|
||||
(rev.dbid,),
|
||||
)
|
||||
linked_rev = cur.fetchone()
|
||||
if linked_rev:
|
||||
linked_rev = linked_rev[0]
|
||||
list = self.obs.list(
|
||||
rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
|
||||
def import_rev(self, rev):
|
||||
with self.db.cursor() as cur:
|
||||
cur.execute(
|
||||
"""SELECT unexpanded_srcmd5 from revisions WHERE
|
||||
id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
|
||||
(rev.dbid,),
|
||||
)
|
||||
if list:
|
||||
rev.import_dir_list(list)
|
||||
md5 = rev.calculate_files_hash()
|
||||
with db.cursor() as cur:
|
||||
cur.execute(
|
||||
"UPDATE revisions SET files_hash=%s WHERE id=%s",
|
||||
(md5, rev.dbid),
|
||||
)
|
||||
else:
|
||||
rev.set_broken()
|
||||
linked_rev = cur.fetchone()
|
||||
if linked_rev:
|
||||
linked_rev = linked_rev[0]
|
||||
list = self.obs.list(
|
||||
rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
|
||||
)
|
||||
if list:
|
||||
rev.import_dir_list(list)
|
||||
md5 = rev.calculate_files_hash()
|
||||
with self.db.cursor() as cur:
|
||||
cur.execute(
|
||||
"UPDATE revisions SET files_hash=%s WHERE id=%s",
|
||||
(md5, rev.dbid),
|
||||
)
|
||||
else:
|
||||
rev.set_broken()
|
||||
|
||||
def refresh_package(self, db, project, package):
|
||||
def fill_file_lists(self):
|
||||
self.find_linked_revs()
|
||||
|
||||
self.find_fake_revisions()
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
fs = [
|
||||
executor.submit(import_rev, self, rev)
|
||||
for rev in self.revisions_without_files()
|
||||
]
|
||||
concurrent.futures.wait(fs)
|
||||
|
||||
def refresh_package(self, project, package):
|
||||
key = f"{project}/{package}"
|
||||
if key in self.refreshed_packages:
|
||||
# refreshing once is good enough
|
||||
return
|
||||
logging.info(f"Refresh {project}/{package}")
|
||||
self.refreshed_packages.add(key)
|
||||
self.update_db_package(db, project, package)
|
||||
self.fetch_all_linked_packages(db, project, package)
|
||||
self.update_db_package(project, package)
|
||||
self.fetch_all_linked_packages(project, package)
|
||||
|
||||
def import_into_db(self):
|
||||
db = DB()
|
||||
|
||||
for package in self.packages:
|
||||
self.refresh_package(db, self.project, package)
|
||||
db.conn.commit()
|
||||
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||
fs = [
|
||||
executor.submit(refresh_package, self, self.project, package)
|
||||
for package in self.packages
|
||||
]
|
||||
concurrent.futures.wait(fs)
|
||||
|
||||
for number in DBRevision.requests_to_fetch(db):
|
||||
self.obs.request(number).import_into_db(db)
|
||||
db.conn.commit()
|
||||
self.db.conn.commit()
|
||||
|
||||
with db.cursor() as cur:
|
||||
cur.execute(
|
||||
"""SELECT DISTINCT source_project,source_package FROM requests
|
||||
WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""",
|
||||
(self.project, self.packages),
|
||||
)
|
||||
for project, package in cur.fetchall():
|
||||
self.refresh_package(db, project, package)
|
||||
db.conn.commit()
|
||||
fs = [
|
||||
executor.submit(import_request, self, number)
|
||||
for number in DBRevision.requests_to_fetch(self.db)
|
||||
]
|
||||
concurrent.futures.wait(fs)
|
||||
|
||||
missing_users = User.missing_users(db)
|
||||
self.db.conn.commit()
|
||||
|
||||
with self.db.cursor() as cur:
|
||||
cur.execute(
|
||||
"""SELECT DISTINCT source_project,source_package FROM requests
|
||||
WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""",
|
||||
(self.project, self.packages),
|
||||
)
|
||||
fs = [
|
||||
executor.submit(refresh_package, self, project, package)
|
||||
for project, package in cur.fetchall()
|
||||
]
|
||||
concurrent.futures.wait(fs)
|
||||
self.db.conn.commit()
|
||||
|
||||
missing_users = User.missing_users(self.db)
|
||||
for userid in missing_users:
|
||||
missing_user = self.obs.user(userid)
|
||||
if missing_user:
|
||||
missing_user.import_into_db(db)
|
||||
db.conn.commit()
|
||||
missing_user.import_into_db(self.db)
|
||||
self.db.conn.commit()
|
||||
|
||||
self.fill_file_lists(db)
|
||||
db.conn.commit()
|
||||
self.fill_file_lists()
|
||||
self.db.conn.commit()
|
||||
|
Loading…
Reference in New Issue
Block a user