mirror of
https://github.com/openSUSE/osc.git
synced 2025-01-19 03:46:14 +01:00
ebb2f2ee0d
The following abstract methods are added to the PackageQueryResult class: recommends(), suggests(), supplements(), and enhances(). Note that not all package/metadata formats have a notion of these weak dependencies. rpm rpmmd deb arch recommends x x x suggests x x x x supplements x x enhances x x x (where "x" represents "supported"). In case of an unsupported weak dependency, the implementation returns an empty list. We need the weak dependency support in order to fix #363 ("osc build -p ../rpms/tw doesnt send recommends to the server which makes client side build behave differently to server side build").
189 lines
5.9 KiB
Python
189 lines
5.9 KiB
Python
"""Module for reading repodata directory (created with createrepo) for package
|
|
information instead of scanning individual rpms."""
|
|
|
|
# standard modules
|
|
import gzip
|
|
import os.path
|
|
|
|
# cElementTree can be standard or 3rd-party depending on python version
|
|
try:
|
|
from xml.etree import cElementTree as ET
|
|
except ImportError:
|
|
import cElementTree as ET
|
|
|
|
# project modules
|
|
import osc.util.rpmquery
|
|
import osc.util.packagequery
|
|
|
|
def namespace(name):
|
|
return "{http://linux.duke.edu/metadata/%s}" % name
|
|
|
|
OPERATOR_BY_FLAGS = {
|
|
"EQ" : "=",
|
|
"LE" : "<=",
|
|
"GE" : ">="
|
|
}
|
|
|
|
def primaryPath(directory):
|
|
"""Returns path to the primary repository data file.
|
|
|
|
@param directory repository directory that contains the repodata subdirectory
|
|
@return str path to primary repository data file
|
|
@raise IOError if repomd.xml contains no primary location
|
|
"""
|
|
metaDataPath = os.path.join(directory, "repodata", "repomd.xml")
|
|
elementTree = ET.parse(metaDataPath)
|
|
root = elementTree.getroot()
|
|
|
|
for dataElement in root:
|
|
if dataElement.get("type") == "primary":
|
|
locationElement = dataElement.find(namespace("repo") + "location")
|
|
# even though the repomd.xml file is under repodata, the location a
|
|
# attribute is relative to parent directory (directory).
|
|
primaryPath = os.path.join(directory, locationElement.get("href"))
|
|
break
|
|
else:
|
|
raise IOError("'%s' contains no primary location" % metaDataPath)
|
|
|
|
return primaryPath
|
|
|
|
def queries(directory):
|
|
"""Returns a list of RepoDataQueries constructed from the repodata under
|
|
the directory.
|
|
|
|
@param directory path to a repository directory (parent directory of
|
|
repodata directory)
|
|
@return list of RepoDataQueryResult instances
|
|
@raise IOError if repomd.xml contains no primary location
|
|
"""
|
|
path = primaryPath(directory)
|
|
|
|
gunzippedPrimary = gzip.GzipFile(path)
|
|
elementTree = ET.parse(gunzippedPrimary)
|
|
root = elementTree.getroot()
|
|
|
|
packageQueries = []
|
|
for packageElement in root:
|
|
packageQuery = RepoDataQueryResult(directory, packageElement)
|
|
packageQueries.append(packageQuery)
|
|
|
|
return packageQueries
|
|
|
|
class RepoDataQueryResult(osc.util.packagequery.PackageQueryResult):
|
|
"""PackageQueryResult that reads in data from the repodata directory files."""
|
|
|
|
def __init__(self, directory, element):
|
|
"""Creates a RepoDataQueryResult from the a package Element under a metadata
|
|
Element in a primary.xml file.
|
|
|
|
@param directory repository directory path. Used to convert relative
|
|
paths to full paths.
|
|
@param element package Element
|
|
"""
|
|
self.__directory = os.path.abspath(directory)
|
|
self.__element = element
|
|
|
|
def __formatElement(self):
|
|
return self.__element.find(namespace("common") + "format")
|
|
|
|
def __parseEntry(self, element):
|
|
entry = element.get("name")
|
|
flags = element.get("flags")
|
|
|
|
if flags is not None:
|
|
version = element.get("ver")
|
|
operator = OPERATOR_BY_FLAGS[flags]
|
|
entry += " %s %s" % (operator, version)
|
|
|
|
release = element.get("rel")
|
|
if release is not None:
|
|
entry += "-%s" % release
|
|
|
|
return entry
|
|
|
|
def __parseEntryCollection(self, collection):
|
|
formatElement = self.__formatElement()
|
|
collectionElement = formatElement.find(namespace("rpm") + collection)
|
|
|
|
entries = []
|
|
if collectionElement is not None:
|
|
for entryElement in collectionElement.findall(namespace("rpm") +
|
|
"entry"):
|
|
entry = self.__parseEntry(entryElement)
|
|
entries.append(entry)
|
|
|
|
return entries
|
|
|
|
def __versionElement(self):
|
|
return self.__element.find(namespace("common") + "version")
|
|
|
|
def arch(self):
|
|
return self.__element.find(namespace("common") + "arch").text
|
|
|
|
def description(self):
|
|
return self.__element.find(namespace("common") + "description").text
|
|
|
|
def distribution(self):
|
|
return None
|
|
|
|
def epoch(self):
|
|
return self.__versionElement().get("epoch")
|
|
|
|
def name(self):
|
|
return self.__element.find(namespace("common") + "name").text
|
|
|
|
def path(self):
|
|
locationElement = self.__element.find(namespace("common") + "location")
|
|
relativePath = locationElement.get("href")
|
|
absolutePath = os.path.join(self.__directory, relativePath)
|
|
|
|
return absolutePath
|
|
|
|
def provides(self):
|
|
return self.__parseEntryCollection("provides")
|
|
|
|
def release(self):
|
|
return self.__versionElement().get("rel")
|
|
|
|
def requires(self):
|
|
return self.__parseEntryCollection("requires")
|
|
|
|
def conflicts(self):
|
|
return self.__parseEntryCollection('conflicts')
|
|
|
|
def obsoletes(self):
|
|
return self.__parseEntryCollection('obsoletes')
|
|
|
|
def recommends(self):
|
|
return self.__parseEntryCollection('recommends')
|
|
|
|
def suggests(self):
|
|
return self.__parseEntryCollection('suggests')
|
|
|
|
def supplements(self):
|
|
return self.__parseEntryCollection('supplements')
|
|
|
|
def enhances(self):
|
|
return self.__parseEntryCollection('enhances')
|
|
|
|
def canonname(self):
|
|
return osc.util.rpmquery.RpmQuery.filename(self.name(), None,
|
|
self.version(), self.release(), self.arch())
|
|
|
|
def gettag(self, tag):
|
|
# implement me, if needed
|
|
return None
|
|
|
|
def vercmp(self, other):
|
|
res = osc.util.rpmquery.RpmQuery.rpmvercmp(str(self.epoch()), str(other.epoch()))
|
|
if res != 0:
|
|
return res
|
|
res = osc.util.rpmquery.RpmQuery.rpmvercmp(self.version(), other.version())
|
|
if res != 0:
|
|
return res
|
|
res = osc.util.rpmquery.RpmQuery.rpmvercmp(self.release(), other.release())
|
|
return res
|
|
|
|
def version(self):
|
|
return self.__versionElement().get("ver")
|