Split out History class

This commit is contained in:
Stephan Kulow 2022-10-17 14:58:18 +02:00
parent 13f49f59c9
commit bdc1dc0cc9
4 changed files with 198 additions and 188 deletions

86
binary.py Normal file
View File

@ -0,0 +1,86 @@
import pathlib
# File suffixes that are always treated as binary, whatever the file size.
# This set is public: git.py imports it as well.
BINARY = {
    ".7z",
    ".bsp",
    ".bz2",
    ".gem",
    ".gz",
    ".jar",
    ".lz",
    ".lzma",
    ".obscpio",
    ".oxt",
    ".pdf",
    ".png",
    ".rpm",
    ".tbz",
    ".tbz2",
    ".tgz",
    ".ttf",
    ".txz",
    ".whl",
    ".xz",
    ".zip",
    ".zst",
}

# File suffixes that are never treated as binary, whatever the file size.
# Built once at import time instead of on every call.
_NON_BINARY_SUFFIXES = frozenset(
    {
        ".1",
        ".8",
        ".SUSE",
        ".asc",
        ".c",
        ".cabal",
        ".cfg",
        ".changes",
        ".conf",
        ".desktop",
        ".dif",
        ".diff",
        ".dsc",
        ".el",
        ".html",
        ".in",
        ".init",
        ".install",
        ".keyring",
        ".kiwi",
        ".logrotate",
        ".macros",
        ".md",
        ".obsinfo",
        ".pamd",
        ".patch",
        ".pl",
        ".pom",
        ".py",
        ".rpmlintrc",
        ".rules",
        ".script",
        ".service",
        ".sh",
        ".sig",
        ".sign",
        ".spec",
        ".sysconfig",
        ".test",
        ".txt",
        ".xml",
        ".yml",
    }
)

# A file with an unknown suffix counts as "large" from this size on.
_LARGE_FILE_BYTES = 6 * 1024


def is_binary_or_large(filename, size):
    """Decide whether a file is binary or large, based on suffix and size.

    Only the last suffix of ``filename`` is looked at (``foo.tar.gz`` is
    judged by ``.gz``) and the comparison is case-sensitive.

    Args:
        filename: File name or path; only its suffix is inspected.
        size: File size, compared against the 6 KiB threshold.

    Returns:
        True if the suffix is in ``BINARY``; False if the suffix is a
        known text suffix; otherwise whether ``size`` is at least 6 KiB.
    """
    suffix = pathlib.Path(filename).suffix
    if suffix in BINARY:
        return True
    if suffix in _NON_BINARY_SUFFIXES:
        return False
    # Unknown suffix: fall back to the size heuristic.
    return size >= _LARGE_FILE_BYTES

View File

@ -4,10 +4,8 @@ import argparse
import errno
import functools
import hashlib
import itertools
import logging
import pathlib
import re
import shutil
import sys
import time
@ -19,7 +17,9 @@ import osc.core
import requests
from request import Request
from revision import Revision
from git import Git
from history import History
from binary import is_binary_or_large
# Add a retry wrapper for some of the HTTP actions.
def retry(func):
@ -62,32 +62,6 @@ def retry(func):
osc.core.http_GET = retry(osc.core.http_GET)
# File suffixes that is_binary_or_large() always reports as binary,
# regardless of the file size.
BINARY = {
".7z",
".bsp",
".bz2",
".gem",
".gz",
".jar",
".lz",
".lzma",
".obscpio",
".oxt",
".pdf",
".png",
".rpm",
".tbz",
".tbz2",
".tgz",
".ttf",
".txz",
".whl",
".xz",
".zip",
".zst",
}
# Attribute list in .gitattributes syntax that routes a path through Git LFS
# (presumably appended after a file pattern - confirm against its users).
LFS_SUFFIX = "filter=lfs diff=lfs merge=lfs -text"
# Base URLs of the two API servers this tool talks to.
URL_OBS = "https://api.opensuse.org"
URL_IBS = "https://api.suse.de"
@ -120,66 +94,6 @@ PROJECTS = [
]
# File suffixes that are never treated as binary, whatever the file size.
# Built once at import time instead of on every call.
_NON_BINARY_SUFFIXES = frozenset(
    {
        ".1",
        ".8",
        ".SUSE",
        ".asc",
        ".c",
        ".cabal",
        ".cfg",
        ".changes",
        ".conf",
        ".desktop",
        ".dif",
        ".diff",
        ".dsc",
        ".el",
        ".html",
        ".in",
        ".init",
        ".install",
        ".keyring",
        ".kiwi",
        ".logrotate",
        ".macros",
        ".md",
        ".obsinfo",
        ".pamd",
        ".patch",
        ".pl",
        ".pom",
        ".py",
        ".rpmlintrc",
        ".rules",
        ".script",
        ".service",
        ".sh",
        ".sig",
        ".sign",
        ".spec",
        ".sysconfig",
        ".test",
        ".txt",
        ".xml",
        ".yml",
    }
)

# A file with an unknown suffix counts as "large" from this size on.
_LARGE_FILE_BYTES = 6 * 1024


def is_binary_or_large(filename, size):
    """Decide whether a file is binary or large, based on suffix and size.

    Only the last suffix of ``filename`` is looked at (``foo.tar.gz`` is
    judged by ``.gz``) and the comparison is case-sensitive.

    Args:
        filename: File name or path; only its suffix is inspected.
        size: File size, compared against the 6 KiB threshold.

    Returns:
        True if the suffix is in ``BINARY``; False if the suffix is a
        known text suffix; otherwise whether ``size`` is at least 6 KiB.
    """
    suffix = pathlib.Path(filename).suffix
    if suffix in BINARY:
        return True
    if suffix in _NON_BINARY_SUFFIXES:
        return False
    # Unknown suffix: fall back to the size heuristic.
    return size >= _LARGE_FILE_BYTES
def _hash(hash_alg, file_or_path):
h = hash_alg()
@ -380,105 +294,6 @@ class ProxySHA256:
class History:
    """Store the history of revisions of a package in different
    projects.

    ``revisions`` maps a project name to the list of revisions fetched
    for ``package`` in that project.  The lookup helpers walk each list
    front to back and rely on it being ordered from older to newer
    (NOTE(review): that order comes from the server - confirm).
    """

    def __init__(self, obs, package):
        self.obs = obs
        self.package = package
        self.revisions = {}

    def __contains__(self, project):
        """Return True if the history of ``project`` was already fetched."""
        return project in self.revisions

    def __getitem__(self, project):
        """Return the revisions stored for ``project``.

        Raises:
            KeyError: if ``project`` has not been fetched.
        """
        return self.revisions[project]

    def _extract_copypac(self, comment):
        """Return the source project named in an ``osc copypac`` commit
        comment, or None if the comment does not match.
        """
        found = re.findall(r"osc copypac from project:(.*) package:", comment)
        return found[0] if found else None

    def _fetch_revisions(self, project, **params):
        """Fetch and parse the revisions of the package in ``project``.

        Extra keyword arguments are handed through to ``obs._history``.
        Always returns a list: it is empty when ``obs._history`` returns
        None, so that the stored history can be iterated safely.
        """
        root = self.obs._history(project, self.package, **params)
        if root is None:
            return []
        return [
            Revision(self.obs, self, project, self.package).parse(r)
            for r in root.findall("revision")
        ]

    def fetch_revisions(self, project, follow_copypac=False):
        """Get the revision history of a package.

        Does nothing when ``project`` was fetched before.
        ``follow_copypac`` is accepted but currently has no effect.
        """
        if project in self:
            return
        self.revisions[project] = self._fetch_revisions(project)
        # TODO: honour follow_copypac.  The sketched idea: while the
        # oldest revision's comment names a copypac source project (see
        # _extract_copypac), fetch that project's history with deleted=1
        # and prepend it.  Still missing: the old project name.

    def fetch_all_revisions(self, projects):
        """Pre-populate the history.

        ``projects`` is an iterable of 3-tuples; the first item is the
        project name and the third the API URL that ``obs`` is switched
        to before fetching.  The second item is not used here.
        """
        for project, _, api_url in projects:
            self.obs.change_url(api_url)
            self.fetch_revisions(project)

    def sort_all_revisions(self):
        """Sort revisions for all projects, from older to newer"""
        return sorted(
            itertools.chain.from_iterable(self.revisions.values()),
            key=lambda rev: rev.time,
        )

    def find_revision(self, project, revisionid, accepted_at):
        """Find the revision of ``project`` that a request refers to.

        Args:
            project: Project whose stored revisions are searched.
            revisionid: Revision number (compared as a string against
                ``rev``) or a value compared against ``srcmd5``.
            accepted_at: Time bound; the search stops at the first
                revision whose ``time`` is greater than this.

        Returns:
            The matching revision.  If the match is flagged ``ignored``,
            or the time bound is passed before any match, the last
            revision seen so far with ``commit`` set (possibly None).
            None if the project has no matching revision at all.
        """
        last_committed = None
        for r in self.revisions.get(project, []):
            # Lazy %-style arguments: the message is only built when the
            # log level is enabled, which matters inside this loop.
            logging.debug("Find revision %s [%s]: %s", revisionid, accepted_at, r)
            if str(r.rev) == str(revisionid) or r.srcmd5 == revisionid:
                if r.ignored:
                    logging.debug(
                        "%s fits but is ignored, returning %s", r, last_committed
                    )
                    return last_committed
                logging.debug("%s fits", r)
                return r
            if r.time > accepted_at:
                # if we can't find the right revision, we take the last
                # commit. Before ~2012 the data was tracked really loosely
                # (e.g. using different timezones and the state field was
                # only introduced in 2016...)
                logging.warning(
                    "Deploying workaround for missing request revision"
                    " - returning %s",
                    last_committed,
                )
                return last_committed
            if r.commit:
                last_committed = r
        logging.info("No commited revision found, returning None")
        return None

    def find_last_rev_after_time(self, project, time):
        """Return the last revision of ``project`` that is not newer
        than ``time``.

        Walks the stored revisions in order and returns the first one
        whose ``time`` equals ``time``; otherwise the revision just
        before the first one with a greater ``time``; otherwise the last
        revision.  Returns None if there is no such revision.
        """
        prev = None
        for rev in self.revisions.get(project, []):
            if rev.time > time:
                return prev
            if rev.time == time:
                return rev
            prev = rev
        return prev
class Importer:

3
git.py
View File

@ -4,6 +4,9 @@ import logging
import subprocess
import fnmatch
from binary import BINARY
# Attribute list in .gitattributes syntax that routes a path through Git LFS
# (presumably appended after a file pattern - confirm against its users).
LFS_SUFFIX = "filter=lfs diff=lfs merge=lfs -text"
class Git:
"""Local git repository"""

106
history.py Normal file
View File

@ -0,0 +1,106 @@
import re
import logging
import itertools
from revision import Revision
class History:
    """Store the history of revisions of a package in different
    projects.

    ``revisions`` maps a project name to the list of revisions fetched
    for ``package`` in that project.  The lookup helpers walk each list
    front to back and rely on it being ordered from older to newer
    (NOTE(review): that order comes from the server - confirm).
    """

    def __init__(self, obs, package):
        self.obs = obs
        self.package = package
        self.revisions = {}

    def __contains__(self, project):
        """Return True if the history of ``project`` was already fetched."""
        return project in self.revisions

    def __getitem__(self, project):
        """Return the revisions stored for ``project``.

        Raises:
            KeyError: if ``project`` has not been fetched.
        """
        return self.revisions[project]

    def _extract_copypac(self, comment):
        """Return the source project named in an ``osc copypac`` commit
        comment, or None if the comment does not match.
        """
        found = re.findall(r"osc copypac from project:(.*) package:", comment)
        return found[0] if found else None

    def _fetch_revisions(self, project, **params):
        """Fetch and parse the revisions of the package in ``project``.

        Extra keyword arguments are handed through to ``obs._history``.
        Always returns a list: it is empty when ``obs._history`` returns
        None, so that the stored history can be iterated safely.
        """
        root = self.obs._history(project, self.package, **params)
        if root is None:
            return []
        return [
            Revision(self.obs, self, project, self.package).parse(r)
            for r in root.findall("revision")
        ]

    def fetch_revisions(self, project, follow_copypac=False):
        """Get the revision history of a package.

        Does nothing when ``project`` was fetched before.
        ``follow_copypac`` is accepted but currently has no effect.
        """
        if project in self:
            return
        self.revisions[project] = self._fetch_revisions(project)
        # TODO: honour follow_copypac.  The sketched idea: while the
        # oldest revision's comment names a copypac source project (see
        # _extract_copypac), fetch that project's history with deleted=1
        # and prepend it.  Still missing: the old project name.

    def fetch_all_revisions(self, projects):
        """Pre-populate the history.

        ``projects`` is an iterable of 3-tuples; the first item is the
        project name and the third the API URL that ``obs`` is switched
        to before fetching.  The second item is not used here.
        """
        for project, _, api_url in projects:
            self.obs.change_url(api_url)
            self.fetch_revisions(project)

    def sort_all_revisions(self):
        """Sort revisions for all projects, from older to newer"""
        return sorted(
            itertools.chain.from_iterable(self.revisions.values()),
            key=lambda rev: rev.time,
        )

    def find_revision(self, project, revisionid, accepted_at):
        """Find the revision of ``project`` that a request refers to.

        Args:
            project: Project whose stored revisions are searched.
            revisionid: Revision number (compared as a string against
                ``rev``) or a value compared against ``srcmd5``.
            accepted_at: Time bound; the search stops at the first
                revision whose ``time`` is greater than this.

        Returns:
            The matching revision.  If the match is flagged ``ignored``,
            or the time bound is passed before any match, the last
            revision seen so far with ``commit`` set (possibly None).
            None if the project has no matching revision at all.
        """
        last_committed = None
        for r in self.revisions.get(project, []):
            # Lazy %-style arguments: the message is only built when the
            # log level is enabled, which matters inside this loop.
            logging.debug("Find revision %s [%s]: %s", revisionid, accepted_at, r)
            if str(r.rev) == str(revisionid) or r.srcmd5 == revisionid:
                if r.ignored:
                    logging.debug(
                        "%s fits but is ignored, returning %s", r, last_committed
                    )
                    return last_committed
                logging.debug("%s fits", r)
                return r
            if r.time > accepted_at:
                # if we can't find the right revision, we take the last
                # commit. Before ~2012 the data was tracked really loosely
                # (e.g. using different timezones and the state field was
                # only introduced in 2016...)
                logging.warning(
                    "Deploying workaround for missing request revision"
                    " - returning %s",
                    last_committed,
                )
                return last_committed
            if r.commit:
                last_committed = r
        logging.info("No commited revision found, returning None")
        return None

    def find_last_rev_after_time(self, project, time):
        """Return the last revision of ``project`` that is not newer
        than ``time``.

        Walks the stored revisions in order and returns the first one
        whose ``time`` equals ``time``; otherwise the revision just
        before the first one with a greater ``time``; otherwise the last
        revision.  Returns None if there is no such revision.
        """
        prev = None
        for rev in self.revisions.get(project, []):
            if rev.time > time:
                return prev
            if rev.time == time:
                return rev
            prev = rev
        return prev