mirror of
https://github.com/openSUSE/osc.git
synced 2024-11-10 14:56:14 +01:00
96210b6dac
Any directory passed to --prefer-pkgs will be searched for a repodata directory. If the directory does not contain a repodata directory, then each ancestor directory is checked. This allows for the user error of specifying an individual architecture directory (e.g. x86_64) instead of the parent repository directory that contains the repodata:

    repository/
        x86_64/
            *.rpm
        repodata/
            *.xml.gz

The use case for this feature is that it allows snapshots of the OBS repositories to be offloaded to a network-attached filesystem. repodata directories are used because the xml.gz files are faster to read than the 100s of rpms in a given snapshot. These snapshots are used to track older rpm sets that may be deployed for testing.
118 lines
3.6 KiB
Python
118 lines
3.6 KiB
Python
class PackageError(Exception):
    """base class for all package related errors

    The message is stored in the ``msg`` attribute and is also handed to
    ``Exception`` so that ``str(error)`` and tracebacks are not empty.
    """
    def __init__(self, msg):
        # pass msg on: without it str(error) would be the empty string
        Exception.__init__(self, msg)
        self.msg = msg
|
|
|
|
class PackageQueries(dict):
    """Mapping of package name -> package query.

    An assignment only takes effect when the query's architecture is
    acceptable for the wanted architecture and the query is not older than
    the query currently stored under the same name.
    """

    # debian architecture name -> equivalent obs architecture names
    architectureMap = {'i386': ['i586', 'i686'], 'amd64': ['x86_64']}

    def __init__(self, wantedArchitecture):
        super(PackageQueries, self).__init__()
        self.wantedArchitecture = wantedArchitecture

    def add(self, query):
        """Store query under its own name.

        The same architecture and version rules as for item assignment
        apply, so the query may be silently ignored.

        @param query a PackageQuery
        """
        self[query.name()] = query

    def __setitem__(self, name, query):
        # the key has to be the name reported by the query itself
        if name != query.name():
            raise ValueError("key '%s' does not match package query name '%s'"
                             % (name, query.name()))

        arch = query.arch()
        wanted = self.wantedArchitecture

        # accept the wanted arch and the arch independent markers directly,
        # otherwise fall back to the debian -> obs architecture mapping
        acceptable = arch in (wanted, 'noarch', 'all')
        if not acceptable:
            acceptable = wanted in self.architectureMap.get(arch, ())
        if not acceptable:
            return

        existing = self.get(name)
        # store when nothing is stored yet or the stored query is not newer
        if existing is None or existing.vercmp(query) <= 0:
            super(PackageQueries, self).__setitem__(name, query)
|
|
|
|
class PackageQuery:
    """abstract base class for all package types

    Every accessor below raises NotImplementedError and has to be provided
    by a concrete subclass. Use the static query() method to get a query
    object for a package file.
    """
    def read(self, all_tags=False, *extra_tags):
        """to be implemented by subclasses

        query() calls this with its all_tags flag and the extra tags that
        belong to the detected package type.
        """
        raise NotImplementedError

    def name(self):
        """to be implemented by subclasses"""
        raise NotImplementedError

    def version(self):
        """to be implemented by subclasses"""
        raise NotImplementedError

    def release(self):
        """to be implemented by subclasses"""
        raise NotImplementedError

    def epoch(self):
        """to be implemented by subclasses"""
        raise NotImplementedError

    def arch(self):
        """to be implemented by subclasses"""
        raise NotImplementedError

    def description(self):
        """to be implemented by subclasses"""
        raise NotImplementedError

    def path(self):
        """to be implemented by subclasses"""
        raise NotImplementedError

    def provides(self):
        """to be implemented by subclasses"""
        raise NotImplementedError

    def requires(self):
        """to be implemented by subclasses"""
        raise NotImplementedError

    def getTag(self):
        """to be implemented by subclasses"""
        raise NotImplementedError

    def vercmp(self, pkgq):
        """to be implemented by subclasses"""
        raise NotImplementedError

    @staticmethod
    def query(filename, all_tags=False, extra_rpmtags=(), extra_debtags=()):
        """Open filename, detect the package type and return a read query.

        The type is detected from the first bytes of the file: the rpm
        branch creates a rpmquery.RpmQuery, the deb branch creates a
        debquery.DebQuery. read() is called on the new object before it is
        returned.

        @param filename path of the package file
        @param all_tags passed on to read()
        @param extra_rpmtags extra tags passed to read() for rpm files
        @param extra_debtags extra tags passed to read() for deb files
        @return the query object for the package
        @raise PackageError if the magic matches neither package type
        """
        f = open(filename, 'rb')
        # try/finally so the file is closed even if the type is unsupported
        # or reading the package fails
        try:
            magic = f.read(7)
            # the query object reads the file from the beginning
            f.seek(0)
            if magic[:4] == '\xed\xab\xee\xdb':
                import rpmquery
                pkgq = rpmquery.RpmQuery(f)
                extra_tags = extra_rpmtags
            elif magic == '!<arch>':
                import debquery
                pkgq = debquery.DebQuery(f)
                extra_tags = extra_debtags
            else:
                raise PackageError('unsupported package type. magic: \'%s\' (%s)' % (magic, filename))
            pkgq.read(all_tags, *extra_tags)
        finally:
            f.close()
        return pkgq
|
|
|
|
if __name__ == '__main__':
|
|
import sys
|
|
try:
|
|
pkgq = PackageQuery.query(sys.argv[1])
|
|
except PackageError, e:
|
|
print e.msg
|
|
sys.exit(2)
|
|
print pkgq.name()
|
|
print pkgq.version()
|
|
print pkgq.release()
|
|
print pkgq.description()
|
|
print '##########'
|
|
print '\n'.join(pkgq.provides())
|
|
print '##########'
|
|
print '\n'.join(pkgq.requires())
|