1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-01-14 17:46:17 +01:00
github.com_openSUSE_osc/osc/util/repodata.py

233 lines
7.0 KiB
Python
Raw Normal View History

"""Module for reading repodata directory (created with createrepo) for package
information instead of scanning individual rpms."""
# standard modules
import functools
import gzip
import os.path

try:
    # Works up to Python 3.8, needed for Python < 3.3 (inc 2.7)
    from xml.etree import cElementTree as ET
except ImportError:
    # will import a fast implementation from 3.3 onwards, needed
    # for 3.9+
    from xml.etree import ElementTree as ET

# project modules
import osc.util.rpmquery
import osc.util.packagequery
def namespace(name):
    """Returns the ElementTree namespace prefix for a repodata schema.

    @param name last path component of the metadata namespace URI
                (this module uses "repo", "common" and "rpm")
    @return str "{<namespace URI>}" prefix to be prepended to a tag name
    """
    template = "{http://linux.duke.edu/metadata/%s}"
    return template % name
# Maps the value of an entry element's "flags" attribute to the comparison
# operator that is used when the entry is rendered as a string.
OPERATOR_BY_FLAGS = {
    "EQ": "=",
    "LE": "<=",
    "GE": ">=",
    "LT": "<",
    "GT": ">",
}
def primaryPath(directory):
    """Returns path to the primary repository data file.

    @param directory repository directory that contains the repodata subdirectory
    @return str path to primary repository data file
    @raise IOError if repomd.xml contains no primary location
    """
    metaDataPath = os.path.join(directory, "repodata", "repomd.xml")
    root = ET.parse(metaDataPath).getroot()

    for dataElement in root:
        if dataElement.get("type") != "primary":
            continue

        locationElement = dataElement.find(namespace("repo") + "location")
        if locationElement is None:
            # a primary entry without a location child is unusable; treat it
            # like a missing entry instead of failing with AttributeError
            continue

        href = locationElement.get("href")
        if href is None:
            # same for a location element that lacks the href attribute
            continue

        # even though the repomd.xml file is under repodata, the location's
        # href attribute is relative to the parent directory (directory).
        return os.path.join(directory, href)

    raise IOError("'%s' contains no primary location" % metaDataPath)
def queries(directory):
    """Returns a list of RepoDataQueries constructed from the repodata under
    the directory.

    @param directory path to a repository directory (parent directory of
                     repodata directory)
    @return list of RepoDataQueryResult instances
    @raise IOError if repomd.xml contains no primary location
    """
    path = primaryPath(directory)

    # ET.parse() reads the whole document, so the gzip stream can (and should)
    # be closed as soon as the tree has been built.
    with gzip.GzipFile(path) as gunzippedPrimary:
        root = ET.parse(gunzippedPrimary).getroot()

    # one query result per child element of the primary metadata root
    return [RepoDataQueryResult(directory, packageElement)
            for packageElement in root]
def _to_bytes_or_None(method):
def _method(self, *args, **kwargs):
res = method(self, *args, **kwargs)
if res is None:
return None
return res.encode()
return _method
def _to_bytes_list(method):
def _method(self, *args, **kwargs):
res = method(self, *args, **kwargs)
return [data.encode() for data in res]
return _method
class RepoDataQueryResult(osc.util.packagequery.PackageQueryResult):
    """PackageQueryResult whose data is read from a package Element of the
    repodata primary file."""

    def __init__(self, directory, element):
        """Creates a RepoDataQueryResult from a package Element under a
        metadata Element in a primary.xml file.

        @param directory repository directory path. Used to convert relative
                         paths to full paths.
        @param element package Element
        """
        self.__directory = os.path.abspath(directory)
        self.__element = element

    def __commonChild(self, tag):
        """Looks up tag (in the "common" namespace) below the package
        Element."""
        return self.__element.find(namespace("common") + tag)

    def __formatElement(self):
        """Returns the "format" Element of the package Element."""
        return self.__commonChild("format")

    def __parseEntry(self, element):
        """Renders an entry Element as "name[ operator ver[-rel]]".

        @param element entry Element with a "name" attribute and optional
                       "flags", "ver" and "rel" attributes
        @return str the rendered entry
        @raise KeyError if the "flags" value is not in OPERATOR_BY_FLAGS
        """
        name = element.get("name")
        flags = element.get("flags")
        if flags is None:
            # unversioned entry: the bare name is all there is
            return name

        version = element.get("ver")
        operator = OPERATOR_BY_FLAGS[flags]
        entry = name + " %s %s" % (operator, version)

        release = element.get("rel")
        if release is not None:
            entry += "-%s" % release
        return entry

    def __parseEntryCollection(self, collection):
        """Returns the rendered entries of one dependency collection.

        @param collection tag name (in the "rpm" namespace) of the collection
                          Element below the "format" Element, e.g. "provides"
        @return list of str; empty if the collection Element does not exist
        """
        rpmNamespace = namespace("rpm")
        collectionElement = self.__formatElement().find(rpmNamespace + collection)
        if collectionElement is None:
            return []

        return [self.__parseEntry(entryElement)
                for entryElement in collectionElement.findall(rpmNamespace + "entry")]

    def __versionElement(self):
        """Returns the "version" Element of the package Element."""
        return self.__commonChild("version")

    @_to_bytes_or_None
    def arch(self):
        """Text of the package's "arch" Element."""
        return self.__commonChild("arch").text

    @_to_bytes_or_None
    def description(self):
        """Text of the package's "description" Element."""
        return self.__commonChild("description").text

    def distribution(self):
        """Always None; no distribution is read from the package Element."""
        return None

    @_to_bytes_or_None
    def epoch(self):
        """The "epoch" attribute of the package's "version" Element."""
        return self.__versionElement().get("epoch")

    @_to_bytes_or_None
    def name(self):
        """Text of the package's "name" Element."""
        return self.__commonChild("name").text

    def path(self):
        """Returns the package location joined onto the absolute repository
        directory.

        @return str the "href" attribute of the "location" Element, joined
                onto the repository directory
        """
        href = self.__commonChild("location").get("href")
        return os.path.join(self.__directory, href)

    @_to_bytes_list
    def provides(self):
        """Entries of the "provides" collection."""
        return self.__parseEntryCollection("provides")

    @_to_bytes_or_None
    def release(self):
        """The "rel" attribute of the package's "version" Element."""
        return self.__versionElement().get("rel")

    @_to_bytes_list
    def requires(self):
        """Entries of the "requires" collection."""
        return self.__parseEntryCollection("requires")

    @_to_bytes_list
    def conflicts(self):
        """Entries of the "conflicts" collection."""
        return self.__parseEntryCollection('conflicts')

    @_to_bytes_list
    def obsoletes(self):
        """Entries of the "obsoletes" collection."""
        return self.__parseEntryCollection('obsoletes')

    @_to_bytes_list
    def recommends(self):
        """Entries of the "recommends" collection."""
        return self.__parseEntryCollection('recommends')

    @_to_bytes_list
    def suggests(self):
        """Entries of the "suggests" collection."""
        return self.__parseEntryCollection('suggests')

    @_to_bytes_list
    def supplements(self):
        """Entries of the "supplements" collection."""
        return self.__parseEntryCollection('supplements')

    @_to_bytes_list
    def enhances(self):
        """Entries of the "enhances" collection."""
        return self.__parseEntryCollection('enhances')

    def canonname(self):
        """Returns the file name that RpmQuery.filename() builds from this
        package's name, version, release and arch (the epoch argument is
        passed as None)."""
        release = self.release()
        return osc.util.rpmquery.RpmQuery.filename(self.name(), None,
                                                   self.version(), release,
                                                   self.arch())

    def gettag(self, tag):
        """Not implemented for repodata; always returns None."""
        # implement me, if needed
        return None

    def vercmp(self, other):
        """Compares this package with other by epoch, then version, then
        release, using RpmQuery.rpmvercmp().

        @param other query result providing epoch(), version() and release()
        @return the first non-zero rpmvercmp() result, otherwise the result
                of the release comparison
        """
        rpmvercmp = osc.util.rpmquery.RpmQuery.rpmvercmp

        # if either self.epoch() or other.epoch() is None, the vercmp will do
        # the correct thing because one is transformed into b'None' and the
        # other one into b"b'<epoch>'" (and 'b' is greater than 'N')
        res = rpmvercmp(str(self.epoch()).encode(),
                        str(other.epoch()).encode())
        if res != 0:
            return res

        res = rpmvercmp(self.version(), other.version())
        if res != 0:
            return res

        return rpmvercmp(self.release(), other.release())

    @_to_bytes_or_None
    def version(self):
        """The "ver" attribute of the package's "version" Element."""
        return self.__versionElement().get("ver")