Run many packages in parallel to avoid overhead and make use of CPUs #16
Makefile (4 changed lines)
@@ -1,7 +1,7 @@
 all:
 	isort *.py lib/*py tests/*py
-	autoflake -r --in-place --remove-unused-variables .
-	black .
+	autoflake --in-place --remove-unused-variables *.py lib/*py tests/*py
+	black *.py lib/*py tests/*py

 test:
 	python3 -m unittest -v tests/*.py
@@ -1,6 +1,7 @@
 #!/usr/bin/python3

 import argparse
+import concurrent.futures
 import logging
 import pathlib
 import sys
@@ -42,13 +43,20 @@ PROJECTS = [
 ]


+def export_package(package, repodir, cachedir, gc):
+    exporter = GitExporter(URL_OBS, "openSUSE:Factory", package, repodir, cachedir)
+    exporter.set_gc_interval(gc)
+    exporter.export_as_git()
+
+
 def main():
     parser = argparse.ArgumentParser(description="OBS history importer into git")
-    parser.add_argument("package", help="OBS package name")
+    parser.add_argument("packages", help="OBS package names", nargs="*")
     parser.add_argument(
         "-r",
         "--repodir",
         required=False,
         default=pathlib.Path("repos"),
         type=pathlib.Path,
         help="Local git repository directory",
     )
@@ -94,22 +102,25 @@ def main():
     requests_log.propagate = True

     if args.export:
-        TestExporter(args.package).run()
+        if len(args.packages) != 1:
+            print("Can only export one package")
+            sys.exit(1)
+        TestExporter(args.packages[0]).run()
         return

-    if not args.repodir:
-        args.repodir = pathlib.Path("repos") / args.package
-
     if not args.cachedir:
         args.cachedir = pathlib.Path("~/.cache/git-import/").expanduser()

-    importer = Importer(URL_OBS, "openSUSE:Factory", args.package)
+    importer = Importer(URL_OBS, "openSUSE:Factory", args.packages)
     importer.import_into_db()
-    exporter = GitExporter(
-        URL_OBS, "openSUSE:Factory", args.package, args.repodir, args.cachedir
-    )
-    exporter.set_gc_interval(args.gc)
-    exporter.export_as_git()
+    with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:

[Review thread, resolved, outdated]
aplanas: Making `max_workers` None will create one process per CPU. That is OK on my desktop, but the laptop reports something like 32 of them, and I am sure this will kill OBS. Maybe create a parameter with some low default?
Ghost: I guess I can just hard-code it to 8. Would that work for you?
aplanas: Seems safer.

+        fs = [
+            executor.submit(
+                export_package, package, args.repodir, args.cachedir, args.gc
+            )
+            for package in args.packages
+        ]
+        concurrent.futures.wait(fs)


 if __name__ == "__main__":
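The review settled on a hard-coded `max_workers=8`. If the pool size ever needs to be tuned per machine, a low-risk follow-up (not part of this PR) would be the parameter with a conservative default that aplanas suggested. The `--workers` flag, `add_worker_arg`, and `run_parallel` names below are hypothetical and only illustrate the idea:

# Sketch only: a possible follow-up to the review discussion, not code from this PR.
import argparse
import concurrent.futures
import os


def add_worker_arg(parser: argparse.ArgumentParser) -> None:
    parser.add_argument(
        "--workers",
        type=int,
        default=8,  # conservative default so a 32-core laptop does not hammer OBS
        help="Number of parallel export processes",
    )


def run_parallel(func, items, workers):
    # Never spawn more processes than there are items or CPUs.
    workers = max(1, min(workers, len(items), os.cpu_count() or 1))
    with concurrent.futures.ProcessPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(func, item) for item in items]
        concurrent.futures.wait(futures)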
@@ -111,13 +111,18 @@ class DBRevision:
         return DBRevision(db, row)

     @staticmethod
-    def latest_revision(db, project, package):
+    def max_rev(db, project, package):
         with db.cursor() as cur:
             cur.execute(
                 "SELECT MAX(rev) FROM revisions where project=%s and package=%s",
                 (project, package),
             )
-            max = cur.fetchone()[0]
+            return cur.fetchone()[0]
+        return None
+
+    @staticmethod
+    def latest_revision(db, project, package):
+        max = DBRevision.max_rev(db, project, package)
         if max:
             return DBRevision.fetch_revision(db, project, package, max)
         return None
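For context on the split above: `max_rev` returns only the highest revision number (or None), while `latest_revision` still returns a full `DBRevision`, so callers that only need the number can compare plain integers — which is what the importer change further down does (`rev.rev > latest`). A small self-contained toy of that calling pattern; `RevStub`, `max_rev`, and `needs_import` here are stand-ins, not the project's classes:

# Toy illustration of why returning the bare number simplifies the comparison.
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class RevStub:
    rev: int


def max_rev(revisions: List[RevStub]) -> Optional[int]:
    # Mirrors the new helper: just the highest revision number, or None.
    return max((r.rev for r in revisions), default=None)


def needs_import(candidate: RevStub, revisions: List[RevStub]) -> bool:
    latest = max_rev(revisions)
    # Same shape as the importer change: compare plain integers.
    return latest is None or candidate.rev > latest


assert needs_import(RevStub(3), [RevStub(1), RevStub(2)])
assert not needs_import(RevStub(2), [RevStub(1), RevStub(2), RevStub(3)])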
lib/git.py (65 changed lines)
@@ -109,69 +109,6 @@ class Git:
             "HEAD", author, committer, message, tree, parents
         )

-    def merge(
-        self,
-        user,
-        user_email,
-        user_time,
-        message,
-        commit,
-        committer=None,
-        committer_email=None,
-        committer_time=None,
-        clean_on_conflict=True,
-        merged=False,
-        allow_empty=False,
-    ):
-        new_branch = False
-
-        if not merged:
-            try:
-                self.repo.merge(commit)
-            except KeyError:
-                # If it is the first commit, we will have a missing
-                # "HEAD", but the files will be there. We can proceed
-                # to the commit directly.
-                new_branch = True
-
-        if not merged and self.repo.index.conflicts:
-            for conflict in self.repo.index.conflicts:
-                conflict = [c for c in conflict if c]
-                if conflict:
-                    logging.info(f"CONFLICT {conflict[0].path}")
-
-            if clean_on_conflict:
-                self.clean()
-            # Now I miss Rust enums
-            return "CONFLICT"
-
-        # Some merges are empty in OBS (no changes, not sure
-        # why), for now we signal them
-        if not allow_empty and not self.is_dirty():
-            # I really really do miss Rust enums
-            return "EMPTY"
-
-        if new_branch:
-            parents = [commit]
-        else:
-            parents = [
-                self.repo.head.target,
-                commit,
-            ]
-        commit = self.commit(
-            user,
-            user_email,
-            user_time,
-            message,
-            parents,
-            committer,
-            committer_email,
-            committer_time,
-            allow_empty=allow_empty,
-        )
-
-        return commit
-
     def merge_abort(self):
         self.repo.state_cleanup()

@@ -188,7 +125,7 @@ class Git:
         self.repo.references["refs/heads/" + branch].set_target(commit)

     def gc(self):
-        logging.info(f"Garbage recollect and repackage {self.path}")
+        logging.debug(f"Garbage recollect and repackage {self.path}")
         subprocess.run(
             ["git", "gc", "--auto"],
             cwd=self.path,
@@ -20,7 +20,7 @@ class GitExporter:
         self.obs.change_url(api_url)
         self.proxy_sha256 = ProxySHA256(self.obs, enabled=True)
         self.git = Git(
-            repodir,
+            repodir / package,
             committer="Git OBS Bridge",
             committer_email="obsbridge@suse.de",
         ).create()
@@ -60,6 +60,7 @@ class GitExporter:

         gc_cnt = self.gc_interval
         if len(left_to_commit) > 0:
             logging.info(f"Commiting into {self.git.path}")
+            self.git.gc()
         for flat in left_to_commit:
             gc_cnt -= 1
lib/importer.py (181 changed lines)
@@ -1,3 +1,4 @@
+import concurrent.futures
 import logging
 import xml.etree.ElementTree as ET

@@ -8,26 +9,42 @@ from lib.obs_revision import OBSRevision
 from lib.user import User


+def refresh_package(importer, project, package):
+    importer.refresh_package(project, package)
+
+
+def import_request(importer, number):
+    importer.import_request(number)
+
+
+def import_rev(importer, rev):
+    importer.import_rev(rev)
+
+
 class Importer:
-    def __init__(self, api_url, project, package):
-        # Import a Factory package into the database
-        self.package = package
+    def __init__(self, api_url, project, packages):
+        # Import multiple Factory packages into the database
+        self.packages = packages
         self.project = project

         self.db = DB()
         self.obs = OBS()
         assert project == "openSUSE:Factory"
         self.obs.change_url(api_url)
         self.refreshed_packages = set()

-    def update_db_package(self, db, project, package):
+    def import_request(self, number):
+        self.obs.request(number).import_into_db(self.db)
+
+    def update_db_package(self, project, package):
         root = self.obs._history(project, package)
         if root is None:
             return
-        latest = DBRevision.latest_revision(db, project, package)
+        latest = DBRevision.max_rev(self.db, project, package)
         for r in root.findall("revision"):
             rev = OBSRevision(self.obs, project, package).parse(r)
-            if not latest or rev.rev > latest.rev:
-                dbrev = DBRevision.import_obs_rev(db, rev)
+            if not latest or rev.rev > latest:
+                dbrev = DBRevision.import_obs_rev(self.db, rev)
                 try:
                     root = rev.read_link()
                 except ET.ParseError:
@@ -38,15 +55,15 @@ class Importer:
                     tpkg = root.get("package") or package
                     dbrev.links_to(tprj, tpkg)

-    def find_linked_revs(self, db):
-        with db.cursor() as cur:
+    def find_linked_revs(self):
+        with self.db.cursor() as cur:
             cur.execute(
                 """SELECT * from revisions WHERE id in (SELECT l.revision_id FROM links l
                    LEFT JOIN linked_revs lrevs ON lrevs.revision_id=l.revision_id
                    WHERE lrevs.id IS NULL) and broken is FALSE;"""
             )
             for row in cur.fetchall():
-                rev = DBRevision(db, row)
+                rev = DBRevision(self.db, row)
                 linked_rev = rev.linked_rev()
                 if not linked_rev:
                     logging.debug(f"No link {rev}")
@@ -57,8 +74,8 @@ class Importer:
                 (rev.dbid, linked_rev.dbid),
             )

-    def fetch_all_linked_packages(self, db, project, package):
-        with db.cursor() as cur:
+    def fetch_all_linked_packages(self, project, package):
+        with self.db.cursor() as cur:
             cur.execute(
                 """SELECT DISTINCT l.project, l.package from links l JOIN revisions r
                    on r.id=l.revision_id WHERE r.project=%s AND r.package=%s""",
@@ -67,26 +84,26 @@ class Importer:
             for row in cur.fetchall():
                 (lproject, lpackage) = row
                 # recurse
-                self.refresh_package(db, lproject, lpackage)
+                self.refresh_package(lproject, lpackage)

-    def find_fake_revisions(self, db):
-        with db.cursor() as cur:
+    def find_fake_revisions(self):
+        with self.db.cursor() as cur:
             cur.execute(
                 "SELECT * from revisions WHERE id in (SELECT linked_id from linked_revs WHERE considered=FALSE)"
             )
             for row in cur.fetchall():
-                self._find_fake_revision(db, DBRevision(db, row))
+                self._find_fake_revision(DBRevision(self.db, row))

-    def _find_fake_revision(self, db, rev):
+    def _find_fake_revision(self, rev):
         prev = rev.previous_commit()
         if not prev:
-            with db.cursor() as cur:
+            with self.db.cursor() as cur:
                 cur.execute(
                     "UPDATE linked_revs SET considered=TRUE where linked_id=%s",
                     (rev.dbid,),
                 )
             return
-        with db.cursor() as cur:
+        with self.db.cursor() as cur:
             cur.execute(
                 """SELECT * FROM revisions WHERE id IN
                    (SELECT revision_id from linked_revs WHERE linked_id=%s)
@@ -95,7 +112,7 @@ class Importer:
             )
             last_linked = None
             for linked in cur.fetchall():
-                linked = DBRevision(db, linked)
+                linked = DBRevision(self.db, linked)
                 nextrev = linked.next_commit()
                 if nextrev and nextrev.commit_time < rev.commit_time:
                     continue
@@ -107,7 +124,7 @@ class Importer:
         if not last_linked:
             return

-        with db.cursor() as cur:
+        with self.db.cursor() as cur:
             linked = last_linked
             cur.execute(
                 "SELECT 1 FROM fake_revs where revision_id=%s AND linked_id=%s",
@@ -144,70 +161,96 @@ class Importer:
                 (rev.dbid, linked.dbid),
             )

-    def revisions_without_files(self, db):
-        with db.cursor() as cur:
+    def revisions_without_files(self):
+        with self.db.cursor() as cur:
             cur.execute(
                 "SELECT * FROM revisions WHERE broken=FALSE AND expanded_srcmd5 IS NULL"
             )
-            return [DBRevision(db, row) for row in cur.fetchall()]
+            return [DBRevision(self.db, row) for row in cur.fetchall()]

-    def fill_file_lists(self, db):
-        self.find_linked_revs(db)
-
-        self.find_fake_revisions(db)
-        for rev in self.revisions_without_files(db):
-            with db.cursor() as cur:
-                cur.execute(
-                    """SELECT unexpanded_srcmd5 from revisions WHERE
-                       id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
-                    (rev.dbid,),
-                )
-                linked_rev = cur.fetchone()
-                if linked_rev:
-                    linked_rev = linked_rev[0]
-                list = self.obs.list(
-                    rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
-                )
-                if list:
-                    rev.import_dir_list(list)
-                    md5 = rev.calculate_files_hash()
-                    with db.cursor() as cur:
-                        cur.execute(
-                            "UPDATE revisions SET files_hash=%s WHERE id=%s",
-                            (md5, rev.dbid),
-                        )
-                else:
-                    rev.set_broken()
+    def import_rev(self, rev):
+        with self.db.cursor() as cur:
+            cur.execute(
+                """SELECT unexpanded_srcmd5 from revisions WHERE
+                   id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
+                (rev.dbid,),
+            )
+            linked_rev = cur.fetchone()
+            if linked_rev:
+                linked_rev = linked_rev[0]
+        list = self.obs.list(
+            rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
+        )
+        if list:
+            rev.import_dir_list(list)
+            md5 = rev.calculate_files_hash()
+            with self.db.cursor() as cur:
+                cur.execute(
+                    "UPDATE revisions SET files_hash=%s WHERE id=%s",
+                    (md5, rev.dbid),
+                )
+        else:
+            rev.set_broken()

-    def refresh_package(self, db, project, package):
+    def fill_file_lists(self):
+        self.find_linked_revs()
+
+        self.find_fake_revisions()
+        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
+            fs = [
+                executor.submit(import_rev, self, rev)
+                for rev in self.revisions_without_files()
+            ]
+            concurrent.futures.wait(fs)
+
+    def refresh_package(self, project, package):
         key = f"{project}/{package}"
         if key in self.refreshed_packages:
             # refreshing once is good enough
             return
         logging.info(f"Refresh {project}/{package}")
         self.refreshed_packages.add(key)
-        self.update_db_package(db, project, package)
-        self.fetch_all_linked_packages(db, project, package)
+        self.update_db_package(project, package)
+        self.fetch_all_linked_packages(project, package)

     def import_into_db(self):
-        db = DB()
-
-        self.refresh_package(db, self.project, self.package)
-        for number in DBRevision.requests_to_fetch(db):
-            self.obs.request(number).import_into_db(db)
-        with db.cursor() as cur:
-            cur.execute(
-                """SELECT DISTINCT source_project,source_package FROM requests
-                   WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package=%s);""",
-                (self.project, self.package),
-            )
-            for project, package in cur.fetchall():
-                self.refresh_package(db, project, package)
-
-        missing_users = User.missing_users(db)
+        with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
+            fs = [
+                executor.submit(refresh_package, self, self.project, package)
+                for package in self.packages
+            ]
+            concurrent.futures.wait(fs)
+
+            self.db.conn.commit()

[Review thread, resolved]
aplanas: Why is this required?
Ghost: It's just to avoid wasting too much time when the script exits in the follow-up step.

+
+            fs = [
+                executor.submit(import_request, self, number)
+                for number in DBRevision.requests_to_fetch(self.db)
+            ]
+            concurrent.futures.wait(fs)
+
+            self.db.conn.commit()

[Review thread, resolved]
aplanas: Same? Is there any transaction open?

+
+            with self.db.cursor() as cur:
+                cur.execute(
+                    """SELECT DISTINCT source_project,source_package FROM requests
+                       WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""",
+                    (self.project, self.packages),
+                )
+                fs = [
+                    executor.submit(refresh_package, self, project, package)
+                    for project, package in cur.fetchall()
+                ]
+                concurrent.futures.wait(fs)
+        self.db.conn.commit()

[Review thread, resolved]
aplanas: Isn't closing the cursor `with` doing the commit?
Ghost: No, only closing the database connection does. Everything until then stays in a transaction.

+
+        missing_users = User.missing_users(self.db)
         for userid in missing_users:
             missing_user = self.obs.user(userid)
             if missing_user:
-                missing_user.import_into_db(db)
+                missing_user.import_into_db(self.db)
+        self.db.conn.commit()

-        self.fill_file_lists(db)
-        db.conn.commit()
+        self.fill_file_lists()
+        self.db.conn.commit()
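The review threads above hinge on the transaction behaviour of the DB layer: closing a cursor's `with` block does not commit anything, so each parallel phase is followed by an explicit `conn.commit()` to make the work durable before the next step runs. A generic sketch of that behaviour, assuming a psycopg2-style driver (the DSN and table are placeholders, not values from this PR):

# Sketch of the commit semantics discussed above; not code from the PR.
import psycopg2

conn = psycopg2.connect("dbname=imports")  # autocommit is off by default

with conn.cursor() as cur:
    cur.execute("UPDATE revisions SET broken = TRUE WHERE id = %s", (42,))
# Leaving the cursor's `with` block only closes the cursor; the UPDATE is
# still part of the open transaction. If the process exited here, the
# work above would be lost.

conn.commit()  # the point where the preceding work actually becomes durable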
@@ -41,7 +41,9 @@ class ProxySHA256:
             self.hashes = dict()
             return
         logging.debug("Retrieve all previously defined SHA256")
-        response = requests.get(f"http://source.dyn.cloud.suse.de/package/{package}")
+        response = requests.get(
+            f"http://source.dyn.cloud.suse.de/package/{package}", timeout=5
+        )
         if response.status_code == 200:
             json = response.json()
             self.hashes = json["shas"]
@@ -67,6 +69,7 @@ class ProxySHA256:
                 "url": url,
                 "package": package,
             },
+            timeout=10,
         )
         if response.status_code != 200:
            raise Exception(f"Redirector error on {self.url} for {url}")
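The `timeout=` arguments added above make `requests` raise instead of hanging indefinitely on an unresponsive redirector. A caller that wants to degrade gracefully could catch that, roughly as below; the `fetch_hashes` helper is hypothetical, only the URL and timeout value come from the diff:

# Sketch: handling the new timeouts on the caller side; not part of the PR.
import requests


def fetch_hashes(package):
    try:
        response = requests.get(
            f"http://source.dyn.cloud.suse.de/package/{package}", timeout=5
        )
    except requests.exceptions.Timeout:
        # Redirector did not answer in time; treat it as "no known hashes".
        return {}
    if response.status_code != 200:
        return {}
    return response.json().get("shas", {})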
[Review thread]
Comment: You mean `> 1`?
Reply: Actually I meant `!= 1` :)
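The `!= 1` guard quoted in the diff is deliberately stricter than `> 1`: with `nargs="*"` an empty package list is possible, and only `!= 1` rejects it. A tiny standalone check of the two guards (the sample lists are illustrative):

# Toy comparison of the two guards; `packages` mimics args.packages with nargs="*".
for packages in ([], ["pkg"], ["pkg-a", "pkg-b"]):
    rejected_by_ne = len(packages) != 1  # the guard in the PR
    rejected_by_gt = len(packages) > 1   # the guard asked about in review
    print(packages, rejected_by_ne, rejected_by_gt)
# [] slips through the `> 1` guard but is rejected by `!= 1`.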