Merge pull request 'Run many packages in parallel to avoid overhead and make use of CPUS' (#16) from parallize_packages into main

Reviewed-on: https://gitea.opensuse.org/importers/git-importer/pulls/16
This commit is contained in:
coolo 2022-11-04 10:04:15 +01:00
commit 4cc0a23d4e
7 changed files with 150 additions and 150 deletions

View File

@ -1,7 +1,7 @@
all: all:
isort *.py lib/*py tests/*py isort *.py lib/*py tests/*py
autoflake -r --in-place --remove-unused-variables . autoflake --in-place --remove-unused-variables *.py lib/*py tests/*py
black . black *.py lib/*py tests/*py
test: test:
python3 -m unittest -v tests/*.py python3 -m unittest -v tests/*.py

View File

@ -1,6 +1,7 @@
#!/usr/bin/python3 #!/usr/bin/python3
import argparse import argparse
import concurrent.futures
import logging import logging
import pathlib import pathlib
import sys import sys
@ -42,13 +43,20 @@ PROJECTS = [
] ]
def export_package(package, repodir, cachedir, gc):
exporter = GitExporter(URL_OBS, "openSUSE:Factory", package, repodir, cachedir)
exporter.set_gc_interval(gc)
exporter.export_as_git()
def main(): def main():
parser = argparse.ArgumentParser(description="OBS history importer into git") parser = argparse.ArgumentParser(description="OBS history importer into git")
parser.add_argument("package", help="OBS package name") parser.add_argument("packages", help="OBS package names", nargs="*")
parser.add_argument( parser.add_argument(
"-r", "-r",
"--repodir", "--repodir",
required=False, required=False,
default=pathlib.Path("repos"),
type=pathlib.Path, type=pathlib.Path,
help="Local git repository directory", help="Local git repository directory",
) )
@ -94,22 +102,25 @@ def main():
requests_log.propagate = True requests_log.propagate = True
if args.export: if args.export:
TestExporter(args.package).run() if len(args.packages) != 1:
print("Can only export one package")
sys.exit(1)
TestExporter(args.packages[0]).run()
return return
if not args.repodir:
args.repodir = pathlib.Path("repos") / args.package
if not args.cachedir: if not args.cachedir:
args.cachedir = pathlib.Path("~/.cache/git-import/").expanduser() args.cachedir = pathlib.Path("~/.cache/git-import/").expanduser()
importer = Importer(URL_OBS, "openSUSE:Factory", args.package) importer = Importer(URL_OBS, "openSUSE:Factory", args.packages)
importer.import_into_db() importer.import_into_db()
exporter = GitExporter( with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
URL_OBS, "openSUSE:Factory", args.package, args.repodir, args.cachedir fs = [
) executor.submit(
exporter.set_gc_interval(args.gc) export_package, package, args.repodir, args.cachedir, args.gc
exporter.export_as_git() )
for package in args.packages
]
concurrent.futures.wait(fs)
if __name__ == "__main__": if __name__ == "__main__":

View File

@ -111,13 +111,18 @@ class DBRevision:
return DBRevision(db, row) return DBRevision(db, row)
@staticmethod @staticmethod
def latest_revision(db, project, package): def max_rev(db, project, package):
with db.cursor() as cur: with db.cursor() as cur:
cur.execute( cur.execute(
"SELECT MAX(rev) FROM revisions where project=%s and package=%s", "SELECT MAX(rev) FROM revisions where project=%s and package=%s",
(project, package), (project, package),
) )
max = cur.fetchone()[0] return cur.fetchone()[0]
return None
@staticmethod
def latest_revision(db, project, package):
max = DBRevision.max_rev(db, project, package)
if max: if max:
return DBRevision.fetch_revision(db, project, package, max) return DBRevision.fetch_revision(db, project, package, max)
return None return None

View File

@ -109,69 +109,6 @@ class Git:
"HEAD", author, committer, message, tree, parents "HEAD", author, committer, message, tree, parents
) )
def merge(
self,
user,
user_email,
user_time,
message,
commit,
committer=None,
committer_email=None,
committer_time=None,
clean_on_conflict=True,
merged=False,
allow_empty=False,
):
new_branch = False
if not merged:
try:
self.repo.merge(commit)
except KeyError:
# If it is the first commit, we will have a missing
# "HEAD", but the files will be there. We can proceed
# to the commit directly.
new_branch = True
if not merged and self.repo.index.conflicts:
for conflict in self.repo.index.conflicts:
conflict = [c for c in conflict if c]
if conflict:
logging.info(f"CONFLICT {conflict[0].path}")
if clean_on_conflict:
self.clean()
# Now I miss Rust enums
return "CONFLICT"
# Some merges are empty in OBS (no changes, not sure
# why), for now we signal them
if not allow_empty and not self.is_dirty():
# I really really do miss Rust enums
return "EMPTY"
if new_branch:
parents = [commit]
else:
parents = [
self.repo.head.target,
commit,
]
commit = self.commit(
user,
user_email,
user_time,
message,
parents,
committer,
committer_email,
committer_time,
allow_empty=allow_empty,
)
return commit
def merge_abort(self): def merge_abort(self):
self.repo.state_cleanup() self.repo.state_cleanup()
@ -188,7 +125,7 @@ class Git:
self.repo.references["refs/heads/" + branch].set_target(commit) self.repo.references["refs/heads/" + branch].set_target(commit)
def gc(self): def gc(self):
logging.info(f"Garbage recollect and repackage {self.path}") logging.debug(f"Garbage recollect and repackage {self.path}")
subprocess.run( subprocess.run(
["git", "gc", "--auto"], ["git", "gc", "--auto"],
cwd=self.path, cwd=self.path,

View File

@ -20,7 +20,7 @@ class GitExporter:
self.obs.change_url(api_url) self.obs.change_url(api_url)
self.proxy_sha256 = ProxySHA256(self.obs, enabled=True) self.proxy_sha256 = ProxySHA256(self.obs, enabled=True)
self.git = Git( self.git = Git(
repodir, repodir / package,
committer="Git OBS Bridge", committer="Git OBS Bridge",
committer_email="obsbridge@suse.de", committer_email="obsbridge@suse.de",
).create() ).create()
@ -60,6 +60,7 @@ class GitExporter:
gc_cnt = self.gc_interval gc_cnt = self.gc_interval
if len(left_to_commit) > 0: if len(left_to_commit) > 0:
logging.info(f"Commiting into {self.git.path}")
self.git.gc() self.git.gc()
for flat in left_to_commit: for flat in left_to_commit:
gc_cnt -= 1 gc_cnt -= 1

View File

@ -1,3 +1,4 @@
import concurrent.futures
import logging import logging
import xml.etree.ElementTree as ET import xml.etree.ElementTree as ET
@ -8,26 +9,42 @@ from lib.obs_revision import OBSRevision
from lib.user import User from lib.user import User
def refresh_package(importer, project, package):
importer.refresh_package(project, package)
def import_request(importer, number):
importer.import_request(number)
def import_rev(importer, rev):
importer.import_rev(rev)
class Importer: class Importer:
def __init__(self, api_url, project, package): def __init__(self, api_url, project, packages):
# Import a Factory package into the database # Import multiple Factory packages into the database
self.package = package self.packages = packages
self.project = project self.project = project
self.db = DB()
self.obs = OBS() self.obs = OBS()
assert project == "openSUSE:Factory" assert project == "openSUSE:Factory"
self.obs.change_url(api_url) self.obs.change_url(api_url)
self.refreshed_packages = set() self.refreshed_packages = set()
def update_db_package(self, db, project, package): def import_request(self, number):
self.obs.request(number).import_into_db(self.db)
def update_db_package(self, project, package):
root = self.obs._history(project, package) root = self.obs._history(project, package)
if root is None: if root is None:
return return
latest = DBRevision.latest_revision(db, project, package) latest = DBRevision.max_rev(self.db, project, package)
for r in root.findall("revision"): for r in root.findall("revision"):
rev = OBSRevision(self.obs, project, package).parse(r) rev = OBSRevision(self.obs, project, package).parse(r)
if not latest or rev.rev > latest.rev: if not latest or rev.rev > latest:
dbrev = DBRevision.import_obs_rev(db, rev) dbrev = DBRevision.import_obs_rev(self.db, rev)
try: try:
root = rev.read_link() root = rev.read_link()
except ET.ParseError: except ET.ParseError:
@ -38,15 +55,15 @@ class Importer:
tpkg = root.get("package") or package tpkg = root.get("package") or package
dbrev.links_to(tprj, tpkg) dbrev.links_to(tprj, tpkg)
def find_linked_revs(self, db): def find_linked_revs(self):
with db.cursor() as cur: with self.db.cursor() as cur:
cur.execute( cur.execute(
"""SELECT * from revisions WHERE id in (SELECT l.revision_id FROM links l """SELECT * from revisions WHERE id in (SELECT l.revision_id FROM links l
LEFT JOIN linked_revs lrevs ON lrevs.revision_id=l.revision_id LEFT JOIN linked_revs lrevs ON lrevs.revision_id=l.revision_id
WHERE lrevs.id IS NULL) and broken is FALSE;""" WHERE lrevs.id IS NULL) and broken is FALSE;"""
) )
for row in cur.fetchall(): for row in cur.fetchall():
rev = DBRevision(db, row) rev = DBRevision(self.db, row)
linked_rev = rev.linked_rev() linked_rev = rev.linked_rev()
if not linked_rev: if not linked_rev:
logging.debug(f"No link {rev}") logging.debug(f"No link {rev}")
@ -57,8 +74,8 @@ class Importer:
(rev.dbid, linked_rev.dbid), (rev.dbid, linked_rev.dbid),
) )
def fetch_all_linked_packages(self, db, project, package): def fetch_all_linked_packages(self, project, package):
with db.cursor() as cur: with self.db.cursor() as cur:
cur.execute( cur.execute(
"""SELECT DISTINCT l.project, l.package from links l JOIN revisions r """SELECT DISTINCT l.project, l.package from links l JOIN revisions r
on r.id=l.revision_id WHERE r.project=%s AND r.package=%s""", on r.id=l.revision_id WHERE r.project=%s AND r.package=%s""",
@ -67,26 +84,26 @@ class Importer:
for row in cur.fetchall(): for row in cur.fetchall():
(lproject, lpackage) = row (lproject, lpackage) = row
# recurse # recurse
self.refresh_package(db, lproject, lpackage) self.refresh_package(lproject, lpackage)
def find_fake_revisions(self, db): def find_fake_revisions(self):
with db.cursor() as cur: with self.db.cursor() as cur:
cur.execute( cur.execute(
"SELECT * from revisions WHERE id in (SELECT linked_id from linked_revs WHERE considered=FALSE)" "SELECT * from revisions WHERE id in (SELECT linked_id from linked_revs WHERE considered=FALSE)"
) )
for row in cur.fetchall(): for row in cur.fetchall():
self._find_fake_revision(db, DBRevision(db, row)) self._find_fake_revision(DBRevision(self.db, row))
def _find_fake_revision(self, db, rev): def _find_fake_revision(self, rev):
prev = rev.previous_commit() prev = rev.previous_commit()
if not prev: if not prev:
with db.cursor() as cur: with self.db.cursor() as cur:
cur.execute( cur.execute(
"UPDATE linked_revs SET considered=TRUE where linked_id=%s", "UPDATE linked_revs SET considered=TRUE where linked_id=%s",
(rev.dbid,), (rev.dbid,),
) )
return return
with db.cursor() as cur: with self.db.cursor() as cur:
cur.execute( cur.execute(
"""SELECT * FROM revisions WHERE id IN """SELECT * FROM revisions WHERE id IN
(SELECT revision_id from linked_revs WHERE linked_id=%s) (SELECT revision_id from linked_revs WHERE linked_id=%s)
@ -95,7 +112,7 @@ class Importer:
) )
last_linked = None last_linked = None
for linked in cur.fetchall(): for linked in cur.fetchall():
linked = DBRevision(db, linked) linked = DBRevision(self.db, linked)
nextrev = linked.next_commit() nextrev = linked.next_commit()
if nextrev and nextrev.commit_time < rev.commit_time: if nextrev and nextrev.commit_time < rev.commit_time:
continue continue
@ -107,7 +124,7 @@ class Importer:
if not last_linked: if not last_linked:
return return
with db.cursor() as cur: with self.db.cursor() as cur:
linked = last_linked linked = last_linked
cur.execute( cur.execute(
"SELECT 1 FROM fake_revs where revision_id=%s AND linked_id=%s", "SELECT 1 FROM fake_revs where revision_id=%s AND linked_id=%s",
@ -144,70 +161,96 @@ class Importer:
(rev.dbid, linked.dbid), (rev.dbid, linked.dbid),
) )
def revisions_without_files(self, db): def revisions_without_files(self):
with db.cursor() as cur: with self.db.cursor() as cur:
cur.execute( cur.execute(
"SELECT * FROM revisions WHERE broken=FALSE AND expanded_srcmd5 IS NULL" "SELECT * FROM revisions WHERE broken=FALSE AND expanded_srcmd5 IS NULL"
) )
return [DBRevision(db, row) for row in cur.fetchall()] return [DBRevision(self.db, row) for row in cur.fetchall()]
def fill_file_lists(self, db): def import_rev(self, rev):
self.find_linked_revs(db) with self.db.cursor() as cur:
cur.execute(
self.find_fake_revisions(db) """SELECT unexpanded_srcmd5 from revisions WHERE
for rev in self.revisions_without_files(db): id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
with db.cursor() as cur: (rev.dbid,),
cur.execute(
"""SELECT unexpanded_srcmd5 from revisions WHERE
id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
(rev.dbid,),
)
linked_rev = cur.fetchone()
if linked_rev:
linked_rev = linked_rev[0]
list = self.obs.list(
rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
) )
if list: linked_rev = cur.fetchone()
rev.import_dir_list(list) if linked_rev:
md5 = rev.calculate_files_hash() linked_rev = linked_rev[0]
with db.cursor() as cur: list = self.obs.list(
cur.execute( rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
"UPDATE revisions SET files_hash=%s WHERE id=%s", )
(md5, rev.dbid), if list:
) rev.import_dir_list(list)
else: md5 = rev.calculate_files_hash()
rev.set_broken() with self.db.cursor() as cur:
cur.execute(
"UPDATE revisions SET files_hash=%s WHERE id=%s",
(md5, rev.dbid),
)
else:
rev.set_broken()
def refresh_package(self, db, project, package): def fill_file_lists(self):
self.find_linked_revs()
self.find_fake_revisions()
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
fs = [
executor.submit(import_rev, self, rev)
for rev in self.revisions_without_files()
]
concurrent.futures.wait(fs)
def refresh_package(self, project, package):
key = f"{project}/{package}" key = f"{project}/{package}"
if key in self.refreshed_packages: if key in self.refreshed_packages:
# refreshing once is good enough # refreshing once is good enough
return return
logging.info(f"Refresh {project}/{package}")
self.refreshed_packages.add(key) self.refreshed_packages.add(key)
self.update_db_package(db, project, package) self.update_db_package(project, package)
self.fetch_all_linked_packages(db, project, package) self.fetch_all_linked_packages(project, package)
def import_into_db(self): def import_into_db(self):
db = DB()
self.refresh_package(db, self.project, self.package) with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
for number in DBRevision.requests_to_fetch(db): fs = [
self.obs.request(number).import_into_db(db) executor.submit(refresh_package, self, self.project, package)
with db.cursor() as cur: for package in self.packages
cur.execute( ]
"""SELECT DISTINCT source_project,source_package FROM requests concurrent.futures.wait(fs)
WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package=%s);""",
(self.project, self.package),
)
for project, package in cur.fetchall():
self.refresh_package(db, project, package)
missing_users = User.missing_users(db) self.db.conn.commit()
fs = [
executor.submit(import_request, self, number)
for number in DBRevision.requests_to_fetch(self.db)
]
concurrent.futures.wait(fs)
self.db.conn.commit()
with self.db.cursor() as cur:
cur.execute(
"""SELECT DISTINCT source_project,source_package FROM requests
WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""",
(self.project, self.packages),
)
fs = [
executor.submit(refresh_package, self, project, package)
for project, package in cur.fetchall()
]
concurrent.futures.wait(fs)
self.db.conn.commit()
missing_users = User.missing_users(self.db)
for userid in missing_users: for userid in missing_users:
missing_user = self.obs.user(userid) missing_user = self.obs.user(userid)
if missing_user: if missing_user:
missing_user.import_into_db(db) missing_user.import_into_db(self.db)
self.db.conn.commit()
self.fill_file_lists(db) self.fill_file_lists()
db.conn.commit() self.db.conn.commit()

View File

@ -41,7 +41,9 @@ class ProxySHA256:
self.hashes = dict() self.hashes = dict()
return return
logging.debug("Retrieve all previously defined SHA256") logging.debug("Retrieve all previously defined SHA256")
response = requests.get(f"http://source.dyn.cloud.suse.de/package/{package}") response = requests.get(
f"http://source.dyn.cloud.suse.de/package/{package}", timeout=5
)
if response.status_code == 200: if response.status_code == 200:
json = response.json() json = response.json()
self.hashes = json["shas"] self.hashes = json["shas"]
@ -67,6 +69,7 @@ class ProxySHA256:
"url": url, "url": url,
"package": package, "package": package,
}, },
timeout=10,
) )
if response.status_code != 200: if response.status_code != 200:
raise Exception(f"Redirector error on {self.url} for {url}") raise Exception(f"Redirector error on {self.url} for {url}")