Run many packages in parallel to avoid overhead and make use of CPUs #16

Merged
Ghost merged 5 commits from parallize_packages into main 2022-11-04 10:04:15 +01:00
7 changed files with 150 additions and 150 deletions
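In short: the entry point now takes a list of packages instead of a single one, imports them into the database with a thread pool (the refresh work is mostly OBS API and SQL round-trips), and then exports each package into its own git repository from a pool of worker processes. A minimal sketch of the resulting flow, using the names that appear in the diffs below (the worker counts are the ones hard-coded in this PR):

import concurrent.futures

# Sketch only: Importer, export_package, URL_OBS and args come from the entry-point script below.
importer = Importer(URL_OBS, "openSUSE:Factory", args.packages)
importer.import_into_db()  # internally fans package refreshes out to a ThreadPoolExecutor

with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
    futures = [
        executor.submit(export_package, package, args.repodir, args.cachedir, args.gc)
        for package in args.packages
    ]
    concurrent.futures.wait(futures)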

View File

@@ -1,7 +1,7 @@
all:
isort *.py lib/*py tests/*py
autoflake -r --in-place --remove-unused-variables .
black .
autoflake --in-place --remove-unused-variables *.py lib/*py tests/*py
black *.py lib/*py tests/*py
test:
python3 -m unittest -v tests/*.py

View File

@@ -1,6 +1,7 @@
#!/usr/bin/python3
import argparse
import concurrent.futures
import logging
import pathlib
import sys
@@ -42,13 +43,20 @@ PROJECTS = [
]
def export_package(package, repodir, cachedir, gc):
exporter = GitExporter(URL_OBS, "openSUSE:Factory", package, repodir, cachedir)
exporter.set_gc_interval(gc)
exporter.export_as_git()
def main():
parser = argparse.ArgumentParser(description="OBS history importer into git")
parser.add_argument("package", help="OBS package name")
parser.add_argument("packages", help="OBS package names", nargs="*")
parser.add_argument(
"-r",
"--repodir",
required=False,
default=pathlib.Path("repos"),
type=pathlib.Path,
help="Local git repository directory",
)
@@ -94,22 +102,25 @@ def main():
requests_log.propagate = True
if args.export:
TestExporter(args.package).run()
if len(args.packages) != 1:
print("Can only export one package")
sys.exit(1)
TestExporter(args.packages[0]).run()
return
if not args.repodir:
args.repodir = pathlib.Path("repos") / args.package
if not args.cachedir:
args.cachedir = pathlib.Path("~/.cache/git-import/").expanduser()
importer = Importer(URL_OBS, "openSUSE:Factory", args.package)
importer = Importer(URL_OBS, "openSUSE:Factory", args.packages)
importer.import_into_db()
exporter = GitExporter(
URL_OBS, "openSUSE:Factory", args.package, args.repodir, args.cachedir
)
exporter.set_gc_interval(args.gc)
exporter.export_as_git()
with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
fs = [
executor.submit(
export_package, package, args.repodir, args.cachedir, args.gc
)
for package in args.packages
]
concurrent.futures.wait(fs)
if __name__ == "__main__":
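Note that `export_package` is defined at module level: `ProcessPoolExecutor` ships the submitted callable and its arguments to the worker processes via pickle, so it has to be importable by name (a closure inside `main()` or a lambda could not be submitted). A small self-contained illustration of that constraint:

import concurrent.futures

def work(n):  # module-level function: picklable, safe to submit to worker processes
    return n * n

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        print(list(executor.map(work, range(4))))  # [0, 1, 4, 9]
    # submitting a lambda or a function defined inside main() would fail to pickle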

View File

@@ -111,13 +111,18 @@ class DBRevision:
return DBRevision(db, row)
@staticmethod
def latest_revision(db, project, package):
def max_rev(db, project, package):
with db.cursor() as cur:
cur.execute(
"SELECT MAX(rev) FROM revisions where project=%s and package=%s",
(project, package),
)
max = cur.fetchone()[0]
return cur.fetchone()[0]
return None
@staticmethod
def latest_revision(db, project, package):
max = DBRevision.max_rev(db, project, package)
if max:
return DBRevision.fetch_revision(db, project, package, max)
return None
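The split lets callers that only need the newest revision number compare plain integers instead of fetching a full row, while `latest_revision` keeps its old behaviour on top of `max_rev`. A sketch of the two call patterns (the package name is only an example):

latest = DBRevision.max_rev(db, "openSUSE:Factory", "vim")  # plain int, or None
if latest is None or obs_rev.rev > latest:  # obs_rev: a hypothetical freshly parsed OBS revision
    pass  # only now import the newer revision into the database

full = DBRevision.latest_revision(db, "openSUSE:Factory", "vim")  # full DBRevision, or None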

View File

@@ -109,69 +109,6 @@ class Git:
"HEAD", author, committer, message, tree, parents
)
def merge(
self,
user,
user_email,
user_time,
message,
commit,
committer=None,
committer_email=None,
committer_time=None,
clean_on_conflict=True,
merged=False,
allow_empty=False,
):
new_branch = False
if not merged:
try:
self.repo.merge(commit)
except KeyError:
# If it is the first commit, we will have a missing
# "HEAD", but the files will be there. We can proceed
# to the commit directly.
new_branch = True
if not merged and self.repo.index.conflicts:
for conflict in self.repo.index.conflicts:
conflict = [c for c in conflict if c]
if conflict:
logging.info(f"CONFLICT {conflict[0].path}")
if clean_on_conflict:
self.clean()
# Now I miss Rust enums
return "CONFLICT"
# Some merges are empty in OBS (no changes, not sure
# why), for now we signal them
if not allow_empty and not self.is_dirty():
# I really really do miss Rust enums
return "EMPTY"
if new_branch:
parents = [commit]
else:
parents = [
self.repo.head.target,
commit,
]
commit = self.commit(
user,
user_email,
user_time,
message,
parents,
committer,
committer_email,
committer_time,
allow_empty=allow_empty,
)
return commit
def merge_abort(self):
self.repo.state_cleanup()
@@ -188,7 +125,7 @@ class Git:
self.repo.references["refs/heads/" + branch].set_target(commit)
def gc(self):
logging.info(f"Garbage recollect and repackage {self.path}")
logging.debug(f"Garbage recollect and repackage {self.path}")
subprocess.run(
["git", "gc", "--auto"],
cwd=self.path,

View File

@@ -20,7 +20,7 @@ class GitExporter:
self.obs.change_url(api_url)
self.proxy_sha256 = ProxySHA256(self.obs, enabled=True)
self.git = Git(
repodir,
repodir / package,
committer="Git OBS Bridge",
committer_email="obsbridge@suse.de",
).create()
@@ -60,6 +60,7 @@ class GitExporter:
gc_cnt = self.gc_interval
if len(left_to_commit) > 0:
logging.info(f"Commiting into {self.git.path}")
self.git.gc()
for flat in left_to_commit:
gc_cnt -= 1

View File

@@ -1,3 +1,4 @@
import concurrent.futures
import logging
import xml.etree.ElementTree as ET
@@ -8,26 +9,42 @@ from lib.obs_revision import OBSRevision
from lib.user import User
def refresh_package(importer, project, package):
importer.refresh_package(project, package)
def import_request(importer, number):
importer.import_request(number)
def import_rev(importer, rev):
importer.import_rev(rev)
class Importer:
def __init__(self, api_url, project, package):
# Import a Factory package into the database
self.package = package
def __init__(self, api_url, project, packages):
# Import multiple Factory packages into the database
self.packages = packages
self.project = project
self.db = DB()
self.obs = OBS()
assert project == "openSUSE:Factory"
self.obs.change_url(api_url)
self.refreshed_packages = set()
def update_db_package(self, db, project, package):
def import_request(self, number):
self.obs.request(number).import_into_db(self.db)
def update_db_package(self, project, package):
root = self.obs._history(project, package)
if root is None:
return
latest = DBRevision.latest_revision(db, project, package)
latest = DBRevision.max_rev(self.db, project, package)
for r in root.findall("revision"):
rev = OBSRevision(self.obs, project, package).parse(r)
if not latest or rev.rev > latest.rev:
dbrev = DBRevision.import_obs_rev(db, rev)
if not latest or rev.rev > latest:
dbrev = DBRevision.import_obs_rev(self.db, rev)
try:
root = rev.read_link()
except ET.ParseError:
@@ -38,15 +55,15 @@ class Importer:
tpkg = root.get("package") or package
dbrev.links_to(tprj, tpkg)
def find_linked_revs(self, db):
with db.cursor() as cur:
def find_linked_revs(self):
with self.db.cursor() as cur:
cur.execute(
"""SELECT * from revisions WHERE id in (SELECT l.revision_id FROM links l
LEFT JOIN linked_revs lrevs ON lrevs.revision_id=l.revision_id
WHERE lrevs.id IS NULL) and broken is FALSE;"""
)
for row in cur.fetchall():
rev = DBRevision(db, row)
rev = DBRevision(self.db, row)
linked_rev = rev.linked_rev()
if not linked_rev:
logging.debug(f"No link {rev}")
@@ -57,8 +74,8 @@ class Importer:
(rev.dbid, linked_rev.dbid),
)
def fetch_all_linked_packages(self, db, project, package):
with db.cursor() as cur:
def fetch_all_linked_packages(self, project, package):
with self.db.cursor() as cur:
cur.execute(
"""SELECT DISTINCT l.project, l.package from links l JOIN revisions r
on r.id=l.revision_id WHERE r.project=%s AND r.package=%s""",
@@ -67,26 +84,26 @@ class Importer:
for row in cur.fetchall():
(lproject, lpackage) = row
# recurse
self.refresh_package(db, lproject, lpackage)
self.refresh_package(lproject, lpackage)
def find_fake_revisions(self, db):
with db.cursor() as cur:
def find_fake_revisions(self):
with self.db.cursor() as cur:
cur.execute(
"SELECT * from revisions WHERE id in (SELECT linked_id from linked_revs WHERE considered=FALSE)"
)
for row in cur.fetchall():
self._find_fake_revision(db, DBRevision(db, row))
self._find_fake_revision(DBRevision(self.db, row))
def _find_fake_revision(self, db, rev):
def _find_fake_revision(self, rev):
prev = rev.previous_commit()
if not prev:
with db.cursor() as cur:
with self.db.cursor() as cur:
cur.execute(
"UPDATE linked_revs SET considered=TRUE where linked_id=%s",
(rev.dbid,),
)
return
with db.cursor() as cur:
with self.db.cursor() as cur:
cur.execute(
"""SELECT * FROM revisions WHERE id IN
(SELECT revision_id from linked_revs WHERE linked_id=%s)
@@ -95,7 +112,7 @@ class Importer:
)
last_linked = None
for linked in cur.fetchall():
linked = DBRevision(db, linked)
linked = DBRevision(self.db, linked)
nextrev = linked.next_commit()
if nextrev and nextrev.commit_time < rev.commit_time:
continue
@@ -107,7 +124,7 @@ class Importer:
if not last_linked:
return
with db.cursor() as cur:
with self.db.cursor() as cur:
linked = last_linked
cur.execute(
"SELECT 1 FROM fake_revs where revision_id=%s AND linked_id=%s",
@@ -144,70 +161,96 @@ class Importer:
(rev.dbid, linked.dbid),
)
def revisions_without_files(self, db):
with db.cursor() as cur:
def revisions_without_files(self):
with self.db.cursor() as cur:
cur.execute(
"SELECT * FROM revisions WHERE broken=FALSE AND expanded_srcmd5 IS NULL"
)
return [DBRevision(db, row) for row in cur.fetchall()]
return [DBRevision(self.db, row) for row in cur.fetchall()]
def fill_file_lists(self, db):
self.find_linked_revs(db)
self.find_fake_revisions(db)
for rev in self.revisions_without_files(db):
with db.cursor() as cur:
cur.execute(
"""SELECT unexpanded_srcmd5 from revisions WHERE
id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
(rev.dbid,),
)
linked_rev = cur.fetchone()
if linked_rev:
linked_rev = linked_rev[0]
list = self.obs.list(
rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
def import_rev(self, rev):
with self.db.cursor() as cur:
cur.execute(
"""SELECT unexpanded_srcmd5 from revisions WHERE
id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
(rev.dbid,),
)
if list:
rev.import_dir_list(list)
md5 = rev.calculate_files_hash()
with db.cursor() as cur:
cur.execute(
"UPDATE revisions SET files_hash=%s WHERE id=%s",
(md5, rev.dbid),
)
else:
rev.set_broken()
linked_rev = cur.fetchone()
if linked_rev:
linked_rev = linked_rev[0]
list = self.obs.list(
rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
)
if list:
rev.import_dir_list(list)
md5 = rev.calculate_files_hash()
with self.db.cursor() as cur:
cur.execute(
"UPDATE revisions SET files_hash=%s WHERE id=%s",
(md5, rev.dbid),
)
else:
rev.set_broken()
def refresh_package(self, db, project, package):
def fill_file_lists(self):
self.find_linked_revs()
self.find_fake_revisions()
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
fs = [
executor.submit(import_rev, self, rev)
for rev in self.revisions_without_files()
]
concurrent.futures.wait(fs)
def refresh_package(self, project, package):
key = f"{project}/{package}"
if key in self.refreshed_packages:
# refreshing once is good enough
return
logging.info(f"Refresh {project}/{package}")
self.refreshed_packages.add(key)
self.update_db_package(db, project, package)
self.fetch_all_linked_packages(db, project, package)
self.update_db_package(project, package)
self.fetch_all_linked_packages(project, package)
def import_into_db(self):
db = DB()
self.refresh_package(db, self.project, self.package)
for number in DBRevision.requests_to_fetch(db):
self.obs.request(number).import_into_db(db)
with db.cursor() as cur:
cur.execute(
"""SELECT DISTINCT source_project,source_package FROM requests
WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package=%s);""",
(self.project, self.package),
)
for project, package in cur.fetchall():
self.refresh_package(db, project, package)
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
fs = [
executor.submit(refresh_package, self, self.project, package)
for package in self.packages
]
concurrent.futures.wait(fs)
missing_users = User.missing_users(db)
self.db.conn.commit()
aplanas marked this conversation as resolved
Review

Why is this required?

Review

It's just to avoid wasting too much time when the script exits in the follow-up step.
fs = [
executor.submit(import_request, self, number)
for number in DBRevision.requests_to_fetch(self.db)
]
concurrent.futures.wait(fs)
self.db.conn.commit()
aplanas marked this conversation as resolved
Review

Same? Is there any transaction open?
with self.db.cursor() as cur:
cur.execute(
"""SELECT DISTINCT source_project,source_package FROM requests
WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""",
(self.project, self.packages),
)
fs = [
executor.submit(refresh_package, self, project, package)
for project, package in cur.fetchall()
]
concurrent.futures.wait(fs)
self.db.conn.commit()
aplanas marked this conversation as resolved
Review

Isn't closing the cursor's `with` block doing the commit?

Review

No, only closing the database connection does. Everything until then stays in a transaction.
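For reference, a minimal sketch of the semantics discussed here, assuming the `DB` wrapper sits on top of a psycopg2-style connection:

with db.cursor() as cur:  # leaving the `with` block closes the cursor, not the transaction
    cur.execute("UPDATE linked_revs SET considered=TRUE")
# the update is still uncommitted at this point ...
db.conn.commit()  # ... until an explicit connection-level commit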
missing_users = User.missing_users(self.db)
for userid in missing_users:
missing_user = self.obs.user(userid)
if missing_user:
missing_user.import_into_db(db)
missing_user.import_into_db(self.db)
self.db.conn.commit()
self.fill_file_lists(db)
db.conn.commit()
self.fill_file_lists()
self.db.conn.commit()
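The importer keeps a `ThreadPoolExecutor` because the refresh work is I/O-bound (OBS API calls and SQL queries) and all workers share the single `self.db` connection. One thing to keep in mind about the `submit` + `wait` pattern used in both pools: `concurrent.futures.wait()` only blocks until the futures finish, it does not re-raise exceptions from the workers; those stay attached to each future until `.result()` is called. A small self-contained illustration:

import concurrent.futures

def might_fail(pkg):
    if pkg == "broken":
        raise RuntimeError(pkg)
    return pkg

with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
    futures = [executor.submit(might_fail, p) for p in ("ok", "broken")]
    done, _ = concurrent.futures.wait(futures)  # blocks until both finish; raises nothing itself
    for f in done:
        try:
            print(f.result())  # re-raises the worker's exception, if any
        except RuntimeError as err:
            print("worker failed:", err)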

View File

@@ -41,7 +41,9 @@ class ProxySHA256:
self.hashes = dict()
return
logging.debug("Retrieve all previously defined SHA256")
response = requests.get(f"http://source.dyn.cloud.suse.de/package/{package}")
response = requests.get(
f"http://source.dyn.cloud.suse.de/package/{package}", timeout=5
)
if response.status_code == 200:
json = response.json()
self.hashes = json["shas"]
@@ -67,6 +69,7 @@ class ProxySHA256:
"url": url,
"package": package,
},
timeout=10,
)
if response.status_code != 200:
raise Exception(f"Redirector error on {self.url} for {url}")
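The new `timeout=` arguments matter because `requests` applies no timeout by default, so a stuck connection to the proxy would block a worker indefinitely. A minimal illustration (URL pattern taken from the diff, the package name is only an example):

import requests

try:
    # a single number applies to both the connect and the read phase;
    # a (connect, read) tuple sets them separately
    response = requests.get(
        "http://source.dyn.cloud.suse.de/package/vim", timeout=(5, 5)
    )
    response.raise_for_status()
except requests.Timeout:
    pass  # treat the proxy as unavailable instead of hanging the worker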