Run many packages in parallel to avoid overhead and make use of CPUs #16

Merged
Ghost merged 5 commits from parallize_packages into main 2022-11-04 10:04:15 +01:00
7 changed files with 150 additions and 150 deletions
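
For orientation, a minimal sketch of the fan-out pattern this change introduces, with placeholder helpers standing in for the real import/export code (the helper bodies and package names below are assumptions, not part of the PR): the import phase runs in a small thread pool (network and database bound), the export phase in a small process pool (git and CPU bound).

import concurrent.futures


def import_package(package):
    # stand-in for Importer.refresh_package (network/DB-bound work)
    print(f"import {package}")


def export_package(package):
    # stand-in for GitExporter.export_as_git (git/CPU-bound work)
    print(f"export {package}")


def run(packages):
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
        concurrent.futures.wait([pool.submit(import_package, p) for p in packages])
    with concurrent.futures.ProcessPoolExecutor(max_workers=8) as pool:
        concurrent.futures.wait([pool.submit(export_package, p) for p in packages])


if __name__ == "__main__":
    run(["vim", "gcc"])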

View File

@@ -1,7 +1,7 @@
 all:
         isort *.py lib/*py tests/*py
-        autoflake -r --in-place --remove-unused-variables .
-        black .
+        autoflake --in-place --remove-unused-variables *.py lib/*py tests/*py
+        black *.py lib/*py tests/*py
 
 test:
         python3 -m unittest -v tests/*.py

View File

@@ -1,6 +1,7 @@
 #!/usr/bin/python3
 
 import argparse
+import concurrent.futures
 import logging
 import pathlib
 import sys
@@ -42,13 +43,20 @@ PROJECTS = [
 ]
 
 
+def export_package(package, repodir, cachedir, gc):
+    exporter = GitExporter(URL_OBS, "openSUSE:Factory", package, repodir, cachedir)
+    exporter.set_gc_interval(gc)
+    exporter.export_as_git()
+
+
 def main():
     parser = argparse.ArgumentParser(description="OBS history importer into git")
-    parser.add_argument("package", help="OBS package name")
+    parser.add_argument("packages", help="OBS package names", nargs="*")
     parser.add_argument(
         "-r",
         "--repodir",
         required=False,
+        default=pathlib.Path("repos"),
         type=pathlib.Path,
         help="Local git repository directory",
     )
@@ -94,22 +102,25 @@ def main():
     requests_log.propagate = True
 
     if args.export:
-        TestExporter(args.package).run()
+        if len(args.packages) != 1:
aplanas marked this conversation as resolved

You mean `> 1`?

Actually I meant `!= 1` :)
print("Can only export one package")
sys.exit(1)
TestExporter(args.packages[0]).run()
return return
if not args.repodir:
args.repodir = pathlib.Path("repos") / args.package
if not args.cachedir: if not args.cachedir:
args.cachedir = pathlib.Path("~/.cache/git-import/").expanduser() args.cachedir = pathlib.Path("~/.cache/git-import/").expanduser()
importer = Importer(URL_OBS, "openSUSE:Factory", args.package) importer = Importer(URL_OBS, "openSUSE:Factory", args.packages)
importer.import_into_db() importer.import_into_db()
exporter = GitExporter( with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
aplanas marked this conversation as resolved

Setting `max_workers` to `None` will create one process per CPU. That is fine on my desktop, but some laptops report something like 32 of them, and I am sure this would kill OBS. Maybe create a parameter with some low default?

I guess I can just hard-code it to 8. Would that work for you?

seems safer

(A sketch of the suggested parameter follows this file's diff.)
URL_OBS, "openSUSE:Factory", args.package, args.repodir, args.cachedir fs = [
) executor.submit(
exporter.set_gc_interval(args.gc) export_package, package, args.repodir, args.cachedir, args.gc
exporter.export_as_git() )
for package in args.packages
]
concurrent.futures.wait(fs)
if __name__ == "__main__": if __name__ == "__main__":
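
The thread above settles on a hard-coded pool of 8. A minimal sketch of the reviewer's alternative, a `--max-workers` option with a low default, could look like this (the flag name, default value, and helper body are assumptions, not part of this PR):

#!/usr/bin/python3
import argparse
import concurrent.futures


def export_package(package):
    # stand-in for the real GitExporter-based export
    print(f"exporting {package}")


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("packages", nargs="*")
    parser.add_argument(
        "--max-workers",
        type=int,
        default=2,  # low default so a 32-core laptop does not hammer OBS
        help="number of packages exported in parallel",
    )
    args = parser.parse_args()
    with concurrent.futures.ProcessPoolExecutor(max_workers=args.max_workers) as executor:
        fs = [executor.submit(export_package, p) for p in args.packages]
        concurrent.futures.wait(fs)


if __name__ == "__main__":
    main()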

View File

@@ -111,13 +111,18 @@ class DBRevision:
         return DBRevision(db, row)
 
     @staticmethod
-    def latest_revision(db, project, package):
+    def max_rev(db, project, package):
         with db.cursor() as cur:
             cur.execute(
                 "SELECT MAX(rev) FROM revisions where project=%s and package=%s",
                 (project, package),
             )
-            max = cur.fetchone()[0]
-            if max:
-                return DBRevision.fetch_revision(db, project, package, max)
+            return cur.fetchone()[0]
         return None
+
+    @staticmethod
+    def latest_revision(db, project, package):
+        max = DBRevision.max_rev(db, project, package)
+        if max:
+            return DBRevision.fetch_revision(db, project, package, max)
+        return None

View File

@@ -109,69 +109,6 @@ class Git:
             "HEAD", author, committer, message, tree, parents
         )
 
-    def merge(
-        self,
-        user,
-        user_email,
-        user_time,
-        message,
-        commit,
-        committer=None,
-        committer_email=None,
-        committer_time=None,
-        clean_on_conflict=True,
-        merged=False,
-        allow_empty=False,
-    ):
-        new_branch = False
-        if not merged:
-            try:
-                self.repo.merge(commit)
-            except KeyError:
-                # If it is the first commit, we will have a missing
-                # "HEAD", but the files will be there. We can proceed
-                # to the commit directly.
-                new_branch = True
-
-        if not merged and self.repo.index.conflicts:
-            for conflict in self.repo.index.conflicts:
-                conflict = [c for c in conflict if c]
-                if conflict:
-                    logging.info(f"CONFLICT {conflict[0].path}")
-
-            if clean_on_conflict:
-                self.clean()
-            # Now I miss Rust enums
-            return "CONFLICT"
-
-        # Some merges are empty in OBS (no changes, not sure
-        # why), for now we signal them
-        if not allow_empty and not self.is_dirty():
-            # I really really do miss Rust enums
-            return "EMPTY"
-
-        if new_branch:
-            parents = [commit]
-        else:
-            parents = [
-                self.repo.head.target,
-                commit,
-            ]
-        commit = self.commit(
-            user,
-            user_email,
-            user_time,
-            message,
-            parents,
-            committer,
-            committer_email,
-            committer_time,
-            allow_empty=allow_empty,
-        )
-        return commit
-
     def merge_abort(self):
         self.repo.state_cleanup()
@@ -188,7 +125,7 @@ class Git:
         self.repo.references["refs/heads/" + branch].set_target(commit)
 
     def gc(self):
-        logging.info(f"Garbage recollect and repackage {self.path}")
+        logging.debug(f"Garbage recollect and repackage {self.path}")
         subprocess.run(
             ["git", "gc", "--auto"],
             cwd=self.path,

View File

@@ -20,7 +20,7 @@ class GitExporter:
         self.obs.change_url(api_url)
         self.proxy_sha256 = ProxySHA256(self.obs, enabled=True)
         self.git = Git(
-            repodir,
+            repodir / package,
            committer="Git OBS Bridge",
            committer_email="obsbridge@suse.de",
         ).create()
@@ -60,6 +60,7 @@ class GitExporter:
         gc_cnt = self.gc_interval
         if len(left_to_commit) > 0:
+            logging.info(f"Commiting into {self.git.path}")
             self.git.gc()
 
         for flat in left_to_commit:
             gc_cnt -= 1

View File

@@ -1,3 +1,4 @@
+import concurrent.futures
 import logging
 import xml.etree.ElementTree as ET
 
@@ -8,26 +9,42 @@ from lib.obs_revision import OBSRevision
 from lib.user import User
 
 
+def refresh_package(importer, project, package):
+    importer.refresh_package(project, package)
+
+
+def import_request(importer, number):
+    importer.import_request(number)
+
+
+def import_rev(importer, rev):
+    importer.import_rev(rev)
+
+
 class Importer:
-    def __init__(self, api_url, project, package):
-        # Import a Factory package into the database
-        self.package = package
+    def __init__(self, api_url, project, packages):
+        # Import multiple Factory packages into the database
+        self.packages = packages
         self.project = project
+        self.db = DB()
         self.obs = OBS()
         assert project == "openSUSE:Factory"
         self.obs.change_url(api_url)
         self.refreshed_packages = set()
 
-    def update_db_package(self, db, project, package):
+    def import_request(self, number):
+        self.obs.request(number).import_into_db(self.db)
+
+    def update_db_package(self, project, package):
         root = self.obs._history(project, package)
         if root is None:
             return
-        latest = DBRevision.latest_revision(db, project, package)
+        latest = DBRevision.max_rev(self.db, project, package)
         for r in root.findall("revision"):
             rev = OBSRevision(self.obs, project, package).parse(r)
-            if not latest or rev.rev > latest.rev:
-                dbrev = DBRevision.import_obs_rev(db, rev)
+            if not latest or rev.rev > latest:
+                dbrev = DBRevision.import_obs_rev(self.db, rev)
                 try:
                     root = rev.read_link()
                 except ET.ParseError:
@@ -38,15 +55,15 @@ class Importer:
                     tpkg = root.get("package") or package
                     dbrev.links_to(tprj, tpkg)
 
-    def find_linked_revs(self, db):
-        with db.cursor() as cur:
+    def find_linked_revs(self):
+        with self.db.cursor() as cur:
             cur.execute(
                 """SELECT * from revisions WHERE id in (SELECT l.revision_id FROM links l
                    LEFT JOIN linked_revs lrevs ON lrevs.revision_id=l.revision_id
                    WHERE lrevs.id IS NULL) and broken is FALSE;"""
             )
             for row in cur.fetchall():
-                rev = DBRevision(db, row)
+                rev = DBRevision(self.db, row)
                 linked_rev = rev.linked_rev()
                 if not linked_rev:
                     logging.debug(f"No link {rev}")
@@ -57,8 +74,8 @@ class Importer:
                     (rev.dbid, linked_rev.dbid),
                 )
 
-    def fetch_all_linked_packages(self, db, project, package):
-        with db.cursor() as cur:
+    def fetch_all_linked_packages(self, project, package):
+        with self.db.cursor() as cur:
             cur.execute(
                 """SELECT DISTINCT l.project, l.package from links l JOIN revisions r
                    on r.id=l.revision_id WHERE r.project=%s AND r.package=%s""",
@@ -67,26 +84,26 @@ class Importer:
             for row in cur.fetchall():
                 (lproject, lpackage) = row
                 # recurse
-                self.refresh_package(db, lproject, lpackage)
+                self.refresh_package(lproject, lpackage)
 
-    def find_fake_revisions(self, db):
-        with db.cursor() as cur:
+    def find_fake_revisions(self):
+        with self.db.cursor() as cur:
             cur.execute(
                 "SELECT * from revisions WHERE id in (SELECT linked_id from linked_revs WHERE considered=FALSE)"
             )
             for row in cur.fetchall():
-                self._find_fake_revision(db, DBRevision(db, row))
+                self._find_fake_revision(DBRevision(self.db, row))
 
-    def _find_fake_revision(self, db, rev):
+    def _find_fake_revision(self, rev):
         prev = rev.previous_commit()
         if not prev:
-            with db.cursor() as cur:
+            with self.db.cursor() as cur:
                 cur.execute(
                     "UPDATE linked_revs SET considered=TRUE where linked_id=%s",
                     (rev.dbid,),
                 )
             return
-        with db.cursor() as cur:
+        with self.db.cursor() as cur:
             cur.execute(
                 """SELECT * FROM revisions WHERE id IN
                    (SELECT revision_id from linked_revs WHERE linked_id=%s)
@@ -95,7 +112,7 @@ class Importer:
             )
             last_linked = None
             for linked in cur.fetchall():
-                linked = DBRevision(db, linked)
+                linked = DBRevision(self.db, linked)
                 nextrev = linked.next_commit()
                 if nextrev and nextrev.commit_time < rev.commit_time:
                     continue
@@ -107,7 +124,7 @@ class Importer:
         if not last_linked:
             return
-        with db.cursor() as cur:
+        with self.db.cursor() as cur:
             linked = last_linked
             cur.execute(
                 "SELECT 1 FROM fake_revs where revision_id=%s AND linked_id=%s",
@@ -144,70 +161,96 @@ class Importer:
                 (rev.dbid, linked.dbid),
             )
 
-    def revisions_without_files(self, db):
-        with db.cursor() as cur:
+    def revisions_without_files(self):
+        with self.db.cursor() as cur:
             cur.execute(
                 "SELECT * FROM revisions WHERE broken=FALSE AND expanded_srcmd5 IS NULL"
             )
-            return [DBRevision(db, row) for row in cur.fetchall()]
+            return [DBRevision(self.db, row) for row in cur.fetchall()]
 
-    def fill_file_lists(self, db):
-        self.find_linked_revs(db)
-
-        self.find_fake_revisions(db)
-        for rev in self.revisions_without_files(db):
-            with db.cursor() as cur:
-                cur.execute(
-                    """SELECT unexpanded_srcmd5 from revisions WHERE
-                    id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
-                    (rev.dbid,),
-                )
-                linked_rev = cur.fetchone()
-                if linked_rev:
-                    linked_rev = linked_rev[0]
-            list = self.obs.list(
-                rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
-            )
-            if list:
-                rev.import_dir_list(list)
-                md5 = rev.calculate_files_hash()
-                with db.cursor() as cur:
-                    cur.execute(
-                        "UPDATE revisions SET files_hash=%s WHERE id=%s",
-                        (md5, rev.dbid),
-                    )
-            else:
-                rev.set_broken()
+    def import_rev(self, rev):
+        with self.db.cursor() as cur:
+            cur.execute(
+                """SELECT unexpanded_srcmd5 from revisions WHERE
+                id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
+                (rev.dbid,),
+            )
+            linked_rev = cur.fetchone()
+            if linked_rev:
+                linked_rev = linked_rev[0]
+        list = self.obs.list(
+            rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
+        )
+        if list:
+            rev.import_dir_list(list)
+            md5 = rev.calculate_files_hash()
+            with self.db.cursor() as cur:
+                cur.execute(
+                    "UPDATE revisions SET files_hash=%s WHERE id=%s",
+                    (md5, rev.dbid),
+                )
+        else:
+            rev.set_broken()
+
+    def fill_file_lists(self):
+        self.find_linked_revs()
+
+        self.find_fake_revisions()
+        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
+            fs = [
+                executor.submit(import_rev, self, rev)
+                for rev in self.revisions_without_files()
+            ]
+            concurrent.futures.wait(fs)
 
-    def refresh_package(self, db, project, package):
+    def refresh_package(self, project, package):
         key = f"{project}/{package}"
         if key in self.refreshed_packages:
             # refreshing once is good enough
             return
+        logging.info(f"Refresh {project}/{package}")
         self.refreshed_packages.add(key)
-        self.update_db_package(db, project, package)
-        self.fetch_all_linked_packages(db, project, package)
+        self.update_db_package(project, package)
+        self.fetch_all_linked_packages(project, package)
 
     def import_into_db(self):
-        db = DB()
-        self.refresh_package(db, self.project, self.package)
-        for number in DBRevision.requests_to_fetch(db):
-            self.obs.request(number).import_into_db(db)
-        with db.cursor() as cur:
-            cur.execute(
-                """SELECT DISTINCT source_project,source_package FROM requests
-                WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package=%s);""",
-                (self.project, self.package),
-            )
-            for project, package in cur.fetchall():
-                self.refresh_package(db, project, package)
+        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
+            fs = [
+                executor.submit(refresh_package, self, self.project, package)
+                for package in self.packages
+            ]
+            concurrent.futures.wait(fs)
+            self.db.conn.commit()
aplanas marked this conversation as resolved

Why is this required?

It's just to avoid wasting too much time when the script exits in the followup step.
+            fs = [
+                executor.submit(import_request, self, number)
+                for number in DBRevision.requests_to_fetch(self.db)
+            ]
+            concurrent.futures.wait(fs)
+            self.db.conn.commit()
aplanas marked this conversation as resolved

Same? Is there any transaction open?
+            with self.db.cursor() as cur:
+                cur.execute(
+                    """SELECT DISTINCT source_project,source_package FROM requests
+                    WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""",
+                    (self.project, self.packages),
+                )
+                fs = [
+                    executor.submit(refresh_package, self, project, package)
+                    for project, package in cur.fetchall()
+                ]
+                concurrent.futures.wait(fs)
+            self.db.conn.commit()
aplanas marked this conversation as resolved

Isn't closing the cursor's `with` block doing the commit?

No, only closing the database connection does. Everything until then stays in a transaction.

(A short psycopg2 illustration follows this file's diff.)
-        missing_users = User.missing_users(db)
+        missing_users = User.missing_users(self.db)
         for userid in missing_users:
             missing_user = self.obs.user(userid)
             if missing_user:
-                missing_user.import_into_db(db)
+                missing_user.import_into_db(self.db)
+        self.db.conn.commit()
 
-        self.fill_file_lists(db)
-        db.conn.commit()
+        self.fill_file_lists()
+        self.db.conn.commit()
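
To back the answers in the threads above, a minimal psycopg2 illustration (the connection string and table below are made up): the cursor's `with` block only closes the cursor, the open transaction persists until an explicit commit, which is why the intermediate `self.db.conn.commit()` calls act as checkpoints between phases.

import psycopg2

conn = psycopg2.connect("dbname=example")  # assumed DSN, not from this repo
with conn.cursor() as cur:
    cur.execute("INSERT INTO demo (name) VALUES (%s)", ("pkg",))
# The cursor is closed here, but nothing is committed yet: other connections
# still cannot see the row, and a crash at this point would lose it.
conn.commit()  # checkpoint: work done so far survives even if a later phase dies
conn.close()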

View File

@@ -41,7 +41,9 @@ class ProxySHA256:
             self.hashes = dict()
             return
         logging.debug("Retrieve all previously defined SHA256")
-        response = requests.get(f"http://source.dyn.cloud.suse.de/package/{package}")
+        response = requests.get(
+            f"http://source.dyn.cloud.suse.de/package/{package}", timeout=5
+        )
         if response.status_code == 200:
             json = response.json()
             self.hashes = json["shas"]
@@ -67,6 +69,7 @@ class ProxySHA256:
                 "url": url,
                 "package": package,
             },
+            timeout=10,
         )
         if response.status_code != 200:
             raise Exception(f"Redirector error on {self.url} for {url}")