As we're downloading packages in parallel, we could otherwise end up copying a cache file that hasn't been fully written yet.
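The fix (in download() below) publishes a file into the cache by writing it under a temporary ".new" name and then renaming it into place. On POSIX, os.rename() within one filesystem is atomic, so a concurrent reader sees either the old file or the complete new one, never a half-written copy. A minimal sketch of the pattern (publish_to_cache is a hypothetical helper, not part of this module):

import os
import shutil

def publish_to_cache(src, dest):
    tmp = dest + ".new"   # copy under a temporary name first
    shutil.copy(src, tmp)
    os.rename(tmp, dest)  # atomic swap: readers never see a partial file
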
import errno
import logging
import os
import shutil
import time
import urllib.error
import urllib.parse
import xml.etree.ElementTree as ET
from pathlib import Path
from urllib.error import HTTPError

import osc.conf
import osc.core

from lib.proxy_sha256 import md5
from lib.request import Request
from lib.user import User


# Add a retry wrapper for some of the HTTP actions.
def retry(func):
    def wrapper(*args, **kwargs):
        attempt = 0
        while attempt < 5:
            try:
                return func(*args, **kwargs)
            except HTTPError as e:
                # Retry only on server-side (5xx) errors.
                if 500 <= e.code <= 599:
                    attempt += 1
                    logging.warning(
                        f"HTTPError {e.code} -- Retrying {args[0]} ({attempt})"
                    )
                    # TODO: remove when move to async
                    time.sleep(0.5)
                else:
                    raise
            except urllib.error.URLError as e:
                if e.reason.errno in (errno.ENETUNREACH, errno.EADDRNOTAVAIL):
                    attempt += 1
                    logging.warning(f"URLError {e} -- Retrying {args[0]} ({attempt})")
                    # TODO: remove when move to async
                    time.sleep(0.5)
                else:
                    logging.warning(f"URLError {e.reason} uncaught")
                    raise
            except OSError as e:
                if e.errno in (
                    errno.ENETUNREACH,
                    errno.EADDRNOTAVAIL,
                ):  # sporadically hits cloud VMs :(
                    attempt += 1
                    logging.warning(f"OSError {e} -- Retrying {args[0]} ({attempt})")
                    # TODO: remove when move to async
                    time.sleep(0.5)
                else:
                    logging.warning(f"OSError {e.errno} uncaught")
                    raise
        # NOTE: after five failed attempts we fall through and return None.

    return wrapper


# Every GET issued through osc.core in this module now retries transient failures.
osc.core.http_GET = retry(osc.core.http_GET)


class OBS:
    def __init__(self, url=None):
        if url:
            self.change_url(url)

    def change_url(self, url):
        self.url = url
        osc.conf.get_config(override_apiurl=url)

    def _xml(self, url_path, **params):
        url = osc.core.makeurl(self.url, [url_path], params)
        logging.debug(f"GET {url}")
        return ET.parse(osc.core.http_GET(url)).getroot()

    def _meta(self, project, package, **params):
        try:
            root = self._xml(f"source/{project}/{package}/_meta", **params)
        except HTTPError:
            logging.error(f"Package [{project}/{package} {params}] has no meta")
            return None
        return root

    def _history(self, project, package, **params):
        try:
            root = self._xml(f"source/{project}/{package}/_history", **params)
        except HTTPError:
            logging.error(f"Package [{project}/{package} {params}] has no history")
            return None
        return root

    def _user(self, userid, **params):
        try:
            root = self._xml(f"person/{userid}", **params)
        except HTTPError:
            logging.error(f"UserID {userid} not found")
            return None
        return root

    def _link(self, project, package, rev):
        try:
            root = self._xml(f"source/{project}/{package}/_link", rev=rev)
        except HTTPError:
            logging.info("Package has no link")
            return None
        except ET.ParseError:
            logging.error(
                f"Package [{project}/{package} rev={rev}] _link can't be parsed"
            )
            return None
        return root

    def _request(self, requestid):
        try:
            root = self._xml(f"request/{requestid}")
        except HTTPError:
            logging.warning(f"Cannot fetch request {requestid}")
            return None
        return root

    def exists(self, project, package):
        root = self._meta(project, package)
        if root is None:
            return False
        return root.get("project") == project

    def devel_project(self, project, package):
        root = self._meta(project, package)
        if root is None:
            return None
        devel = root.find("devel")
        if devel is None:
            return None
        return devel.get("project")

    def request(self, requestid):
        root = self._request(requestid)
        if root is not None:
            return Request().parse(root)

    def user(self, userid):
        root = self._user(userid)
        if root is not None:
            return User().parse(root, userid)

    def files(self, project, package, revision):
        root = self._xml(f"source/{project}/{package}", rev=revision, expand=1)
        return [
            (e.get("name"), int(e.get("size")), e.get("md5"))
            for e in root.findall("entry")
        ]

    def _download(self, project, package, name, revision):
        url = osc.core.makeurl(
            self.url,
            ["source", project, package, urllib.parse.quote(name)],
            {"rev": revision, "expand": 1},
        )
        return osc.core.http_GET(url)

    def download(
        self,
        project: str,
        package: str,
        name: str,
        revision: str,
        dirpath: Path,
        cachedir: Path,
        file_md5: str,
    ) -> None:
        cached_file = self._path_from_md5(name, cachedir, file_md5)
        if not self.in_cache(name, cachedir, file_md5):
            with (dirpath / name).open("wb") as f:
                f.write(self._download(project, package, name, revision).read())

            # Validate the MD5 of the downloaded file
            if md5(dirpath / name) != file_md5:
                raise Exception(f"Download error in {name}")

            # Publish into the cache via a temporary name plus an atomic
            # rename, so parallel downloads never read a partial file.
            shutil.copy(dirpath / name, cached_file.with_suffix(".new"))
            os.rename(cached_file.with_suffix(".new"), cached_file)
        else:
            shutil.copy(cached_file, dirpath / name)

    def list(self, project, package, srcmd5, linkrev):
        params = {"rev": srcmd5, "expand": "1"}
        if linkrev:
            params["linkrev"] = linkrev

        try:
            root = self._xml(f"source/{project}/{package}", **params)
        except HTTPError as e:
            if e.code == 400:
                logging.error(
                    f"Package [{project}/{package} {params}] can't be expanded: {e}"
                )
                return None
            raise

        return root

    def _path_from_md5(self, name, cachedir, md5):
        # Shard the cache by the first three hex digits of the md5; the
        # remaining digits are the file name inside that subdirectory.
        filepath = cachedir / md5[:3]
        filepath.mkdir(parents=True, exist_ok=True)
        return filepath / md5[3:]

    def in_cache(self, name, cachedir, md5):
        return self._path_from_md5(name, cachedir, md5).exists()
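

# A minimal usage sketch, assuming a reachable OBS instance; the API URL,
# project, package, and revision below are illustrative values only.
# (dirpath must already exist; cache subdirectories are created on demand.)
#
#   from pathlib import Path
#
#   obs = OBS("https://api.opensuse.org")
#   for name, size, file_md5 in obs.files("openSUSE:Factory", "osc", "1"):
#       obs.download("openSUSE:Factory", "osc", name, "1",
#                    Path("checkout"), Path("cache"), file_md5)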