mirror of
https://github.com/openSUSE/osc.git
synced 2024-11-10 06:46:15 +01:00
- added the following new modules:
* util/packagequery.py: it's used to query a RPM or DEB package. It also contains a base class for all package types (PackageQuery()) * util/debquery.py: query a DEB package (name, version, release, provides, requires etc.) - adapted util/rpmquery.py to use PackageQuery() as a base class - minor changes in util/ar.py
This commit is contained in:
parent
572b790345
commit
4bb893a114
@ -1 +1 @@
|
||||
__all__ = ['ar', 'cpio', 'rpmquery']
|
||||
__all__ = ['ar', 'cpio', 'debquery', 'packagequery', 'rpmquery']
|
||||
|
@ -87,14 +87,22 @@ class Ar:
|
||||
hdr_len = 60
|
||||
hdr_pat = re.compile('^(.{16})(.{12})(.{6})(.{6})(.{8})(.{10})(.{2})', re.DOTALL)
|
||||
|
||||
def __init__(self, fn):
|
||||
self.filename = fn
|
||||
# file object: will be closed in __del__()
|
||||
self.__file = None
|
||||
def __init__(self, fn = None, fh = None):
|
||||
if fn == None and fh == None:
|
||||
raise ArError('either \'fn\' or \'fh\' must be != None')
|
||||
if fh != None:
|
||||
self.__file = fh
|
||||
self.__closefile = False
|
||||
self.filename = fh.name
|
||||
else:
|
||||
# file object: will be closed in __del__()
|
||||
self.__file = None
|
||||
self.__closefile = True
|
||||
self.filename = fn
|
||||
self._init_datastructs()
|
||||
|
||||
def __del__(self):
|
||||
if self.__file:
|
||||
if self.__file and self.__closefile:
|
||||
self.__file.close()
|
||||
|
||||
def _init_datastructs(self):
|
||||
|
98
osc/util/debquery.py
Normal file
98
osc/util/debquery.py
Normal file
@ -0,0 +1,98 @@
|
||||
import ar
|
||||
import re
|
||||
import tarfile
|
||||
import packagequery
|
||||
|
||||
class DebError(packagequery.PackageError):
|
||||
pass
|
||||
|
||||
class DebQuery(packagequery.PackageQuery):
|
||||
def __init__(self, fh):
|
||||
self.__file = fh
|
||||
self.fields = {}
|
||||
|
||||
def read(self):
|
||||
arfile = ar.Ar(fh = self.__file)
|
||||
arfile.read()
|
||||
debbin = arfile.get_file('debian-binary')
|
||||
if debbin is None:
|
||||
raise DebError('no debian binary')
|
||||
if debbin.read() != '2.0\n':
|
||||
raise DebError('invalid debian binary format')
|
||||
control = arfile.get_file('control.tar.gz')
|
||||
if control is None:
|
||||
raise DebError('missing control.tar.gz')
|
||||
tar = tarfile.open(fileobj = control)
|
||||
try:
|
||||
control = tar.extractfile('./control')
|
||||
except KeyError:
|
||||
raise DebError('missing \'control\' file in control.tar.gz')
|
||||
self.__parse_control(control)
|
||||
|
||||
def __parse_control(self, control):
|
||||
data = control.readline().strip()
|
||||
while data:
|
||||
field, val = re.split(':\s*', data.strip(), 1)
|
||||
data = control.readline()
|
||||
while data and re.match('\s+', data):
|
||||
val += '\n' + data.strip()
|
||||
data = control.readline().rstrip()
|
||||
# a hyphen is not allowed in dict keys
|
||||
self.fields[field.replace('-', '_').lower()] = val
|
||||
versrel = self.fields['version'].rsplit('-', 1)
|
||||
if len(versrel) == 2:
|
||||
self.fields['version'] = versrel[0]
|
||||
self.fields['release'] = versrel[1]
|
||||
else:
|
||||
self.fields['release'] = None
|
||||
self.fields['provides'] = [ i.strip() for i in re.split(',\s*', self.fields.get('provides', '')) if i ]
|
||||
self.fields['depends'] = [ i.strip() for i in re.split(',\s*', self.fields.get('depends', '')) if i ]
|
||||
self.fields['pre_depends'] = [ i.strip() for i in re.split(',\s*', self.fields.get('pre_depends', '')) if i ]
|
||||
# add self provides entry
|
||||
self.fields['provides'].append('%s = %s' % (self.name(), '-'.join(versrel)))
|
||||
|
||||
def name(self):
|
||||
return self.fields['package']
|
||||
|
||||
def version(self):
|
||||
return self.fields['version']
|
||||
|
||||
def release(self):
|
||||
return self.fields['release']
|
||||
|
||||
def arch(self):
|
||||
return self.fields['architecture']
|
||||
|
||||
def description(self):
|
||||
return self.fields['description']
|
||||
|
||||
def provides(self):
|
||||
return self.fields['provides']
|
||||
|
||||
def requires(self):
|
||||
return self.fields['depends']
|
||||
|
||||
def getTag(self, num):
|
||||
return self.fields.get(num, None)
|
||||
|
||||
@staticmethod
|
||||
def query(filename):
|
||||
f = open(filename, 'rb')
|
||||
debq = DebQuery(f)
|
||||
debq.read()
|
||||
f.close()
|
||||
return debq
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
try:
|
||||
debq = DebQuery.query(sys.argv[1])
|
||||
except DebError, e:
|
||||
print e.msg
|
||||
sys.exit(2)
|
||||
print debq.name(), debq.version(), debq.release(), debq.arch()
|
||||
print debq.description()
|
||||
print '##########'
|
||||
print '\n'.join(debq.provides())
|
||||
print '##########'
|
||||
print '\n'.join(debq.requires())
|
68
osc/util/packagequery.py
Normal file
68
osc/util/packagequery.py
Normal file
@ -0,0 +1,68 @@
|
||||
class PackageError(Exception):
|
||||
"""base class for all package related errors"""
|
||||
def __init__(self, msg):
|
||||
Exception.__init__(self)
|
||||
self.msg = msg
|
||||
|
||||
class PackageQuery():
|
||||
"""abstract base class for all package types"""
|
||||
def read(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def name(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def version(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def release(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def arch(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def description(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def provides(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def requires(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def getTag(self):
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def query(filename):
|
||||
f = open(filename, 'rb')
|
||||
magic = f.read(7)
|
||||
f.seek(0)
|
||||
pkgq = None
|
||||
if magic[:4] == '\xed\xab\xee\xdb':
|
||||
import rpmquery
|
||||
pkgq = rpmquery.RpmQuery(f)
|
||||
elif magic == '!<arch>':
|
||||
import debquery
|
||||
pkgq = debquery.DebQuery(f)
|
||||
else:
|
||||
raise PackageError('unsupported package type. magic: \'%s\' (%s)' % (magic, filename))
|
||||
pkgq.read()
|
||||
f.close()
|
||||
return pkgq
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
try:
|
||||
pkgq = PackageQuery.query(sys.argv[1])
|
||||
except PackageError, e:
|
||||
print e.msg
|
||||
sys.exit(2)
|
||||
print pkgq.name()
|
||||
print pkgq.version()
|
||||
print pkgq.release()
|
||||
print pkgq.description()
|
||||
print '##########'
|
||||
print '\n'.join(pkgq.provides())
|
||||
print '##########'
|
||||
print '\n'.join(pkgq.requires())
|
@ -1,10 +1,9 @@
|
||||
import os
|
||||
import sys
|
||||
import struct
|
||||
import packagequery
|
||||
|
||||
class RpmError(Exception):
|
||||
def __init__(self, msg):
|
||||
self.msg = msg
|
||||
class RpmError(packagequery.PackageError):
|
||||
pass
|
||||
|
||||
class RpmHeaderError(RpmError):
|
||||
pass
|
||||
@ -45,7 +44,7 @@ class RpmHeaderEntry():
|
||||
self.count = count
|
||||
self.data = None
|
||||
|
||||
class RpmQuery():
|
||||
class RpmQuery(packagequery.PackageQuery):
|
||||
LEAD_SIZE = 96
|
||||
LEAD_MAGIC = 0xedabeedb
|
||||
HEADER_MAGIC = 0x8eade801
|
||||
@ -211,10 +210,12 @@ def unpack_string(data):
|
||||
return val
|
||||
|
||||
if __name__ == '__main__':
|
||||
import sys
|
||||
try:
|
||||
rpmq = RpmQuery.query(sys.argv[1])
|
||||
except RpmError, e:
|
||||
print e.msg
|
||||
sys.exit(2)
|
||||
print rpmq.name(), rpmq.version(), rpmq.release(), rpmq.arch(), rpmq.url()
|
||||
print rpmq.summary()
|
||||
print rpmq.description()
|
||||
|
Loading…
Reference in New Issue
Block a user