Avoid multi-threading races on import

There seem to be races when using DB cursors from multiple threads, as
found by import issues after switching to a newer computer that has
performance and energy-efficient cores.

As this is not particularly performance critical, convert to
single-threaded use, which makes it work again.
This commit is contained in:
Dirk Müller 2023-11-28 23:36:44 +01:00
parent 4353f015c8
commit 56cbe0a125
No known key found for this signature in database

View File

@ -1,4 +1,3 @@
import concurrent.futures
import logging import logging
import pathlib import pathlib
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
@ -163,10 +162,12 @@ class Importer:
(rev.dbid, linked.dbid), (rev.dbid, linked.dbid),
) )
def revisions_without_files(self): def revisions_without_files(self, package):
logging.debug(f"revisions_without_files({package})")
with self.db.cursor() as cur: with self.db.cursor() as cur:
cur.execute( cur.execute(
"SELECT * FROM revisions WHERE broken=FALSE AND expanded_srcmd5 IS NULL" "SELECT * FROM revisions WHERE package=%s AND broken=FALSE AND expanded_srcmd5 IS NULL",
(package, )
) )
return [DBRevision(self.db, row) for row in cur.fetchall()] return [DBRevision(self.db, row) for row in cur.fetchall()]
@ -180,11 +181,11 @@ class Importer:
linked_rev = cur.fetchone() linked_rev = cur.fetchone()
if linked_rev: if linked_rev:
linked_rev = linked_rev[0] linked_rev = linked_rev[0]
list = self.obs.list( obs_dir_list = self.obs.list(
rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
) )
if list: if obs_dir_list:
rev.import_dir_list(list) rev.import_dir_list(obs_dir_list)
md5 = rev.calculate_files_hash() md5 = rev.calculate_files_hash()
with self.db.cursor() as cur: with self.db.cursor() as cur:
cur.execute( cur.execute(
@ -198,12 +199,10 @@ class Importer:
self.find_linked_revs() self.find_linked_revs()
self.find_fake_revisions() self.find_fake_revisions()
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: for package in self.packages:
fs = [ for rev in self.revisions_without_files(package):
executor.submit(import_rev, self, rev) print(f"rev {rev} is without files")
for rev in self.revisions_without_files() self.import_rev(rev)
]
concurrent.futures.wait(fs)
def refresh_package(self, project, package): def refresh_package(self, project, package):
key = f"{project}/{package}" key = f"{project}/{package}"
@ -218,17 +217,13 @@ class Importer:
self.fetch_all_linked_packages(project, package) self.fetch_all_linked_packages(project, package)
def import_into_db(self): def import_into_db(self):
for package in self.packages:
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor: refresh_package(self, self.project, package)
for package in self.packages: refresh_package(self, self.project, package)
self.db.conn.commit() self.db.conn.commit()
fs = [ for number in DBRevision.requests_to_fetch(self.db):
executor.submit(import_request, self, number) self.import_request(number)
for number in DBRevision.requests_to_fetch(self.db)
]
concurrent.futures.wait(fs)
self.db.conn.commit() self.db.conn.commit()
@ -238,11 +233,9 @@ class Importer:
WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""", WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""",
(self.project, self.packages), (self.project, self.packages),
) )
fs = [ for project, package in cur.fetchall():
executor.submit(refresh_package, self, project, package) self.refresh_package(project, package)
for project, package in cur.fetchall()
]
concurrent.futures.wait(fs)
self.db.conn.commit() self.db.conn.commit()
missing_users = User.missing_users(self.db) missing_users = User.missing_users(self.db)