From e53b030dcf9627f4fd2f1b3dacdcf04e48178367 Mon Sep 17 00:00:00 2001 From: Stephan Kulow Date: Mon, 17 Oct 2022 15:17:25 +0200 Subject: [PATCH] Split out OBS, ProxySHA256 and Importer --- binary.py | 2 +- git-importer.py | 592 +----------------------------------------------- git.py | 1 + importer.py | 354 +++++++++++++++++++++++++++++ obs.py | 140 ++++++++++++ proxy_sha256.py | 105 +++++++++ 6 files changed, 602 insertions(+), 592 deletions(-) create mode 100644 importer.py create mode 100644 obs.py create mode 100644 proxy_sha256.py diff --git a/binary.py b/binary.py index a449f27..9794d43 100644 --- a/binary.py +++ b/binary.py @@ -25,6 +25,7 @@ BINARY = { ".zst", } + def is_binary_or_large(filename, size): """Decide if is a binary file based on the extension or size""" binary_suffix = BINARY @@ -83,4 +84,3 @@ def is_binary_or_large(filename, size): return True return False - diff --git a/git-importer.py b/git-importer.py index 3aa2cb8..dca5f95 100755 --- a/git-importer.py +++ b/git-importer.py @@ -1,67 +1,14 @@ #!/usr/bin/python3 import argparse -import errno -import functools -import hashlib import logging import pathlib import shutil import sys -import time -import urllib.parse -import xml.etree.ElementTree as ET -from urllib.error import HTTPError import osc.core -import requests - -from request import Request -from git import Git -from history import History -from binary import is_binary_or_large - -# Add a retry wrapper for some of the HTTP actions. -def retry(func): - def wrapper(*args, **kwargs): - retry = 0 - while retry < 5: - try: - return func(*args, **kwargs) - except HTTPError as e: - if 500 <= e.code <= 599: - retry += 1 - logging.warning( - f"HTTPError {e.code} -- Retrying {args[0]} ({retry})" - ) - # TODO: remove when move to async - time.sleep(0.5) - else: - raise - except urllib.error.URLError as e: - if e.reason.errno in (errno.ENETUNREACH, errno.EADDRNOTAVAIL): - retry += 1 - logging.warning(f"URLError {e} -- Retrying {args[0]} ({retry})") - time.sleep(0.5) - else: - logging.warning(f"URLError {e.errno} uncaught") - raise - except OSError as e: - if e.errno in (errno.ENETUNREACH, errno.EADDRNOTAVAIL): # sporadically hits cloud VMs :( - retry += 1 - logging.warning(f"OSError {e} -- Retrying {args[0]} ({retry})") - # TODO: remove when move to async - time.sleep(0.5) - else: - logging.warning(f"OSError {e.errno} uncaught") - raise - - return wrapper - - -osc.core.http_GET = retry(osc.core.http_GET) - +from importer import Importer URL_OBS = "https://api.opensuse.org" URL_IBS = "https://api.suse.de" @@ -94,543 +41,6 @@ PROJECTS = [ ] -def _hash(hash_alg, file_or_path): - h = hash_alg() - - def __hash(f): - while chunk := f.read(1024 * 4): - h.update(chunk) - - if hasattr(file_or_path, "read"): - __hash(file_or_path) - else: - with file_or_path.open("rb") as f: - __hash(f) - return h.hexdigest() - - -md5 = functools.partial(_hash, hashlib.md5) -sha256 = functools.partial(_hash, hashlib.sha256) - - -def _files_hash(hash_alg, dirpath): - """List of (filepath, md5) for a directory""" - # TODO: do it async or multythread - files = [f for f in dirpath.iterdir() if f.is_file()] - return [(f.parts[-1], hash_alg(f)) for f in files] - - -files_md5 = functools.partial(_files_hash, md5) -files_sha256 = functools.partial(_files_hash, sha256) - -class OBS: - def __init__(self, url=None): - if url: - self.change_url(url) - - def change_url(self, url): - self.url = url - osc.conf.get_config(override_apiurl=url) - - def _xml(self, url_path, **params): - url = osc.core.makeurl(self.url, [url_path], params) - logging.debug(f"GET {url}") - return ET.parse(osc.core.http_GET(url)).getroot() - - def _meta(self, project, package, **params): - try: - root = self._xml(f"source/{project}/{package}/_meta", **params) - except HTTPError: - logging.error(f"Package [{project}/{package} {params}] has no meta") - return None - return root - - def _history(self, project, package, **params): - try: - root = self._xml(f"source/{project}/{package}/_history", **params) - except HTTPError: - logging.error(f"Package [{project}/{package} {params}] has no history") - return None - return root - - def _link(self, project, package, rev): - try: - root = self._xml(f"source/{project}/{package}/_link", rev=rev) - except HTTPError: - logging.info("Package has no link") - return None - except ET.ParseError: - logging.error( - f"Package [{project}/{package} rev={rev}] _link can't be parsed" - ) - return root - - def _request(self, requestid): - try: - root = self._xml(f"request/{requestid}") - except HTTPError: - logging.warning(f"Cannot fetch request {requestid}") - return None - return root - - def exists(self, project, package): - root = self._meta(project, package) - if root is None: - return False - return root.get("project") == project - - def devel_project(self, project, package): - root = self._meta(project, package) - devel = root.find("devel") - if devel is None: - return None - return devel.get("project") - - def request(self, requestid): - root = self._request(requestid) - if root is not None: - return Request().parse(root) - - def files(self, project, package, revision): - root = self._xml(f"source/{project}/{package}", rev=revision, expand=1) - return [ - (e.get("name"), int(e.get("size")), e.get("md5")) - for e in root.findall("entry") - ] - - def _download(self, project, package, name, revision): - url = osc.core.makeurl( - self.url, - ["source", project, package, urllib.parse.quote(name)], - {"rev": revision, "expand": 1}, - ) - return osc.core.http_GET(url) - - def download(self, project, package, name, revision, dirpath): - with (dirpath / name).open("wb") as f: - f.write(self._download(project, package, name, revision).read()) - - -class ProxySHA256: - def __init__(self, obs, url=None, enabled=True): - self.obs = obs - self.url = url if url else "http://source.dyn.cloud.suse.de" - self.enabled = enabled - self.hashes = None - self.texts = set() - - def load_package(self, package): - # _project is unreachable for the proxy - due to being a fake package - if package == "_project": - self.enabled = False - self.texts = set(["_config", "_service"]) - self.hashes = dict() - return - logging.info("Retrieve all previously defined SHA256") - response = requests.get(f"http://source.dyn.cloud.suse.de/package/{package}") - if response.status_code == 200: - json = response.json() - self.hashes = json["shas"] - self.texts = set(json["texts"]) - - def get(self, package, name, file_md5): - key = f"{file_md5}-{name}" - if self.hashes is None: - if self.enabled: - self.load_package(package) - else: - self.hashes = {} - return self.hashes.get(key, None) - - def _proxy_put(self, project, package, name, revision, file_md5, size): - quoted_name = urllib.parse.quote(name) - url = f"{self.obs.url}/public/source/{project}/{package}/{quoted_name}?rev={revision}" - response = requests.put( - self.url, - data={ - "hash": file_md5, - "filename": name, - "url": url, - "package": package, - }, - ) - if response.status_code != 200: - raise Exception(f"Redirector error on {self.url} for {url}") - - key = (file_md5, name) - self.hashes[key] = { - "sha256": response.content.decode("utf-8"), - "fsize": size, - } - return self.hashes[key] - - def _obs_put(self, project, package, name, revision, file_md5, size): - key = (file_md5, name) - self.hashes[key] = { - "sha256": sha256(self.obs._download(project, package, name, revision)), - "fsize": size, - } - return self.hashes[key] - - def put(self, project, package, name, revision, file_md5, size): - if not self.enabled: - return self._obs_put(project, package, name, revision, file_md5, size) - return self._proxy_put(project, package, name, revision, file_md5, size) - - def is_text(self, filename): - return filename in self.texts - - def get_or_put(self, project, package, name, revision, file_md5, size): - result = self.get(package, name, file_md5) - if not result: - result = self.put(project, package, name, revision, file_md5, size) - - # Sanity check - if result["fsize"] != size: - raise Exception(f"Redirector has different size for {name}") - - return result - - - - - - -class Importer: - def __init__(self, projects, package, repodir, search_ancestor, rebase_devel): - # The idea is to create each commit in order, and draw the - # same graph described by the revisions timeline. For that we - # need first to fetch all the revisions and sort them - # linearly, based on the timestamp. - # - # After that we recreate the commits, and if one revision is a - # request that contains a target inside the projects in the - # "history", we create a merge commit. - # - # Optionally, if a flag is set, we will try to find a common - # "Initial commit" from a reference branch (the first one in - # "projects", that is safe to assume to be "openSUSE:Factory". - # This is not always a good idea. For example, in a normal - # situation the "devel" project history is older than - # "factory", and we can root the tree on it. But for some - # other projects we lost partially the "devel" history project - # (could be moved), and "factory" is not the root. - - self.package = package - self.search_ancestor = search_ancestor - self.rebase_devel = rebase_devel - - self.obs = OBS() - self.git = Git( - repodir, - committer="Git OBS Bridge", - committer_email="obsbridge@suse.de", - ).create() - self.proxy_sha256 = ProxySHA256(self.obs, enabled=True) - - self.history = History(self.obs, self.package) - - # Add the "devel" project - (project, branch, api_url) = projects[0] - assert project == "openSUSE:Factory" - self.obs.change_url(api_url) - devel_project = self.obs.devel_project(project, package) - if devel_project: - self.projects = [(devel_project, "devel", api_url)] + projects - else: - self.projects = projects - - # Associate the branch and api_url information per project - self.projects_info = { - project: (branch, api_url) for (project, branch, api_url) in self.projects - } - - def download(self, revision): - obs_files = self.obs.files(revision.project, revision.package, revision.srcmd5) - git_files = { - (f.name, f.stat().st_size, md5(f)) - for f in self.git.path.iterdir() - if f.is_file() and f.name not in (".gitattributes") - } - - # Overwrite ".gitattributes" with the - self.git.add_default_lfs_gitattributes(force=True) - - # Download each file in OBS if it is not a binary (or large) - # file - for (name, size, file_md5) in obs_files: - # this file creates easily 100k commits and is just useless data :( - # unfortunately it's stored in the same meta package as the project config - if revision.package == "_project" and name == "_staging_workflow": - continue - # have such files been detected as text mimetype before? - is_text = self.proxy_sha256.is_text(name) - if not is_text and is_binary_or_large(name, size): - file_sha256 = self.proxy_sha256.get_or_put( - revision.project, - revision.package, - name, - revision.srcmd5, - file_md5, - size, - ) - self.git.add_lfs(name, file_sha256["sha256"], size) - else: - if (name, size, file_md5) not in git_files: - print(f"Download {name}") - self.obs.download( - revision.project, - revision.package, - name, - revision.srcmd5, - self.git.path, - ) - # Validate the MD5 of the downloaded file - if md5(self.git.path / name) != file_md5: - raise Exception(f"Download error in {name}") - self.git.add(name) - - # Remove extra files - obs_names = {n for (n, _, _) in obs_files} - git_names = {n for (n, _, _) in git_files} - for name in git_names - obs_names: - print(f"Remove {name}") - self.git.remove(name) - - def import_all_revisions(self, gc): - # Fetch all the requests and sort them. Ideally we should - # build the graph here, to avoid new commits before the merge. - # For now we will sort them and invalidate the commits if - # "rebase_devel" is set. - self.history.fetch_all_revisions(self.projects) - revisions = self.history.sort_all_revisions() - - logging.debug(f"Selected import order for {self.package}") - for revision in revisions: - logging.debug(revision) - - gc_cnt = gc - for revision in revisions: - gc_cnt -= 1 - if gc_cnt <= 0 and gc: - self.git.gc() - gc_cnt = gc - self.import_revision(revision) - - def import_new_revision_with_request(self, revision, request): - """Create a new branch as a result of a merge""" - - submitted_revision = self.history.find_revision( - request.source, request.revisionid, revision.time - ) - if not submitted_revision: - logging.warning(f"Request {request} does not connect to a known revision") - return False - - if not submitted_revision.commit: - # If the revision appointed by the request is not part of - # the git history, we can have an ordering problem. One - # example is "premake4". - self.import_revision(submitted_revision) - - assert submitted_revision.commit is not None - - project = revision.project - branch, _ = self.projects_info[project] - - # TODO: add an empty commit marking the acceptenace of the request (see discussion in PR 2858) - self.git.branch(branch, submitted_revision.commit) - self.git.clean() - self.git.checkout(branch) - - logging.info(f"Create new branch based on {submitted_revision.commit}") - revision.commit = submitted_revision.commit - - def _rebase_branch_history(self, project, revision): - branch, _ = self.projects_info[project] - history = self.history[project] - revision_index = history.index(revision) - for index in range(revision_index + 1, len(history)): - revision = history[index] - # We are done when we have one non-commited revision - if not revision.commit: - return - logging.info(f"Rebasing {revision} from {branch}") - revision.commit = None - self.import_revision(revision) - - def import_revision_with_request(self, revision, request): - """Import a single revision via a merge""" - - submitted_revision = self.history.find_revision( - request.source, request.revisionid, revision.time - ) - if not submitted_revision: - logging.warning(f"Request {request} does not connect to a known revision") - return False - assert submitted_revision.commit is not None - - # TODO: detect a revision, case in point - # Base:System/bash/284 -> rq683701 -> accept O:F/151 - # -> autocommit Base:System/bash/285 - # Revert lead to openSUSE:Factory/bash/152 - # Base:System/286 restored the reverted code in devel project - # rq684575 was created and accepted as O:F/153 - # But the 284-285 and the 285-286 changeset is seen as empty - # as the revert was never in Base:System, so the - # submitted_revision of 684575 has no commit - if submitted_revision.commit == "EMPTY": - logging.warning("Empty commit submitted?!") - return False - - message = ( - f"Accepting request {revision.requestid}: {revision.comment}\n\n{revision}" - ) - commit = self.git.merge( - # TODO: revision.userid or request.creator? - f"OBS User {revision.userid}", - "null@suse.de", - revision.time, - message, - submitted_revision.commit, - ) - - if commit == "EMPTY": - logging.warning("Empty merge. Ignoring the revision and the request") - self.git.merge_abort() - revision.commit = commit - return False - - if commit == "CONFLICT": - logging.info("Merge conflict. Downloading revision") - self.download(revision) - message = f"CONFLICT {message}" - commit = self.git.merge( - f"OBS User {revision.userid}", - "null@suse.de", - revision.time, - message, - submitted_revision.commit, - merged=True, - ) - - assert commit and commit != "CONFLICT" - logging.info(f"Merge with {submitted_revision.commit} into {commit}") - revision.commit = commit - - # TODO: There are more checks to do, like for example, the - # last commit into the non-devel branch should be a merge from - # the devel branch - if self.rebase_devel: - branch, _ = self.projects_info.get(request.source, (None, None)) - if branch == "devel": - self.git.repo.references[f"refs/heads/{branch}"].set_target(commit) - self._rebase_branch_history(request.source, submitted_revision) - - return True - - def matching_request(self, revision): - request = self.obs.request(revision.requestid) - if not request: - return None - - # to be handled by the caller - if request.type() != "submit": - return request - - if request.source not in self.projects_info: - logging.info("Request from a non exported project") - return None - - if request.target != revision.project: - # This seems to happen when the devel project gets - # reinitialized (for example, SR#943593 in 7zip, or - # SR#437901 in ColorFull) - logging.info("Request target different from current project") - return None - - if request.source == request.target: - # this is not a merge, but a different way to do a - # contribution to the (devel) project - see bindfs's rev 1 - logging.info("Request within the same project") - return None - - return request - - def import_revision(self, revision): - """Import a single revision into git""" - project = revision.project - branch, api_url = self.projects_info[project] - - logging.info(f"Importing [{revision}] to {branch}") - - self.obs.change_url(api_url) - - # Populate linkrev and replace srcmd5 from the linked - # revision. If the expansion fails, the revision will be ignored - # and not imported. - if not revision.check_expanded(): - logging.warning(f"Broken revision") - revision.ignored = True - return - - # When doing a SR, we see also a revision in the origin - # project with the outgoing request, but without changes in - # the project. We can ignore them. - # - # If there is a request ID, it will be filtered out later, - # when the target project is different from itself. - if revision.userid == "autobuild" and not revision.requestid: - logging.info("Ignoring autocommit") - revision.ignored = True - return - - if revision.userid == "buildservice-autocommit": - logging.info("Ignoring autocommit") - revision.ignored = True - return - - # Create the reference if the branch is new. If so return - # True. - new_branch = self.git.checkout(branch) - - if revision.requestid: - request = self.matching_request(revision) - if request: - if request.type() == "delete": - # TODO: after this comes a restore, this should be collapsed - # before even hitting git - logging.info("Delete request ignored") - revision.ignored = True - return - - logging.debug(f"Found matching request: #{revision.project} #{request}") - if new_branch: - self.import_new_revision_with_request(revision, request) - return - if self.import_revision_with_request(revision, request): - return - - # Import revision as a single commit (without merging) - self.download(revision) - - if new_branch or self.git.is_dirty(): - commit = self.git.commit( - f"OBS User {revision.userid}", - "null@suse.de", - revision.time, - # TODO: Normalize better the commit message - f"{revision.comment}\n\n{revision}", - # Create an empty commit only if is a new branch - allow_empty=new_branch, - ) - revision.commit = commit - logging.info(f"Commit {commit}") - else: - logging.info("Skip empty commit") - revision.ignored = True - - def main(): parser = argparse.ArgumentParser(description="OBS history importer into git") parser.add_argument("package", help="OBS package name") diff --git a/git.py b/git.py index 71cebe7..c63da49 100644 --- a/git.py +++ b/git.py @@ -8,6 +8,7 @@ from binary import BINARY LFS_SUFFIX = "filter=lfs diff=lfs merge=lfs -text" + class Git: """Local git repository""" diff --git a/importer.py b/importer.py new file mode 100644 index 0000000..c049983 --- /dev/null +++ b/importer.py @@ -0,0 +1,354 @@ +import functools +import logging + +from git import Git +from history import History +from binary import is_binary_or_large +from proxy_sha256 import ProxySHA256, md5, sha256 +from obs import OBS + + +def _files_hash(hash_alg, dirpath): + """List of (filepath, md5) for a directory""" + # TODO: do it async or multythread + files = [f for f in dirpath.iterdir() if f.is_file()] + return [(f.parts[-1], hash_alg(f)) for f in files] + + +files_md5 = functools.partial(_files_hash, md5) +files_sha256 = functools.partial(_files_hash, sha256) + + +class Importer: + def __init__(self, projects, package, repodir, search_ancestor, rebase_devel): + # The idea is to create each commit in order, and draw the + # same graph described by the revisions timeline. For that we + # need first to fetch all the revisions and sort them + # linearly, based on the timestamp. + # + # After that we recreate the commits, and if one revision is a + # request that contains a target inside the projects in the + # "history", we create a merge commit. + # + # Optionally, if a flag is set, we will try to find a common + # "Initial commit" from a reference branch (the first one in + # "projects", that is safe to assume to be "openSUSE:Factory". + # This is not always a good idea. For example, in a normal + # situation the "devel" project history is older than + # "factory", and we can root the tree on it. But for some + # other projects we lost partially the "devel" history project + # (could be moved), and "factory" is not the root. + + self.package = package + self.search_ancestor = search_ancestor + self.rebase_devel = rebase_devel + + self.obs = OBS() + self.git = Git( + repodir, + committer="Git OBS Bridge", + committer_email="obsbridge@suse.de", + ).create() + self.proxy_sha256 = ProxySHA256(self.obs, enabled=True) + + self.history = History(self.obs, self.package) + + # Add the "devel" project + (project, branch, api_url) = projects[0] + assert project == "openSUSE:Factory" + self.obs.change_url(api_url) + devel_project = self.obs.devel_project(project, package) + if devel_project: + self.projects = [(devel_project, "devel", api_url)] + projects + else: + self.projects = projects + + # Associate the branch and api_url information per project + self.projects_info = { + project: (branch, api_url) for (project, branch, api_url) in self.projects + } + + def download(self, revision): + obs_files = self.obs.files(revision.project, revision.package, revision.srcmd5) + git_files = { + (f.name, f.stat().st_size, md5(f)) + for f in self.git.path.iterdir() + if f.is_file() and f.name not in (".gitattributes") + } + + # Overwrite ".gitattributes" with the + self.git.add_default_lfs_gitattributes(force=True) + + # Download each file in OBS if it is not a binary (or large) + # file + for (name, size, file_md5) in obs_files: + # this file creates easily 100k commits and is just useless data :( + # unfortunately it's stored in the same meta package as the project config + if revision.package == "_project" and name == "_staging_workflow": + continue + # have such files been detected as text mimetype before? + is_text = self.proxy_sha256.is_text(name) + if not is_text and is_binary_or_large(name, size): + file_sha256 = self.proxy_sha256.get_or_put( + revision.project, + revision.package, + name, + revision.srcmd5, + file_md5, + size, + ) + self.git.add_lfs(name, file_sha256["sha256"], size) + else: + if (name, size, file_md5) not in git_files: + print(f"Download {name}") + self.obs.download( + revision.project, + revision.package, + name, + revision.srcmd5, + self.git.path, + ) + # Validate the MD5 of the downloaded file + if md5(self.git.path / name) != file_md5: + raise Exception(f"Download error in {name}") + self.git.add(name) + + # Remove extra files + obs_names = {n for (n, _, _) in obs_files} + git_names = {n for (n, _, _) in git_files} + for name in git_names - obs_names: + print(f"Remove {name}") + self.git.remove(name) + + def import_all_revisions(self, gc): + # Fetch all the requests and sort them. Ideally we should + # build the graph here, to avoid new commits before the merge. + # For now we will sort them and invalidate the commits if + # "rebase_devel" is set. + self.history.fetch_all_revisions(self.projects) + revisions = self.history.sort_all_revisions() + + logging.debug(f"Selected import order for {self.package}") + for revision in revisions: + logging.debug(revision) + + gc_cnt = gc + for revision in revisions: + gc_cnt -= 1 + if gc_cnt <= 0 and gc: + self.git.gc() + gc_cnt = gc + self.import_revision(revision) + + def import_new_revision_with_request(self, revision, request): + """Create a new branch as a result of a merge""" + + submitted_revision = self.history.find_revision( + request.source, request.revisionid, revision.time + ) + if not submitted_revision: + logging.warning(f"Request {request} does not connect to a known revision") + return False + + if not submitted_revision.commit: + # If the revision appointed by the request is not part of + # the git history, we can have an ordering problem. One + # example is "premake4". + self.import_revision(submitted_revision) + + assert submitted_revision.commit is not None + + project = revision.project + branch, _ = self.projects_info[project] + + # TODO: add an empty commit marking the acceptenace of the request (see discussion in PR 2858) + self.git.branch(branch, submitted_revision.commit) + self.git.clean() + self.git.checkout(branch) + + logging.info(f"Create new branch based on {submitted_revision.commit}") + revision.commit = submitted_revision.commit + + def _rebase_branch_history(self, project, revision): + branch, _ = self.projects_info[project] + history = self.history[project] + revision_index = history.index(revision) + for index in range(revision_index + 1, len(history)): + revision = history[index] + # We are done when we have one non-commited revision + if not revision.commit: + return + logging.info(f"Rebasing {revision} from {branch}") + revision.commit = None + self.import_revision(revision) + + def import_revision_with_request(self, revision, request): + """Import a single revision via a merge""" + + submitted_revision = self.history.find_revision( + request.source, request.revisionid, revision.time + ) + if not submitted_revision: + logging.warning(f"Request {request} does not connect to a known revision") + return False + assert submitted_revision.commit is not None + + # TODO: detect a revision, case in point + # Base:System/bash/284 -> rq683701 -> accept O:F/151 + # -> autocommit Base:System/bash/285 + # Revert lead to openSUSE:Factory/bash/152 + # Base:System/286 restored the reverted code in devel project + # rq684575 was created and accepted as O:F/153 + # But the 284-285 and the 285-286 changeset is seen as empty + # as the revert was never in Base:System, so the + # submitted_revision of 684575 has no commit + if submitted_revision.commit == "EMPTY": + logging.warning("Empty commit submitted?!") + return False + + message = ( + f"Accepting request {revision.requestid}: {revision.comment}\n\n{revision}" + ) + commit = self.git.merge( + # TODO: revision.userid or request.creator? + f"OBS User {revision.userid}", + "null@suse.de", + revision.time, + message, + submitted_revision.commit, + ) + + if commit == "EMPTY": + logging.warning("Empty merge. Ignoring the revision and the request") + self.git.merge_abort() + revision.commit = commit + return False + + if commit == "CONFLICT": + logging.info("Merge conflict. Downloading revision") + self.download(revision) + message = f"CONFLICT {message}" + commit = self.git.merge( + f"OBS User {revision.userid}", + "null@suse.de", + revision.time, + message, + submitted_revision.commit, + merged=True, + ) + + assert commit and commit != "CONFLICT" + logging.info(f"Merge with {submitted_revision.commit} into {commit}") + revision.commit = commit + + # TODO: There are more checks to do, like for example, the + # last commit into the non-devel branch should be a merge from + # the devel branch + if self.rebase_devel: + branch, _ = self.projects_info.get(request.source, (None, None)) + if branch == "devel": + self.git.repo.references[f"refs/heads/{branch}"].set_target(commit) + self._rebase_branch_history(request.source, submitted_revision) + + return True + + def matching_request(self, revision): + request = self.obs.request(revision.requestid) + if not request: + return None + + # to be handled by the caller + if request.type() != "submit": + return request + + if request.source not in self.projects_info: + logging.info("Request from a non exported project") + return None + + if request.target != revision.project: + # This seems to happen when the devel project gets + # reinitialized (for example, SR#943593 in 7zip, or + # SR#437901 in ColorFull) + logging.info("Request target different from current project") + return None + + if request.source == request.target: + # this is not a merge, but a different way to do a + # contribution to the (devel) project - see bindfs's rev 1 + logging.info("Request within the same project") + return None + + return request + + def import_revision(self, revision): + """Import a single revision into git""" + project = revision.project + branch, api_url = self.projects_info[project] + + logging.info(f"Importing [{revision}] to {branch}") + + self.obs.change_url(api_url) + + # Populate linkrev and replace srcmd5 from the linked + # revision. If the expansion fails, the revision will be ignored + # and not imported. + if not revision.check_expanded(): + logging.warning(f"Broken revision") + revision.ignored = True + return + + # When doing a SR, we see also a revision in the origin + # project with the outgoing request, but without changes in + # the project. We can ignore them. + # + # If there is a request ID, it will be filtered out later, + # when the target project is different from itself. + if revision.userid == "autobuild" and not revision.requestid: + logging.info("Ignoring autocommit") + revision.ignored = True + return + + if revision.userid == "buildservice-autocommit": + logging.info("Ignoring autocommit") + revision.ignored = True + return + + # Create the reference if the branch is new. If so return + # True. + new_branch = self.git.checkout(branch) + + if revision.requestid: + request = self.matching_request(revision) + if request: + if request.type() == "delete": + # TODO: after this comes a restore, this should be collapsed + # before even hitting git + logging.info("Delete request ignored") + revision.ignored = True + return + + logging.debug(f"Found matching request: #{revision.project} #{request}") + if new_branch: + self.import_new_revision_with_request(revision, request) + return + if self.import_revision_with_request(revision, request): + return + + # Import revision as a single commit (without merging) + self.download(revision) + + if new_branch or self.git.is_dirty(): + commit = self.git.commit( + f"OBS User {revision.userid}", + "null@suse.de", + revision.time, + # TODO: Normalize better the commit message + f"{revision.comment}\n\n{revision}", + # Create an empty commit only if is a new branch + allow_empty=new_branch, + ) + revision.commit = commit + logging.info(f"Commit {commit}") + else: + logging.info("Skip empty commit") + revision.ignored = True diff --git a/obs.py b/obs.py new file mode 100644 index 0000000..2de5a11 --- /dev/null +++ b/obs.py @@ -0,0 +1,140 @@ +import osc.core +import xml.etree.ElementTree as ET +import logging +import urllib.parse +from urllib.error import HTTPError +import time +import errno + +from request import Request + +# Add a retry wrapper for some of the HTTP actions. +def retry(func): + def wrapper(*args, **kwargs): + retry = 0 + while retry < 5: + try: + return func(*args, **kwargs) + except HTTPError as e: + if 500 <= e.code <= 599: + retry += 1 + logging.warning( + f"HTTPError {e.code} -- Retrying {args[0]} ({retry})" + ) + # TODO: remove when move to async + time.sleep(0.5) + else: + raise + except urllib.error.URLError as e: + if e.reason.errno in (errno.ENETUNREACH, errno.EADDRNOTAVAIL): + retry += 1 + logging.warning(f"URLError {e} -- Retrying {args[0]} ({retry})") + time.sleep(0.5) + else: + logging.warning(f"URLError {e.errno} uncaught") + raise + except OSError as e: + if e.errno in ( + errno.ENETUNREACH, + errno.EADDRNOTAVAIL, + ): # sporadically hits cloud VMs :( + retry += 1 + logging.warning(f"OSError {e} -- Retrying {args[0]} ({retry})") + # TODO: remove when move to async + time.sleep(0.5) + else: + logging.warning(f"OSError {e.errno} uncaught") + raise + + return wrapper + + +osc.core.http_GET = retry(osc.core.http_GET) + + +class OBS: + def __init__(self, url=None): + if url: + self.change_url(url) + + def change_url(self, url): + self.url = url + osc.conf.get_config(override_apiurl=url) + + def _xml(self, url_path, **params): + url = osc.core.makeurl(self.url, [url_path], params) + logging.debug(f"GET {url}") + return ET.parse(osc.core.http_GET(url)).getroot() + + def _meta(self, project, package, **params): + try: + root = self._xml(f"source/{project}/{package}/_meta", **params) + except HTTPError: + logging.error(f"Package [{project}/{package} {params}] has no meta") + return None + return root + + def _history(self, project, package, **params): + try: + root = self._xml(f"source/{project}/{package}/_history", **params) + except HTTPError: + logging.error(f"Package [{project}/{package} {params}] has no history") + return None + return root + + def _link(self, project, package, rev): + try: + root = self._xml(f"source/{project}/{package}/_link", rev=rev) + except HTTPError: + logging.info("Package has no link") + return None + except ET.ParseError: + logging.error( + f"Package [{project}/{package} rev={rev}] _link can't be parsed" + ) + return root + + def _request(self, requestid): + try: + root = self._xml(f"request/{requestid}") + except HTTPError: + logging.warning(f"Cannot fetch request {requestid}") + return None + return root + + def exists(self, project, package): + root = self._meta(project, package) + if root is None: + return False + return root.get("project") == project + + def devel_project(self, project, package): + root = self._meta(project, package) + devel = root.find("devel") + if devel is None: + return None + return devel.get("project") + + def request(self, requestid): + root = self._request(requestid) + if root is not None: + return Request().parse(root) + + def files(self, project, package, revision): + root = self._xml(f"source/{project}/{package}", rev=revision, expand=1) + return [ + (e.get("name"), int(e.get("size")), e.get("md5")) + for e in root.findall("entry") + ] + + def _download(self, project, package, name, revision): + url = osc.core.makeurl( + self.url, + ["source", project, package, urllib.parse.quote(name)], + {"rev": revision, "expand": 1}, + ) + return osc.core.http_GET(url) + + def download(self, project, package, name, revision, dirpath): + with (dirpath / name).open("wb") as f: + f.write(self._download(project, package, name, revision).read()) diff --git a/proxy_sha256.py b/proxy_sha256.py new file mode 100644 index 0000000..1d55cb2 --- /dev/null +++ b/proxy_sha256.py @@ -0,0 +1,105 @@ +import logging +import requests +import urllib +import functools +import hashlib + + +def _hash(hash_alg, file_or_path): + h = hash_alg() + + def __hash(f): + while chunk := f.read(1024 * 4): + h.update(chunk) + + if hasattr(file_or_path, "read"): + __hash(file_or_path) + else: + with file_or_path.open("rb") as f: + __hash(f) + return h.hexdigest() + + +md5 = functools.partial(_hash, hashlib.md5) +sha256 = functools.partial(_hash, hashlib.sha256) + + +class ProxySHA256: + def __init__(self, obs, url=None, enabled=True): + self.obs = obs + self.url = url if url else "http://source.dyn.cloud.suse.de" + self.enabled = enabled + self.hashes = None + self.texts = set() + + def load_package(self, package): + # _project is unreachable for the proxy - due to being a fake package + if package == "_project": + self.enabled = False + self.texts = set(["_config", "_service"]) + self.hashes = dict() + return + logging.info("Retrieve all previously defined SHA256") + response = requests.get(f"http://source.dyn.cloud.suse.de/package/{package}") + if response.status_code == 200: + json = response.json() + self.hashes = json["shas"] + self.texts = set(json["texts"]) + + def get(self, package, name, file_md5): + key = f"{file_md5}-{name}" + if self.hashes is None: + if self.enabled: + self.load_package(package) + else: + self.hashes = {} + return self.hashes.get(key, None) + + def _proxy_put(self, project, package, name, revision, file_md5, size): + quoted_name = urllib.parse.quote(name) + url = f"{self.obs.url}/public/source/{project}/{package}/{quoted_name}?rev={revision}" + response = requests.put( + self.url, + data={ + "hash": file_md5, + "filename": name, + "url": url, + "package": package, + }, + ) + if response.status_code != 200: + raise Exception(f"Redirector error on {self.url} for {url}") + + key = (file_md5, name) + self.hashes[key] = { + "sha256": response.content.decode("utf-8"), + "fsize": size, + } + return self.hashes[key] + + def _obs_put(self, project, package, name, revision, file_md5, size): + key = (file_md5, name) + self.hashes[key] = { + "sha256": sha256(self.obs._download(project, package, name, revision)), + "fsize": size, + } + return self.hashes[key] + + def put(self, project, package, name, revision, file_md5, size): + if not self.enabled: + return self._obs_put(project, package, name, revision, file_md5, size) + return self._proxy_put(project, package, name, revision, file_md5, size) + + def is_text(self, filename): + return filename in self.texts + + def get_or_put(self, project, package, name, revision, file_md5, size): + result = self.get(package, name, file_md5) + if not result: + result = self.put(project, package, name, revision, file_md5, size) + + # Sanity check + if result["fsize"] != size: + raise Exception(f"Redirector has different size for {name}") + + return result