Run many packages in parallel to avoid overhead and make use of CPUS #16
4
Makefile
4
Makefile
@ -1,7 +1,7 @@
|
|||||||
all:
|
all:
|
||||||
isort *.py lib/*py tests/*py
|
isort *.py lib/*py tests/*py
|
||||||
autoflake -r --in-place --remove-unused-variables .
|
autoflake --in-place --remove-unused-variables *.py lib/*py tests/*py
|
||||||
black .
|
black *.py lib/*py tests/*py
|
||||||
|
|
||||||
test:
|
test:
|
||||||
python3 -m unittest -v tests/*.py
|
python3 -m unittest -v tests/*.py
|
||||||
|
@ -1,6 +1,7 @@
|
|||||||
#!/usr/bin/python3
|
#!/usr/bin/python3
|
||||||
|
|
||||||
import argparse
|
import argparse
|
||||||
|
import concurrent.futures
|
||||||
import logging
|
import logging
|
||||||
import pathlib
|
import pathlib
|
||||||
import sys
|
import sys
|
||||||
@ -42,13 +43,20 @@ PROJECTS = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def export_package(package, repodir, cachedir, gc):
|
||||||
|
exporter = GitExporter(URL_OBS, "openSUSE:Factory", package, repodir, cachedir)
|
||||||
|
exporter.set_gc_interval(gc)
|
||||||
|
exporter.export_as_git()
|
||||||
|
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
parser = argparse.ArgumentParser(description="OBS history importer into git")
|
parser = argparse.ArgumentParser(description="OBS history importer into git")
|
||||||
parser.add_argument("package", help="OBS package name")
|
parser.add_argument("packages", help="OBS package names", nargs="*")
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"-r",
|
"-r",
|
||||||
"--repodir",
|
"--repodir",
|
||||||
required=False,
|
required=False,
|
||||||
|
default=pathlib.Path("repos"),
|
||||||
type=pathlib.Path,
|
type=pathlib.Path,
|
||||||
help="Local git repository directory",
|
help="Local git repository directory",
|
||||||
)
|
)
|
||||||
@ -94,22 +102,25 @@ def main():
|
|||||||
requests_log.propagate = True
|
requests_log.propagate = True
|
||||||
|
|
||||||
if args.export:
|
if args.export:
|
||||||
TestExporter(args.package).run()
|
if len(args.packages) != 1:
|
||||||
aplanas marked this conversation as resolved
Outdated
|
|||||||
|
print("Can only export one package")
|
||||||
|
sys.exit(1)
|
||||||
|
TestExporter(args.packages[0]).run()
|
||||||
return
|
return
|
||||||
|
|
||||||
if not args.repodir:
|
|
||||||
args.repodir = pathlib.Path("repos") / args.package
|
|
||||||
|
|
||||||
if not args.cachedir:
|
if not args.cachedir:
|
||||||
args.cachedir = pathlib.Path("~/.cache/git-import/").expanduser()
|
args.cachedir = pathlib.Path("~/.cache/git-import/").expanduser()
|
||||||
|
|
||||||
importer = Importer(URL_OBS, "openSUSE:Factory", args.package)
|
importer = Importer(URL_OBS, "openSUSE:Factory", args.packages)
|
||||||
importer.import_into_db()
|
importer.import_into_db()
|
||||||
exporter = GitExporter(
|
with concurrent.futures.ProcessPoolExecutor(max_workers=8) as executor:
|
||||||
aplanas marked this conversation as resolved
Outdated
aplanas
commented
Making Making `max_workers` to `None` will create one process per CPU. That is OK in my desktop, but laptop report like 32 of them, and I am sure this will kill OBS. Maybe create a parameter with some low default?
Ghost
commented
I guess I can just hard code to 8. Would that work for you? I guess I can just hard code to 8. Would that work for you?
aplanas
commented
seems safer seems safer
|
|||||||
URL_OBS, "openSUSE:Factory", args.package, args.repodir, args.cachedir
|
fs = [
|
||||||
)
|
executor.submit(
|
||||||
exporter.set_gc_interval(args.gc)
|
export_package, package, args.repodir, args.cachedir, args.gc
|
||||||
exporter.export_as_git()
|
)
|
||||||
|
for package in args.packages
|
||||||
|
]
|
||||||
|
concurrent.futures.wait(fs)
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
if __name__ == "__main__":
|
||||||
|
@ -111,13 +111,18 @@ class DBRevision:
|
|||||||
return DBRevision(db, row)
|
return DBRevision(db, row)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def latest_revision(db, project, package):
|
def max_rev(db, project, package):
|
||||||
with db.cursor() as cur:
|
with db.cursor() as cur:
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"SELECT MAX(rev) FROM revisions where project=%s and package=%s",
|
"SELECT MAX(rev) FROM revisions where project=%s and package=%s",
|
||||||
(project, package),
|
(project, package),
|
||||||
)
|
)
|
||||||
max = cur.fetchone()[0]
|
return cur.fetchone()[0]
|
||||||
|
return None
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def latest_revision(db, project, package):
|
||||||
|
max = DBRevision.max_rev(db, project, package)
|
||||||
if max:
|
if max:
|
||||||
return DBRevision.fetch_revision(db, project, package, max)
|
return DBRevision.fetch_revision(db, project, package, max)
|
||||||
return None
|
return None
|
||||||
|
65
lib/git.py
65
lib/git.py
@ -109,69 +109,6 @@ class Git:
|
|||||||
"HEAD", author, committer, message, tree, parents
|
"HEAD", author, committer, message, tree, parents
|
||||||
)
|
)
|
||||||
|
|
||||||
def merge(
|
|
||||||
self,
|
|
||||||
user,
|
|
||||||
user_email,
|
|
||||||
user_time,
|
|
||||||
message,
|
|
||||||
commit,
|
|
||||||
committer=None,
|
|
||||||
committer_email=None,
|
|
||||||
committer_time=None,
|
|
||||||
clean_on_conflict=True,
|
|
||||||
merged=False,
|
|
||||||
allow_empty=False,
|
|
||||||
):
|
|
||||||
new_branch = False
|
|
||||||
|
|
||||||
if not merged:
|
|
||||||
try:
|
|
||||||
self.repo.merge(commit)
|
|
||||||
except KeyError:
|
|
||||||
# If it is the first commit, we will have a missing
|
|
||||||
# "HEAD", but the files will be there. We can proceed
|
|
||||||
# to the commit directly.
|
|
||||||
new_branch = True
|
|
||||||
|
|
||||||
if not merged and self.repo.index.conflicts:
|
|
||||||
for conflict in self.repo.index.conflicts:
|
|
||||||
conflict = [c for c in conflict if c]
|
|
||||||
if conflict:
|
|
||||||
logging.info(f"CONFLICT {conflict[0].path}")
|
|
||||||
|
|
||||||
if clean_on_conflict:
|
|
||||||
self.clean()
|
|
||||||
# Now I miss Rust enums
|
|
||||||
return "CONFLICT"
|
|
||||||
|
|
||||||
# Some merges are empty in OBS (no changes, not sure
|
|
||||||
# why), for now we signal them
|
|
||||||
if not allow_empty and not self.is_dirty():
|
|
||||||
# I really really do miss Rust enums
|
|
||||||
return "EMPTY"
|
|
||||||
|
|
||||||
if new_branch:
|
|
||||||
parents = [commit]
|
|
||||||
else:
|
|
||||||
parents = [
|
|
||||||
self.repo.head.target,
|
|
||||||
commit,
|
|
||||||
]
|
|
||||||
commit = self.commit(
|
|
||||||
user,
|
|
||||||
user_email,
|
|
||||||
user_time,
|
|
||||||
message,
|
|
||||||
parents,
|
|
||||||
committer,
|
|
||||||
committer_email,
|
|
||||||
committer_time,
|
|
||||||
allow_empty=allow_empty,
|
|
||||||
)
|
|
||||||
|
|
||||||
return commit
|
|
||||||
|
|
||||||
def merge_abort(self):
|
def merge_abort(self):
|
||||||
self.repo.state_cleanup()
|
self.repo.state_cleanup()
|
||||||
|
|
||||||
@ -188,7 +125,7 @@ class Git:
|
|||||||
self.repo.references["refs/heads/" + branch].set_target(commit)
|
self.repo.references["refs/heads/" + branch].set_target(commit)
|
||||||
|
|
||||||
def gc(self):
|
def gc(self):
|
||||||
logging.info(f"Garbage recollect and repackage {self.path}")
|
logging.debug(f"Garbage recollect and repackage {self.path}")
|
||||||
subprocess.run(
|
subprocess.run(
|
||||||
["git", "gc", "--auto"],
|
["git", "gc", "--auto"],
|
||||||
cwd=self.path,
|
cwd=self.path,
|
||||||
|
@ -20,7 +20,7 @@ class GitExporter:
|
|||||||
self.obs.change_url(api_url)
|
self.obs.change_url(api_url)
|
||||||
self.proxy_sha256 = ProxySHA256(self.obs, enabled=True)
|
self.proxy_sha256 = ProxySHA256(self.obs, enabled=True)
|
||||||
self.git = Git(
|
self.git = Git(
|
||||||
repodir,
|
repodir / package,
|
||||||
committer="Git OBS Bridge",
|
committer="Git OBS Bridge",
|
||||||
committer_email="obsbridge@suse.de",
|
committer_email="obsbridge@suse.de",
|
||||||
).create()
|
).create()
|
||||||
@ -60,6 +60,7 @@ class GitExporter:
|
|||||||
|
|
||||||
gc_cnt = self.gc_interval
|
gc_cnt = self.gc_interval
|
||||||
if len(left_to_commit) > 0:
|
if len(left_to_commit) > 0:
|
||||||
|
logging.info(f"Commiting into {self.git.path}")
|
||||||
self.git.gc()
|
self.git.gc()
|
||||||
for flat in left_to_commit:
|
for flat in left_to_commit:
|
||||||
gc_cnt -= 1
|
gc_cnt -= 1
|
||||||
|
181
lib/importer.py
181
lib/importer.py
@ -1,3 +1,4 @@
|
|||||||
|
import concurrent.futures
|
||||||
import logging
|
import logging
|
||||||
import xml.etree.ElementTree as ET
|
import xml.etree.ElementTree as ET
|
||||||
|
|
||||||
@ -8,26 +9,42 @@ from lib.obs_revision import OBSRevision
|
|||||||
from lib.user import User
|
from lib.user import User
|
||||||
|
|
||||||
|
|
||||||
|
def refresh_package(importer, project, package):
|
||||||
|
importer.refresh_package(project, package)
|
||||||
|
|
||||||
|
|
||||||
|
def import_request(importer, number):
|
||||||
|
importer.import_request(number)
|
||||||
|
|
||||||
|
|
||||||
|
def import_rev(importer, rev):
|
||||||
|
importer.import_rev(rev)
|
||||||
|
|
||||||
|
|
||||||
class Importer:
|
class Importer:
|
||||||
def __init__(self, api_url, project, package):
|
def __init__(self, api_url, project, packages):
|
||||||
# Import a Factory package into the database
|
# Import multiple Factory packages into the database
|
||||||
self.package = package
|
self.packages = packages
|
||||||
self.project = project
|
self.project = project
|
||||||
|
|
||||||
|
self.db = DB()
|
||||||
self.obs = OBS()
|
self.obs = OBS()
|
||||||
assert project == "openSUSE:Factory"
|
assert project == "openSUSE:Factory"
|
||||||
self.obs.change_url(api_url)
|
self.obs.change_url(api_url)
|
||||||
self.refreshed_packages = set()
|
self.refreshed_packages = set()
|
||||||
|
|
||||||
def update_db_package(self, db, project, package):
|
def import_request(self, number):
|
||||||
|
self.obs.request(number).import_into_db(self.db)
|
||||||
|
|
||||||
|
def update_db_package(self, project, package):
|
||||||
root = self.obs._history(project, package)
|
root = self.obs._history(project, package)
|
||||||
if root is None:
|
if root is None:
|
||||||
return
|
return
|
||||||
latest = DBRevision.latest_revision(db, project, package)
|
latest = DBRevision.max_rev(self.db, project, package)
|
||||||
for r in root.findall("revision"):
|
for r in root.findall("revision"):
|
||||||
rev = OBSRevision(self.obs, project, package).parse(r)
|
rev = OBSRevision(self.obs, project, package).parse(r)
|
||||||
if not latest or rev.rev > latest.rev:
|
if not latest or rev.rev > latest:
|
||||||
dbrev = DBRevision.import_obs_rev(db, rev)
|
dbrev = DBRevision.import_obs_rev(self.db, rev)
|
||||||
try:
|
try:
|
||||||
root = rev.read_link()
|
root = rev.read_link()
|
||||||
except ET.ParseError:
|
except ET.ParseError:
|
||||||
@ -38,15 +55,15 @@ class Importer:
|
|||||||
tpkg = root.get("package") or package
|
tpkg = root.get("package") or package
|
||||||
dbrev.links_to(tprj, tpkg)
|
dbrev.links_to(tprj, tpkg)
|
||||||
|
|
||||||
def find_linked_revs(self, db):
|
def find_linked_revs(self):
|
||||||
with db.cursor() as cur:
|
with self.db.cursor() as cur:
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"""SELECT * from revisions WHERE id in (SELECT l.revision_id FROM links l
|
"""SELECT * from revisions WHERE id in (SELECT l.revision_id FROM links l
|
||||||
LEFT JOIN linked_revs lrevs ON lrevs.revision_id=l.revision_id
|
LEFT JOIN linked_revs lrevs ON lrevs.revision_id=l.revision_id
|
||||||
WHERE lrevs.id IS NULL) and broken is FALSE;"""
|
WHERE lrevs.id IS NULL) and broken is FALSE;"""
|
||||||
)
|
)
|
||||||
for row in cur.fetchall():
|
for row in cur.fetchall():
|
||||||
rev = DBRevision(db, row)
|
rev = DBRevision(self.db, row)
|
||||||
linked_rev = rev.linked_rev()
|
linked_rev = rev.linked_rev()
|
||||||
if not linked_rev:
|
if not linked_rev:
|
||||||
logging.debug(f"No link {rev}")
|
logging.debug(f"No link {rev}")
|
||||||
@ -57,8 +74,8 @@ class Importer:
|
|||||||
(rev.dbid, linked_rev.dbid),
|
(rev.dbid, linked_rev.dbid),
|
||||||
)
|
)
|
||||||
|
|
||||||
def fetch_all_linked_packages(self, db, project, package):
|
def fetch_all_linked_packages(self, project, package):
|
||||||
with db.cursor() as cur:
|
with self.db.cursor() as cur:
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"""SELECT DISTINCT l.project, l.package from links l JOIN revisions r
|
"""SELECT DISTINCT l.project, l.package from links l JOIN revisions r
|
||||||
on r.id=l.revision_id WHERE r.project=%s AND r.package=%s""",
|
on r.id=l.revision_id WHERE r.project=%s AND r.package=%s""",
|
||||||
@ -67,26 +84,26 @@ class Importer:
|
|||||||
for row in cur.fetchall():
|
for row in cur.fetchall():
|
||||||
(lproject, lpackage) = row
|
(lproject, lpackage) = row
|
||||||
# recurse
|
# recurse
|
||||||
self.refresh_package(db, lproject, lpackage)
|
self.refresh_package(lproject, lpackage)
|
||||||
|
|
||||||
def find_fake_revisions(self, db):
|
def find_fake_revisions(self):
|
||||||
with db.cursor() as cur:
|
with self.db.cursor() as cur:
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"SELECT * from revisions WHERE id in (SELECT linked_id from linked_revs WHERE considered=FALSE)"
|
"SELECT * from revisions WHERE id in (SELECT linked_id from linked_revs WHERE considered=FALSE)"
|
||||||
)
|
)
|
||||||
for row in cur.fetchall():
|
for row in cur.fetchall():
|
||||||
self._find_fake_revision(db, DBRevision(db, row))
|
self._find_fake_revision(DBRevision(self.db, row))
|
||||||
|
|
||||||
def _find_fake_revision(self, db, rev):
|
def _find_fake_revision(self, rev):
|
||||||
prev = rev.previous_commit()
|
prev = rev.previous_commit()
|
||||||
if not prev:
|
if not prev:
|
||||||
with db.cursor() as cur:
|
with self.db.cursor() as cur:
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"UPDATE linked_revs SET considered=TRUE where linked_id=%s",
|
"UPDATE linked_revs SET considered=TRUE where linked_id=%s",
|
||||||
(rev.dbid,),
|
(rev.dbid,),
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
with db.cursor() as cur:
|
with self.db.cursor() as cur:
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"""SELECT * FROM revisions WHERE id IN
|
"""SELECT * FROM revisions WHERE id IN
|
||||||
(SELECT revision_id from linked_revs WHERE linked_id=%s)
|
(SELECT revision_id from linked_revs WHERE linked_id=%s)
|
||||||
@ -95,7 +112,7 @@ class Importer:
|
|||||||
)
|
)
|
||||||
last_linked = None
|
last_linked = None
|
||||||
for linked in cur.fetchall():
|
for linked in cur.fetchall():
|
||||||
linked = DBRevision(db, linked)
|
linked = DBRevision(self.db, linked)
|
||||||
nextrev = linked.next_commit()
|
nextrev = linked.next_commit()
|
||||||
if nextrev and nextrev.commit_time < rev.commit_time:
|
if nextrev and nextrev.commit_time < rev.commit_time:
|
||||||
continue
|
continue
|
||||||
@ -107,7 +124,7 @@ class Importer:
|
|||||||
if not last_linked:
|
if not last_linked:
|
||||||
return
|
return
|
||||||
|
|
||||||
with db.cursor() as cur:
|
with self.db.cursor() as cur:
|
||||||
linked = last_linked
|
linked = last_linked
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"SELECT 1 FROM fake_revs where revision_id=%s AND linked_id=%s",
|
"SELECT 1 FROM fake_revs where revision_id=%s AND linked_id=%s",
|
||||||
@ -144,70 +161,96 @@ class Importer:
|
|||||||
(rev.dbid, linked.dbid),
|
(rev.dbid, linked.dbid),
|
||||||
)
|
)
|
||||||
|
|
||||||
def revisions_without_files(self, db):
|
def revisions_without_files(self):
|
||||||
with db.cursor() as cur:
|
with self.db.cursor() as cur:
|
||||||
cur.execute(
|
cur.execute(
|
||||||
"SELECT * FROM revisions WHERE broken=FALSE AND expanded_srcmd5 IS NULL"
|
"SELECT * FROM revisions WHERE broken=FALSE AND expanded_srcmd5 IS NULL"
|
||||||
)
|
)
|
||||||
return [DBRevision(db, row) for row in cur.fetchall()]
|
return [DBRevision(self.db, row) for row in cur.fetchall()]
|
||||||
|
|
||||||
def fill_file_lists(self, db):
|
def import_rev(self, rev):
|
||||||
self.find_linked_revs(db)
|
with self.db.cursor() as cur:
|
||||||
|
cur.execute(
|
||||||
self.find_fake_revisions(db)
|
"""SELECT unexpanded_srcmd5 from revisions WHERE
|
||||||
for rev in self.revisions_without_files(db):
|
id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
|
||||||
with db.cursor() as cur:
|
(rev.dbid,),
|
||||||
cur.execute(
|
|
||||||
"""SELECT unexpanded_srcmd5 from revisions WHERE
|
|
||||||
id=(SELECT linked_id FROM linked_revs WHERE revision_id=%s)""",
|
|
||||||
(rev.dbid,),
|
|
||||||
)
|
|
||||||
linked_rev = cur.fetchone()
|
|
||||||
if linked_rev:
|
|
||||||
linked_rev = linked_rev[0]
|
|
||||||
list = self.obs.list(
|
|
||||||
rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
|
|
||||||
)
|
)
|
||||||
if list:
|
linked_rev = cur.fetchone()
|
||||||
rev.import_dir_list(list)
|
if linked_rev:
|
||||||
md5 = rev.calculate_files_hash()
|
linked_rev = linked_rev[0]
|
||||||
with db.cursor() as cur:
|
list = self.obs.list(
|
||||||
cur.execute(
|
rev.project, rev.package, rev.unexpanded_srcmd5, linked_rev
|
||||||
"UPDATE revisions SET files_hash=%s WHERE id=%s",
|
)
|
||||||
(md5, rev.dbid),
|
if list:
|
||||||
)
|
rev.import_dir_list(list)
|
||||||
else:
|
md5 = rev.calculate_files_hash()
|
||||||
rev.set_broken()
|
with self.db.cursor() as cur:
|
||||||
|
cur.execute(
|
||||||
|
"UPDATE revisions SET files_hash=%s WHERE id=%s",
|
||||||
|
(md5, rev.dbid),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
rev.set_broken()
|
||||||
|
|
||||||
def refresh_package(self, db, project, package):
|
def fill_file_lists(self):
|
||||||
|
self.find_linked_revs()
|
||||||
|
|
||||||
|
self.find_fake_revisions()
|
||||||
|
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
|
||||||
|
fs = [
|
||||||
|
executor.submit(import_rev, self, rev)
|
||||||
|
for rev in self.revisions_without_files()
|
||||||
|
]
|
||||||
|
concurrent.futures.wait(fs)
|
||||||
|
|
||||||
|
def refresh_package(self, project, package):
|
||||||
key = f"{project}/{package}"
|
key = f"{project}/{package}"
|
||||||
if key in self.refreshed_packages:
|
if key in self.refreshed_packages:
|
||||||
# refreshing once is good enough
|
# refreshing once is good enough
|
||||||
return
|
return
|
||||||
|
logging.info(f"Refresh {project}/{package}")
|
||||||
self.refreshed_packages.add(key)
|
self.refreshed_packages.add(key)
|
||||||
self.update_db_package(db, project, package)
|
self.update_db_package(project, package)
|
||||||
self.fetch_all_linked_packages(db, project, package)
|
self.fetch_all_linked_packages(project, package)
|
||||||
|
|
||||||
def import_into_db(self):
|
def import_into_db(self):
|
||||||
db = DB()
|
|
||||||
|
|
||||||
self.refresh_package(db, self.project, self.package)
|
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
|
||||||
for number in DBRevision.requests_to_fetch(db):
|
fs = [
|
||||||
self.obs.request(number).import_into_db(db)
|
executor.submit(refresh_package, self, self.project, package)
|
||||||
with db.cursor() as cur:
|
for package in self.packages
|
||||||
cur.execute(
|
]
|
||||||
"""SELECT DISTINCT source_project,source_package FROM requests
|
concurrent.futures.wait(fs)
|
||||||
WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package=%s);""",
|
|
||||||
(self.project, self.package),
|
|
||||||
)
|
|
||||||
for project, package in cur.fetchall():
|
|
||||||
self.refresh_package(db, project, package)
|
|
||||||
|
|
||||||
missing_users = User.missing_users(db)
|
self.db.conn.commit()
|
||||||
aplanas marked this conversation as resolved
aplanas
commented
Why is this required? Why is this required?
Ghost
commented
It's just to avoid wasting too much time when the script exists in the followup step. It's just to avoid wasting too much time when the script exists in the followup step.
|
|||||||
|
|
||||||
|
fs = [
|
||||||
|
executor.submit(import_request, self, number)
|
||||||
|
for number in DBRevision.requests_to_fetch(self.db)
|
||||||
|
]
|
||||||
|
concurrent.futures.wait(fs)
|
||||||
|
|
||||||
|
self.db.conn.commit()
|
||||||
aplanas marked this conversation as resolved
aplanas
commented
Same? Is there any transaction open? Same? Is there any transaction open?
|
|||||||
|
|
||||||
|
with self.db.cursor() as cur:
|
||||||
|
cur.execute(
|
||||||
|
"""SELECT DISTINCT source_project,source_package FROM requests
|
||||||
|
WHERE id IN (SELECT request_id FROM revisions WHERE project=%s and package = ANY(%s));""",
|
||||||
|
(self.project, self.packages),
|
||||||
|
)
|
||||||
|
fs = [
|
||||||
|
executor.submit(refresh_package, self, project, package)
|
||||||
|
for project, package in cur.fetchall()
|
||||||
|
]
|
||||||
|
concurrent.futures.wait(fs)
|
||||||
|
self.db.conn.commit()
|
||||||
aplanas marked this conversation as resolved
aplanas
commented
Is not closing the cursor Is not closing the cursor `with` doing the commit?
Ghost
commented
No, only closing the database connection does. Everything until then stays in a transaction. No, only closing the database connection does. Everything until then stays in a transaction.
|
|||||||
|
|
||||||
|
missing_users = User.missing_users(self.db)
|
||||||
for userid in missing_users:
|
for userid in missing_users:
|
||||||
missing_user = self.obs.user(userid)
|
missing_user = self.obs.user(userid)
|
||||||
if missing_user:
|
if missing_user:
|
||||||
missing_user.import_into_db(db)
|
missing_user.import_into_db(self.db)
|
||||||
|
self.db.conn.commit()
|
||||||
|
|
||||||
self.fill_file_lists(db)
|
self.fill_file_lists()
|
||||||
db.conn.commit()
|
self.db.conn.commit()
|
||||||
|
@ -41,7 +41,9 @@ class ProxySHA256:
|
|||||||
self.hashes = dict()
|
self.hashes = dict()
|
||||||
return
|
return
|
||||||
logging.debug("Retrieve all previously defined SHA256")
|
logging.debug("Retrieve all previously defined SHA256")
|
||||||
response = requests.get(f"http://source.dyn.cloud.suse.de/package/{package}")
|
response = requests.get(
|
||||||
|
f"http://source.dyn.cloud.suse.de/package/{package}", timeout=5
|
||||||
|
)
|
||||||
if response.status_code == 200:
|
if response.status_code == 200:
|
||||||
json = response.json()
|
json = response.json()
|
||||||
self.hashes = json["shas"]
|
self.hashes = json["shas"]
|
||||||
@ -67,6 +69,7 @@ class ProxySHA256:
|
|||||||
"url": url,
|
"url": url,
|
||||||
"package": package,
|
"package": package,
|
||||||
},
|
},
|
||||||
|
timeout=10,
|
||||||
)
|
)
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
raise Exception(f"Redirector error on {self.url} for {url}")
|
raise Exception(f"Redirector error on {self.url} for {url}")
|
||||||
|
Loading…
Reference in New Issue
Block a user
You mean
> 1
?Actually I meant != 1 :)