1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-01-24 22:06:14 +01:00
Introduced new PackageQueryResult class and adapted existing
modules accordingly.
This commit is contained in:
Marcus Huewe 2015-06-23 17:52:37 +02:00
parent 84ba8c6e17
commit a80b286ac9
5 changed files with 75 additions and 47 deletions

View File

@ -10,7 +10,7 @@ import subprocess
class ArchError(packagequery.PackageError):
pass
class ArchQuery(packagequery.PackageQuery):
class ArchQuery(packagequery.PackageQuery, packagequery.PackageQueryResult):
def __init__(self, fh):
self.__file = fh
self.__path = os.path.abspath(fh.name)
@ -36,6 +36,7 @@ class ArchQuery(packagequery.PackageQuery):
if self_provides:
prv = '%s = %s' % (self.name(), self.fields['pkgver'][0])
self.fields.setdefault('provides', []).append(prv)
return self
def vercmp(self, archq):
res = cmp(int(self.epoch()), int(archq.epoch()))
@ -98,6 +99,10 @@ class ArchQuery(packagequery.PackageQuery):
pkgver = self.fields['pkgver'][0] if 'pkgver' in self.fields else None
return self.name() + '-' + pkgver + '-' + self.arch() + '.' + self.pkgsuffix
def gettag(self, tag):
# implement me, if needed
return None
@staticmethod
def query(filename, all_tags = False, *extra_tags):
f = open(filename, 'rb')

View File

@ -10,7 +10,7 @@ from . import packagequery
class DebError(packagequery.PackageError):
pass
class DebQuery(packagequery.PackageQuery):
class DebQuery(packagequery.PackageQuery, packagequery.PackageQueryResult):
default_tags = ('package', 'version', 'release', 'epoch', 'architecture', 'description',
'provides', 'depends', 'pre_depends', 'conflicts', 'breaks')
@ -43,6 +43,7 @@ class DebQuery(packagequery.PackageQuery):
except KeyError:
raise DebError(self.__path, 'missing \'control\' file in control.tar.gz')
self.__parse_control(control, all_tags, self_provides, *extra_tags)
return self
def __parse_control(self, control, all_tags=False, self_provides=True, *extra_tags):
data = control.readline().strip()

View File

@ -48,8 +48,52 @@ class PackageQueries(dict):
class PackageQuery:
"""abstract base class for all package types"""
def read(self, all_tags = False, *extra_tags):
"""Returns a PackageQueryResult instance"""
raise NotImplementedError
# Hmmm... this should be a module function (inherting this stuff
# does not make much sense) (the same is true for the queryhdrmd5 method)
@staticmethod
def query(filename, all_tags=False, extra_rpmtags=(), extra_debtags=(), self_provides=True):
f = open(filename, 'rb')
magic = f.read(7)
f.seek(0)
extra_tags = ()
pkgquery = None
if magic[:4] == '\xed\xab\xee\xdb':
from . import rpmquery
pkgquery = rpmquery.RpmQuery(f)
extra_tags = extra_rpmtags
elif magic == '!<arch>':
from . import debquery
pkgquery = debquery.DebQuery(f)
extra_tags = extra_debtags
elif magic[:5] == '<?xml':
f.close()
return None
elif magic[:5] == '\375\067zXZ' or magic[:2] == '\037\213':
from . import archquery
pkgquery = archquery.ArchQuery(f)
else:
raise PackageError(filename, 'unsupported package type. magic: \'%s\'' % magic)
pkgqueryresult = pkgquery.read(all_tags, self_provides, *extra_tags)
f.close()
return pkgqueryresult
@staticmethod
def queryhdrmd5(filename):
f = open(filename, 'rb')
magic = f.read(7)
f.seek(0)
if magic[:4] == '\xed\xab\xee\xdb':
from . import rpmquery
f.close()
return rpmquery.RpmQuery.queryhdrmd5(filename)
return None
class PackageQueryResult:
"""abstract base class that represents the result of a package query"""
def name(self):
raise NotImplementedError
@ -83,7 +127,7 @@ class PackageQuery:
def obsoletes(self):
raise NotImplementedError
def gettag(self):
def gettag(self, tag):
raise NotImplementedError
def vercmp(self, pkgquery):
@ -99,44 +143,6 @@ class PackageQuery:
evr = epoch + ":" + evr
return evr
@staticmethod
def query(filename, all_tags=False, extra_rpmtags=(), extra_debtags=(), self_provides=True):
f = open(filename, 'rb')
magic = f.read(7)
f.seek(0)
extra_tags = ()
pkgquery = None
if magic[:4] == '\xed\xab\xee\xdb':
from . import rpmquery
pkgquery = rpmquery.RpmQuery(f)
extra_tags = extra_rpmtags
elif magic == '!<arch>':
from . import debquery
pkgquery = debquery.DebQuery(f)
extra_tags = extra_debtags
elif magic[:5] == '<?xml':
f.close()
return None
elif magic[:5] == '\375\067zXZ' or magic[:2] == '\037\213':
from . import archquery
pkgquery = archquery.ArchQuery(f)
else:
raise PackageError(filename, 'unsupported package type. magic: \'%s\'' % magic)
pkgquery.read(all_tags, self_provides, *extra_tags)
f.close()
return pkgquery
@staticmethod
def queryhdrmd5(filename):
f = open(filename, 'rb')
magic = f.read(7)
f.seek(0)
if magic[:4] == '\xed\xab\xee\xdb':
from . import rpmquery
f.close()
return rpmquery.RpmQuery.queryhdrmd5(filename)
return None
if __name__ == '__main__':
import sys
try:

View File

@ -13,6 +13,7 @@ except ImportError:
# project modules
import osc.util.rpmquery
import osc.util.packagequery
def namespace(name):
return "{http://linux.duke.edu/metadata/%s}" % name
@ -52,7 +53,7 @@ def queries(directory):
@param directory path to a repository directory (parent directory of
repodata directory)
@return list of RepoDataQuery instances
@return list of RepoDataQueryResult instances
@raise IOError if repomd.xml contains no primary location
"""
path = primaryPath(directory)
@ -63,16 +64,16 @@ def queries(directory):
packageQueries = []
for packageElement in root:
packageQuery = RepoDataQuery(directory, packageElement)
packageQuery = RepoDataQueryResult(directory, packageElement)
packageQueries.append(packageQuery)
return packageQueries
class RepoDataQuery(object):
"""PackageQuery that reads in data from the repodata directory files."""
class RepoDataQueryResult(osc.util.packagequery.PackageQueryResult):
"""PackageQueryResult that reads in data from the repodata directory files."""
def __init__(self, directory, element):
"""Creates a RepoDataQuery from the a package Element under a metadata
"""Creates a RepoDataQueryResult from the a package Element under a metadata
Element in a primary.xml file.
@param directory repository directory path. Used to convert relative
@ -147,6 +148,20 @@ class RepoDataQuery(object):
def requires(self):
return self.__parseEntryCollection("requires")
def conflicts(self):
return self.__parseEntryCollection('conflicts')
def obsoletes(self):
return self.__parseEntryCollection('obsoletes')
def canonname(self):
return osc.util.rpmquery.RpmQuery.filename(self.name(), None,
self.version(), self.release(), self.arch())
def gettag(self, tag):
# implement me, if needed
return None
def vercmp(self, other):
res = osc.util.rpmquery.RpmQuery.rpmvercmp(str(self.epoch()), str(other.epoch()))
if res != 0:

View File

@ -48,7 +48,7 @@ class RpmHeaderEntry:
self.count = count
self.data = None
class RpmQuery(packagequery.PackageQuery):
class RpmQuery(packagequery.PackageQuery, packagequery.PackageQueryResult):
LEAD_SIZE = 96
LEAD_MAGIC = 0xedabeedb
HEADER_MAGIC = 0x8eade801
@ -101,6 +101,7 @@ class RpmQuery(packagequery.PackageQuery):
try: # this may fail for -debug* packages
self.__read_data(i, data)
except: pass
return self
def __read_lead(self):
data = self.__file.read(self.LEAD_SIZE)