forked from adamm/git-importer
56cbe0a125
There seem to be races when using DB cursors from multiple threads, as found by import issues after switching to a newer computer that has performance and energy-efficient cores. As this is not particularly performance-critical, convert to single-threaded use, which makes it work again.
258 lines
9.3 KiB
Python
258 lines
9.3 KiB
Python
import logging
|
|
import pathlib
|
|
import xml.etree.ElementTree as ET
|
|
|
|
from lib.db import DB
|
|
from lib.db_revision import DBRevision
|
|
from lib.obs import OBS
|
|
from lib.obs_revision import OBSRevision
|
|
from lib.user import User
|
|
|
|
|
|
def refresh_package(importer, project, package):
    """Refresh one package by delegating to ``importer.refresh_package``.

    The result of the delegated call is discarded; this function
    returns ``None``.
    """
    refresh = importer.refresh_package
    refresh(project, package)
|
|
|
|
|
|
def import_request(importer, number):
    """Import one request by delegating to ``importer.import_request``.

    The result of the delegated call is discarded; this function
    returns ``None``.
    """
    do_import = importer.import_request
    do_import(number)
|
|
|
|
|
|
def import_rev(importer, rev):
    """Import one revision by delegating to ``importer.import_rev``.

    The result of the delegated call is discarded; this function
    returns ``None``.
    """
    do_import = importer.import_rev
    do_import(rev)
|
|
|
|
|
|
class Importer:
    """Import the revision history of a set of packages into the database.

    The importer reads package history, requests and users through an
    ``OBS`` client and stores them through a ``DB`` handle.  All work is
    done sequentially on the database handle owned by the instance.
    """

    def __init__(self, api_url, project, packages):
        """Create the database and OBS handles for an import run.

        Args:
            api_url: handed to ``OBS`` to create the API client.
            project: project the packages belong to; only
                ``"openSUSE:Factory"`` is accepted.
            packages: names of the packages to import.

        Raises:
            AssertionError: if ``project`` is not ``"openSUSE:Factory"``.
        """
        # Import multiple Factory packages into the database
        self.packages = packages
        self.project = project

        self.db = DB()
        self.obs = OBS(api_url)
        # NOTE(review): kept as an assert so the raised exception type stays
        # the same for callers; be aware it is skipped under ``python -O``.
        assert project == "openSUSE:Factory"
        # "project/package" keys that refresh_package() has already handled.
        self.refreshed_packages = set()
        # Loaded lazily by package_gone(); None means "not loaded yet".
        self.gone_packages_set = None

    def import_request(self, number):
        """Fetch request ``number`` from OBS and store it in the database."""
        self.obs.request(number).import_into_db(self.db)

    def update_db_package(self, project, package):
        """Import all revisions of a package that are newer than the DB state.

        Every revision from the OBS history whose ``rev`` is greater than
        the highest one already stored (or every revision if none is
        stored) is imported.  If the revision's link can be read, the link
        target is recorded; if reading it raises ``ET.ParseError`` the
        revision is marked as broken instead.
        """
        root = self.obs._history(project, package)
        if root is None:
            return
        latest = DBRevision.max_rev(self.db, project, package)
        for r in root.findall("revision"):
            rev = OBSRevision(self.obs, project, package).parse(r)
            if not latest or rev.rev > latest:
                dbrev = DBRevision.import_obs_rev(self.db, rev)
                try:
                    # Separate name so the history document in ``root`` is
                    # not rebound while its revisions are being processed.
                    link = rev.read_link()
                except ET.ParseError:
                    dbrev.set_broken()
                    continue
                if link is not None:
                    # Attributes missing on the link element fall back to
                    # the package currently being imported.
                    tprj = link.get("project") or project
                    tpkg = link.get("package") or package
                    dbrev.links_to(tprj, tpkg)

    def find_linked_revs(self):
        """Fill ``linked_revs`` for linking revisions that have no entry yet.

        Selects all non-broken revisions that have a row in ``links`` but
        none in ``linked_revs``, resolves each via ``linked_rev()`` and
        inserts the (revision, linked revision) pair when one is found.
        """
        with self.db.cursor() as cur:
            cur.execute(
                """SELECT * from revisions WHERE id in (SELECT l.revision_id FROM links l
                LEFT JOIN linked_revs lrevs ON lrevs.revision_id=l.revision_id
                WHERE lrevs.id IS NULL) and broken is FALSE;"""
            )
            # fetchall() materialises the result, so the cursor can be
            # reused for the INSERTs inside the loop.
            for row in cur.fetchall():
                rev = DBRevision(self.db, row)
                linked_rev = rev.linked_rev()
                if not linked_rev:
                    logging.debug(f"No link {rev}")
                    continue
                cur.execute(
                    """INSERT INTO linked_revs (revision_id, linked_id)
                    VALUES (%s,%s)""",
                    (rev.dbid, linked_rev.dbid),
                )

    def fetch_all_linked_packages(self, project, package):
        """Refresh every package that revisions of ``project/package`` link to."""
        with self.db.cursor() as cur:
            cur.execute(
                """SELECT DISTINCT l.project, l.package from links l JOIN revisions r
                on r.id=l.revision_id WHERE r.project=%s AND r.package=%s""",
                (project, package),
            )
            for row in cur.fetchall():
                (lproject, lpackage) = row
                # recurse
                self.refresh_package(lproject, lpackage)

    def find_fake_revisions(self):
        """Run ``_find_fake_revision`` on every not yet considered link target."""
        with self.db.cursor() as cur:
            cur.execute(
                "SELECT * from revisions WHERE id in (SELECT linked_id from linked_revs WHERE considered=FALSE)"
            )
            for row in cur.fetchall():
                self._find_fake_revision(DBRevision(self.db, row))

    def _find_fake_revision(self, rev):
        """Create a synthetic revision for a package linking to ``rev``.

        ``rev`` is a revision that other revisions link to.  The method
        marks ``rev`` as considered in ``linked_revs`` and, if a suitable
        linking revision is found among those pointing at the commit
        preceding ``rev``, inserts an additional row into ``revisions`` for
        the linking package (user ``buildservice-autocommit``, commit time
        of ``rev``) together with matching ``linked_revs`` and
        ``fake_revs`` rows.  Nothing is inserted when ``rev`` has no
        previous commit, no linking revision qualifies, or the
        ``fake_revs`` pair already exists.
        """
        prev = rev.previous_commit()
        if not prev:
            with self.db.cursor() as cur:
                cur.execute(
                    "UPDATE linked_revs SET considered=TRUE where linked_id=%s",
                    (rev.dbid,),
                )
            return
        with self.db.cursor() as cur:
            # Revisions linking to the previous commit, oldest first, that
            # were committed no later than ``rev``.
            cur.execute(
                """SELECT * FROM revisions WHERE id IN
                (SELECT revision_id from linked_revs WHERE linked_id=%s)
                AND commit_time <= %s ORDER BY commit_time""",
                (prev.dbid, rev.commit_time),
            )
            last_linked = None
            for linked in cur.fetchall():
                linked = DBRevision(self.db, linked)
                nextrev = linked.next_commit()
                # Skip linking revisions that were already superseded by a
                # later commit before ``rev`` was committed.
                if nextrev and nextrev.commit_time < rev.commit_time:
                    continue
                last_linked = linked
            cur.execute(
                "UPDATE linked_revs SET considered=TRUE where linked_id=%s",
                (rev.dbid,),
            )
        if not last_linked:
            return

        with self.db.cursor() as cur:
            linked = last_linked
            cur.execute(
                "SELECT 1 FROM fake_revs where revision_id=%s AND linked_id=%s",
                (rev.dbid, linked.dbid),
            )
            if cur.fetchone():
                # The pair is already recorded; only (re-)mark as considered.
                cur.execute(
                    "UPDATE linked_revs SET considered=TRUE where linked_id=%s",
                    (rev.dbid,),
                )
                return
            # The synthetic revision number is the linking revision's number
            # plus the target's revision number divided by 1000.
            fake_rev = linked.rev + rev.rev / 1000.0
            comment = f"Updating link to change in {rev.project}/{rev.package} revision {int(rev.rev)}"
            cur.execute(
                """INSERT INTO revisions (project,package,rev,unexpanded_srcmd5,
                commit_time, userid, comment, api_url) VALUES(%s,%s,%s,%s,%s,%s,%s,%s) RETURNING id""",
                (
                    linked.project,
                    linked.package,
                    fake_rev,
                    linked.unexpanded_srcmd5,
                    rev.commit_time,
                    "buildservice-autocommit",
                    comment,
                    linked.api_url,
                ),
            )
            new_id = cur.fetchone()[0]
            cur.execute(
                """INSERT INTO linked_revs (revision_id, linked_id) VALUES (%s,%s)""",
                (new_id, rev.dbid),
            )
            cur.execute(
                """INSERT INTO fake_revs (revision_id, linked_id) VALUES (%s,%s)""",
                (rev.dbid, linked.dbid),
            )

    def revisions_without_files(self, package):
        """Return non-broken revisions of ``package`` lacking ``expanded_srcmd5``.

        Returns:
            A list of ``DBRevision`` objects, one per matching row.
        """
        logging.debug(f"revisions_without_files({package})")
        with self.db.cursor() as cur:
            cur.execute(
                "SELECT * FROM revisions WHERE package=%s AND broken=FALSE AND expanded_srcmd5 IS NULL",
                (package,),
            )
            return [DBRevision(self.db, row) for row in cur.fetchall()]

    def import_rev(self, rev):
        """Fetch the file list of ``rev`` from OBS and store it.

        If ``rev`` has an entry in ``linked_revs``, the linked revision's
        ``unexpanded_srcmd5`` is passed along to ``self.obs.list``;
        otherwise ``None`` is passed.  When a listing is returned it is
        imported and the revision's ``files_hash`` is updated; otherwise
        the revision is marked as broken.
        """
        with self.db.cursor() as cur:
            cur.execute(
                """SELECT unexpanded_srcmd5 from revisions WHERE
                id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
                (rev.dbid,),
            )
            linked_rev = cur.fetchone()
        if linked_rev:
            # Unwrap the single selected column from the row.
            linked_rev = linked_rev[0]
        obs_dir_list = self.obs.list(
            rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
        )
        if obs_dir_list:
            rev.import_dir_list(obs_dir_list)
            md5 = rev.calculate_files_hash()
            with self.db.cursor() as cur:
                cur.execute(
                    "UPDATE revisions SET files_hash=%s WHERE id=%s",
                    (md5, rev.dbid),
                )
        else:
            rev.set_broken()

    def fill_file_lists(self):
        """Resolve links and fake revisions, then import missing file lists."""
        self.find_linked_revs()

        self.find_fake_revisions()
        for package in self.packages:
            for rev in self.revisions_without_files(package):
                print(f"rev {rev} is without files")
                self.import_rev(rev)

    def refresh_package(self, project, package):
        """Update the DB for a package and, recursively, the packages it links to.

        A package is processed at most once per ``Importer`` instance, and
        packages reported by ``package_gone`` are skipped entirely.
        """
        key = f"{project}/{package}"
        if key in self.refreshed_packages:
            # refreshing once is good enough
            return
        if self.package_gone(key):
            return
        logging.debug(f"Refresh {project}/{package}")
        # Register the key before recursing so that link cycles terminate.
        self.refreshed_packages.add(key)
        self.update_db_package(project, package)
        self.fetch_all_linked_packages(project, package)

    def import_into_db(self):
        """Run the complete import, committing after each stage.

        Stages, in order: refresh the configured packages, import the
        requests reported by ``DBRevision.requests_to_fetch``, refresh the
        source packages of requests attached to the configured packages,
        import missing users, and finally fill the file lists.
        """
        for package in self.packages:
            self.refresh_package(self.project, package)

        self.db.conn.commit()

        for number in DBRevision.requests_to_fetch(self.db):
            self.import_request(number)

        self.db.conn.commit()

        with self.db.cursor() as cur:
            cur.execute(
                """SELECT DISTINCT source_project,source_package FROM requests
                WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""",
                (self.project, self.packages),
            )
            for project, package in cur.fetchall():
                self.refresh_package(project, package)

        self.db.conn.commit()

        missing_users = User.missing_users(self.db)
        for userid in missing_users:
            missing_user = self.obs.user(userid)
            if missing_user:
                missing_user.import_into_db(self.db)
        self.db.conn.commit()

        self.fill_file_lists()
        self.db.conn.commit()

    def package_gone(self, key):
        """Return whether ``key`` ("project/package") is listed as gone.

        The list is read once from ``gone-packages.txt`` in the parent of
        this module's directory, one stripped line per entry, and cached
        on the instance.
        """
        # Compare against None rather than truthiness: an empty file yields
        # an empty (falsy) set, which must still count as "already loaded".
        if self.gone_packages_set is None:
            gone_file = pathlib.Path(__file__).parent.parent / "gone-packages.txt"
            with open(gone_file) as f:
                self.gone_packages_set = {line.strip() for line in f}
        return key in self.gone_packages_set
|