1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-01-03 21:36:15 +01:00
github.com_openSUSE_osc/osc/util/repodata.py
Adam Williamson 13a13a87c4 Fix ElementTree imports for Python 3.9
Importing `cElementTree` has been deprecated since Python 3.3 -
importing `ElementTree` automatically uses the fastest
implementation available - and is finally removed in Python 3.9.
Importing cElementTree directly (not as part of xml) is an even
older relic, it's for Ye Time Before ElementTree Was Added To
Python and it was instead an external module...which was before
Python 2.5.

We still need to work with Python 2.7 for now, so we use a try/
except to handle both 2.7 and 3.9 cases. Also, let's not repeat
this import 12 times in one file for some reason.

Signed-off-by: Adam Williamson <awilliam@redhat.com>
2020-06-02 15:13:10 -07:00

233 lines
7.0 KiB
Python

"""Module for reading repodata directory (created with createrepo) for package
information instead of scanning individual rpms."""
# standard modules
import gzip
import os.path
try:
# Works up to Python 3.8, needed for Python < 3.3 (inc 2.7)
from xml.etree import cElementTree as ET
except ImportError:
# will import a fast implementation from 3.3 onwards, needed
# for 3.9+
from xml.etree import ElementTree as ET
# project modules
import osc.util.rpmquery
import osc.util.packagequery
def namespace(name):
    """Returns the ElementTree tag prefix for a repodata metadata namespace.

    @param name last component of the metadata namespace URI (callers in this
                module pass "repo", "common" and "rpm")
    @return str "{<namespace uri>}" prefix to be prepended to a tag name
    """
    template = "{http://linux.duke.edu/metadata/%s}"
    return template % name
# Maps the value of an entry's "flags" attribute to the comparison operator
# that is written between the entry name and its version.
OPERATOR_BY_FLAGS = {
    "EQ": "=",
    "LE": "<=",
    "GE": ">=",
    "LT": "<",
    "GT": ">",
}
def primaryPath(directory):
    """Returns path to the primary repository data file.

    @param directory repository directory that contains the repodata subdirectory
    @return str path to primary repository data file
    @raise IOError if repomd.xml contains no primary location (no data element
                   of type "primary", or that element has no location href)
    """
    metaDataPath = os.path.join(directory, "repodata", "repomd.xml")
    root = ET.parse(metaDataPath).getroot()

    for dataElement in root:
        if dataElement.get("type") != "primary":
            continue
        locationElement = dataElement.find(namespace("repo") + "location")
        if locationElement is not None and locationElement.get("href") is not None:
            # even though the repomd.xml file is under repodata, the location
            # attribute is relative to the parent directory (directory).
            return os.path.join(directory, locationElement.get("href"))
        # Only the first primary entry is considered. Without a usable
        # location it is reported as the documented IOError below instead of
        # failing with an AttributeError/TypeError on the missing value.
        break

    raise IOError("'%s' contains no primary location" % metaDataPath)
def queries(directory):
    """Returns a list of RepoDataQueries constructed from the repodata under
    the directory.

    @param directory path to a repository directory (parent directory of
                     repodata directory)
    @return list of RepoDataQueryResult instances
    @raise IOError if repomd.xml contains no primary location
    """
    path = primaryPath(directory)

    # ET.parse() reads the whole document, so the gzip file can be closed as
    # soon as the tree has been built; the elements remain usable afterwards.
    with gzip.GzipFile(path) as gunzippedPrimary:
        root = ET.parse(gunzippedPrimary).getroot()

    # one query result per child element of the metadata root
    return [RepoDataQueryResult(directory, packageElement)
            for packageElement in root]
def _to_bytes_or_None(method):
def _method(self, *args, **kwargs):
res = method(self, *args, **kwargs)
if res is None:
return None
return res.encode()
return _method
def _to_bytes_list(method):
def _method(self, *args, **kwargs):
res = method(self, *args, **kwargs)
return [data.encode() for data in res]
return _method
class RepoDataQueryResult(osc.util.packagequery.PackageQueryResult):
    """PackageQueryResult that reads in data from the repodata directory files."""

    def __init__(self, directory, element):
        """Creates a RepoDataQueryResult from a package Element under a
        metadata Element in a primary.xml file.

        @param directory repository directory path. Used to convert relative
                         paths to full paths.
        @param element package Element
        """
        self.__element = element
        self.__directory = os.path.abspath(directory)

    def __commonElement(self, tag):
        """Returns the child of the package element with the given tag in the
        "common" namespace (the result of Element.find)."""
        return self.__element.find(namespace("common") + tag)

    def __formatElement(self):
        """Returns the "format" child of the package element."""
        return self.__commonElement("format")

    def __parseEntry(self, element):
        """Formats one entry element as "name" or "name <op> ver[-rel]".

        @param element entry Element with a "name" attribute and optional
                       "flags", "ver" and "rel" attributes
        @return str formatted entry
        """
        entry = element.get("name")
        flags = element.get("flags")
        if flags is None:
            # unversioned entry: just the name
            return entry

        operator = OPERATOR_BY_FLAGS[flags]
        entry = entry + " %s %s" % (operator, element.get("ver"))

        release = element.get("rel")
        if release is not None:
            entry = entry + "-%s" % release
        return entry

    def __parseEntryCollection(self, collection):
        """Returns the formatted entries of one collection of the format
        element.

        @param collection tag name (in the "rpm" namespace) of the collection,
                          e.g. "provides"
        @return list of str, empty if the collection element is absent
        """
        rpmPrefix = namespace("rpm")
        container = self.__formatElement().find(rpmPrefix + collection)
        if container is None:
            return []
        return [self.__parseEntry(child)
                for child in container.findall(rpmPrefix + "entry")]

    def __versionElement(self):
        """Returns the "version" child of the package element."""
        return self.__commonElement("version")

    @_to_bytes_or_None
    def arch(self):
        """Returns the text of the "arch" element."""
        return self.__commonElement("arch").text

    @_to_bytes_or_None
    def description(self):
        """Returns the text of the "description" element."""
        return self.__commonElement("description").text

    def distribution(self):
        """Always returns None."""
        return None

    @_to_bytes_or_None
    def epoch(self):
        """Returns the "epoch" attribute of the version element."""
        return self.__versionElement().get("epoch")

    @_to_bytes_or_None
    def name(self):
        """Returns the text of the "name" element."""
        return self.__commonElement("name").text

    def path(self):
        """Returns the location href joined onto the absolute repository
        directory."""
        href = self.__commonElement("location").get("href")
        return os.path.join(self.__directory, href)

    @_to_bytes_list
    def provides(self):
        """Returns the entries of the "provides" collection."""
        return self.__parseEntryCollection("provides")

    @_to_bytes_or_None
    def release(self):
        """Returns the "rel" attribute of the version element."""
        return self.__versionElement().get("rel")

    @_to_bytes_list
    def requires(self):
        """Returns the entries of the "requires" collection."""
        return self.__parseEntryCollection("requires")

    @_to_bytes_list
    def conflicts(self):
        """Returns the entries of the "conflicts" collection."""
        return self.__parseEntryCollection('conflicts')

    @_to_bytes_list
    def obsoletes(self):
        """Returns the entries of the "obsoletes" collection."""
        return self.__parseEntryCollection('obsoletes')

    @_to_bytes_list
    def recommends(self):
        """Returns the entries of the "recommends" collection."""
        return self.__parseEntryCollection('recommends')

    @_to_bytes_list
    def suggests(self):
        """Returns the entries of the "suggests" collection."""
        return self.__parseEntryCollection('suggests')

    @_to_bytes_list
    def supplements(self):
        """Returns the entries of the "supplements" collection."""
        return self.__parseEntryCollection('supplements')

    @_to_bytes_list
    def enhances(self):
        """Returns the entries of the "enhances" collection."""
        return self.__parseEntryCollection('enhances')

    def canonname(self):
        """Returns RpmQuery.filename() for this package's name, version,
        release and arch (the epoch argument is passed as None)."""
        release = self.release()
        buildFilename = osc.util.rpmquery.RpmQuery.filename
        return buildFilename(self.name(), None, self.version(), release,
                             self.arch())

    def gettag(self, tag):
        """Not implemented; always returns None."""
        # implement me, if needed
        return None

    def vercmp(self, other):
        """Compares this result with other by epoch, then version, then
        release, using RpmQuery.rpmvercmp.

        @param other query result providing epoch(), version() and release()
        @return the first non-zero rpmvercmp result, or the result of the
                release comparison
        """
        rpmvercmp = osc.util.rpmquery.RpmQuery.rpmvercmp

        # if either self.epoch() or other.epoch() is None, the vercmp will do
        # the correct thing because one is transformed into b'None' and the
        # other one into b"b'<epoch>'" (and 'b' is greater than 'N')
        ownEpoch = str(self.epoch()).encode()
        otherEpoch = str(other.epoch()).encode()

        res = rpmvercmp(ownEpoch, otherEpoch)
        if res == 0:
            res = rpmvercmp(self.version(), other.version())
        if res == 0:
            res = rpmvercmp(self.release(), other.release())
        return res

    @_to_bytes_or_None
    def version(self):
        """Returns the "ver" attribute of the version element."""
        return self.__versionElement().get("ver")