# Forked from importers/git-importer
import itertools
|
|
import logging
|
|
import re
|
|
|
|
from lib.revision import Revision
|
|
|
|
|
|
class History:
    """Store the history of revisions of a package in different projects.

    The revisions are kept in ``self.revisions``, a dict that maps a project
    name to the list of ``Revision`` objects fetched for ``self.package`` in
    that project. A project whose history could not be retrieved is stored
    with an empty list, so every value in the dict is always iterable.
    """

    def __init__(self, obs, package):
        """Create an empty history.

        :param obs: API client; this class calls ``obs._history(project,
            package, **params)`` and ``obs.change_url(api_url)`` on it.
        :param package: name of the package whose history is tracked.
        """
        self.obs = obs
        self.package = package

        # project name -> list of Revision objects
        self.revisions = {}

    def __contains__(self, project):
        """Return True if the history of ``project`` was already fetched."""
        return project in self.revisions

    def __getitem__(self, project):
        """Return the list of revisions stored for ``project``.

        :raises KeyError: if ``project`` has not been fetched.
        """
        return self.revisions[project]

    def _extract_copypac(self, comment):
        """Return the source project named in an ``osc copypac`` comment.

        :param comment: revision comment to inspect.
        :returns: the text captured between ``project:`` and `` package:``,
            or None if the comment does not contain the copypac marker.
        """
        match = re.search(r"osc copypac from project:(.*) package:", comment)
        return match.group(1) if match else None

    def _fetch_revisions(self, project, **params):
        """Fetch and parse the revisions of the package in ``project``.

        :param project: project to query.
        :param params: extra keyword arguments forwarded to ``obs._history``.
        :returns: a list with one parsed ``Revision`` per ``revision``
            element of the returned tree; an empty list if ``obs._history``
            returned None.
        """
        root = self.obs._history(project, self.package, **params)
        if root is None:
            # No history available. Return an empty list instead of None so
            # that the iterating helpers of this class keep working.
            return []
        return [
            Revision(self.obs, self, project, self.package).parse(r)
            for r in root.findall("revision")
        ]

    def fetch_revisions(self, project, follow_copypac=False):
        """Get the revision history of a package.

        Does nothing if ``project`` was already fetched.

        :param project: project to fetch the history from.
        :param follow_copypac: accepted for compatibility; it currently has
            no effect (see the TODO below).
        """
        if project in self:
            return

        self.revisions[project] = self._fetch_revisions(project)
        # TODO: following copypac is disabled. The idea was: while the first
        # revision's comment names a copypac source project (see
        # _extract_copypac), fetch that project's history with deleted=1 and
        # prepend it to self.revisions[project]. Still missing: the old
        # project name.

    def fetch_all_revisions(self, projects):
        """Pre-populate the history.

        :param projects: iterable of 3-tuples; the first item is the project
            name and the third the API URL handed to ``obs.change_url``
            before fetching. The second item is not used here.
        """
        for project, _, api_url in projects:
            self.obs.change_url(api_url)
            self.fetch_revisions(project)

    def sort_all_revisions(self):
        """Sort revisions for all projects, from older to newer.

        :returns: a new list with the revisions of every fetched project,
            ordered by their ``time`` attribute.
        """
        return sorted(
            itertools.chain.from_iterable(self.revisions.values()),
            key=lambda x: x.time,
        )

    def find_revision(self, project, revisionid, accepted_at):
        """Find the revision of ``project`` that matches ``revisionid``.

        The stored revisions are scanned in list order (presumably oldest
        first - confirm against what ``obs._history`` returns).

        :param project: project whose revisions are searched.
        :param revisionid: revision number (compared as a string against
            ``rev``) or source md5 (compared against ``srcmd5``).
        :param accepted_at: time limit; the scan stops at the first
            non-matching revision whose ``time`` is greater than this.
        :returns: the matching revision; the last revision seen so far with
            a truthy ``commit`` if the match is ignored or the time limit is
            exceeded (may be None); None if the list is exhausted without a
            match.
        """
        last_commited_revision = None
        for r in self.revisions.get(project) or []:
            logging.debug(f"Find revision {revisionid} [{accepted_at}]: {r}")
            if str(r.rev) == str(revisionid) or r.srcmd5 == revisionid:
                if r.ignored:
                    logging.debug(
                        f"{r} fits but is ignored, returning {last_commited_revision}"
                    )
                    return last_commited_revision
                logging.debug(f"{r} fits")
                return r
            if r.time > accepted_at:
                # if we can't find the right revision, we take the last
                # commit. Before ~2012 the data was tracked really loosely
                # (e.g. using different timezones and the state field was
                # only introduced in 2016...)
                logging.warning(
                    f"Deploying workaround for missing request revision - returning {last_commited_revision}"
                )
                return last_commited_revision
            if r.commit:
                last_commited_revision = r
        logging.info("No commited revision found, returning None")
        return None

    def find_last_rev_after_time(self, project, time):
        """Return the revision of ``project`` in effect at ``time``.

        Despite the method name, the scan (in list order) returns the first
        revision whose ``time`` equals ``time``, or otherwise the revision
        that precedes the first one with a later ``time``.

        :param project: project whose revisions are searched.
        :param time: reference time, compared against ``Revision.time``.
        :returns: the selected revision, or None if the project has no
            revisions or the very first one is already later than ``time``.
        """
        prev = None
        for rev in self.revisions.get(project) or []:
            if rev.time > time:
                return prev
            if rev.time == time:
                return rev
            prev = rev
        return prev
|