git-importer/lib/obs.py
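"""Access to the Open Build Service (OBS) API for the git importer.

Wraps osc's HTTP helpers with retries for transient server and network
errors, exposes package metadata, history, requests, users and file lists,
and caches downloaded files on disk so the same file (identified by its md5)
is not downloaded more than once.
"""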

import errno
import logging
import shutil
import time
import urllib.error
import urllib.parse
import xml.etree.ElementTree as ET
from pathlib import Path
from urllib.error import HTTPError

import osc.conf
import osc.core

from lib.request import Request
from lib.user import User

# Add a retry wrapper for some of the HTTP actions.
def retry(func):
    def wrapper(*args, **kwargs):
        retry = 0
        while retry < 5:
            try:
                return func(*args, **kwargs)
            except HTTPError as e:
                if 500 <= e.code <= 599:
                    retry += 1
                    logging.warning(
                        f"HTTPError {e.code} -- Retrying {args[0]} ({retry})"
                    )
                    # TODO: remove when move to async
                    time.sleep(0.5)
                else:
                    raise
            except urllib.error.URLError as e:
                if e.reason.errno in (errno.ENETUNREACH, errno.EADDRNOTAVAIL):
                    retry += 1
                    logging.warning(f"URLError {e} -- Retrying {args[0]} ({retry})")
                    time.sleep(0.5)
                else:
                    logging.warning(f"URLError {e.errno} uncaught")
                    raise
            except OSError as e:
                if e.errno in (
                    errno.ENETUNREACH,
                    errno.EADDRNOTAVAIL,
                ):  # sporadically hits cloud VMs :(
                    retry += 1
                    logging.warning(f"OSError {e} -- Retrying {args[0]} ({retry})")
                    # TODO: remove when move to async
                    time.sleep(0.5)
                else:
                    logging.warning(f"OSError {e.errno} uncaught")
                    raise

    return wrapper


osc.core.http_GET = retry(osc.core.http_GET)


class OBS:
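    """Thin client for the OBS source API, built on top of osc."""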
    def __init__(self, url=None):
        if url:
            self.change_url(url)

    def change_url(self, url):
        self.url = url
        osc.conf.get_config(override_apiurl=url)

    def _xml(self, url_path, **params):
        url = osc.core.makeurl(self.url, [url_path], params)
        logging.debug(f"GET {url}")
        return ET.parse(osc.core.http_GET(url)).getroot()
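
    # Each helper below returns the parsed XML root of the corresponding OBS
    # route, or None when the server answers with an HTTP error.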
    def _meta(self, project, package, **params):
        try:
            root = self._xml(f"source/{project}/{package}/_meta", **params)
        except HTTPError:
            logging.error(f"Package [{project}/{package} {params}] has no meta")
            return None
        return root

    def _history(self, project, package, **params):
        try:
            root = self._xml(f"source/{project}/{package}/_history", **params)
        except HTTPError:
            logging.error(f"Package [{project}/{package} {params}] has no history")
            return None
        return root

    def _user(self, userid, **params):
        try:
            root = self._xml(f"/person/{userid}", **params)
        except HTTPError:
            logging.error(f"UserID {userid} not found")
            return None
        return root
    def _link(self, project, package, rev):
        try:
            root = self._xml(f"source/{project}/{package}/_link", rev=rev)
        except HTTPError:
            logging.info("Package has no link")
            return None
        except ET.ParseError:
            logging.error(
                f"Package [{project}/{package} rev={rev}] _link can't be parsed"
            )
            return None
        return root
    def _request(self, requestid):
        try:
            root = self._xml(f"request/{requestid}")
        except HTTPError:
            logging.warning(f"Cannot fetch request {requestid}")
            return None
        return root
    def exists(self, project, package):
        root = self._meta(project, package)
        if root is None:
            return False
        return root.get("project") == project

    def devel_project(self, project, package):
        root = self._meta(project, package)
        devel = root.find("devel")
        if devel is None:
            return None
        return devel.get("project")

    def request(self, requestid):
        root = self._request(requestid)
        if root is not None:
            return Request().parse(root)

    def user(self, userid):
        root = self._user(userid)
        if root is not None:
            return User().parse(root, userid)

    def files(self, project, package, revision):
        root = self._xml(f"source/{project}/{package}", rev=revision, expand=1)
        return [
            (e.get("name"), int(e.get("size")), e.get("md5"))
            for e in root.findall("entry")
        ]
    def _download(self, project, package, name, revision):
        url = osc.core.makeurl(
            self.url,
            ["source", project, package, urllib.parse.quote(name)],
            {"rev": revision, "expand": 1},
        )
        return osc.core.http_GET(url)
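
    # Fetch one file of a package revision into dirpath. Files already present
    # in the on-disk cache (matched by md5) are copied from there instead of
    # being downloaded again; fresh downloads are added to the cache.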
    def download(
        self,
        project: str,
        package: str,
        name: str,
        revision: str,
        dirpath: Path,
        file_md5: str,
    ) -> None:
        cached_file = self._path_from_md5(name, dirpath, file_md5)
        if not self.in_cache(name, dirpath, file_md5):
            with (dirpath / name).open("wb") as f:
                f.write(self._download(project, package, name, revision).read())
            shutil.copy(dirpath / name, cached_file)
        else:
            shutil.copy(cached_file, dirpath / name)
    def list(self, project, package, srcmd5, linkrev):
        params = {"rev": srcmd5, "expand": "1"}
        if linkrev:
            params["linkrev"] = linkrev

        try:
            root = self._xml(f"source/{project}/{package}", **params)
        except HTTPError as e:
            if e.code == 400:
                logging.error(
                    f"Package [{project}/{package} {params}] can't be expanded: {e}"
                )
                return None
            raise e
        return root
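
    # Cache layout under <dirpath>/.cache/: the md5 is split into three
    # three-character directory levels plus a "<rest>-<name>" file name, so
    # cached files fan out over subdirectories instead of piling up in one.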
    def _path_from_md5(self, name, dirpath, md5):
        cache = dirpath.joinpath(".cache/")
        if not cache.exists():
            cache.mkdir()
        filepath = cache.joinpath(f"{md5[0:3]}/{md5[3:6]}/{md5[6:9]}/")
        filepath.mkdir(parents=True, exist_ok=True)
        return filepath.joinpath(f"{md5[9:]}-{name}")

    def in_cache(self, name, dirpath, md5):
        return self._path_from_md5(name, dirpath, md5).is_file()
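

# A hedged usage sketch, not part of the importer itself: the API URL,
# project, package, revision and target directory below are placeholders,
# and running it needs a working osc configuration for the chosen API.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    obs = OBS("https://api.opensuse.org")
    project, package, revision = "openSUSE:Factory", "somepackage", "1"
    target = Path("/tmp/obs-import")
    target.mkdir(parents=True, exist_ok=True)

    if obs.exists(project, package):
        for name, size, md5 in obs.files(project, package, revision):
            # A repeated call with the same md5 is served from .cache/
            # instead of going back to the server.
            obs.download(project, package, name, revision, target, md5)
            print(f"{name}: {size} bytes (md5 {md5})")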