1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-01-14 01:26:23 +01:00

- rewrote Request class and friends

- rewrote Action class: instances only provide attributes for their specific type (for details see class doc).
  Renamed "dst_project" attribute to "tgt_project" and "dst_package" attribute to "tgt_package" (only affects
  types which have a <target /> element)
- added AbstractState class: Base class which represents state-like objects (<review />, <state />)
- rewrote ReviewState and RequestState classes
- rewrote Request class: apart from internal rewrites the format of the "__str__" and "list_view" methods
  slightly changed

Now it should be much easier to create new requests without constructing the
request xml by hand.
Example:
 r = Request()
 r.add_action('submit', src_project='foo', src_package='bar', tgt_project='targetprj', tgt_package='targetpkg')
 r.add_action('set_bugowner', tgt_project='foobar', person_name='buguser')
 r.add_action('delete', tgt_project='prj', tgt_package='deleteme')
 print r.to_str()
 ->
<request>
  <action type="submit">
    <source package="bar" project="foo" />
    <target package="targetpkg" project="targetprj" />
  </action>
  <action type="set_bugowner">
    <target project="foobar" />
    <person name="buguser" />
  </action>
  <action type="delete">
    <target package="deleteme" project="prj" />
  </action>
</request>
This commit is contained in:
Marcus Huewe 2010-12-30 02:30:37 +01:00
parent 8c26c74620
commit b2838dd88d

View File

@ -2149,245 +2149,364 @@ rev: %s
store_write_string(dir, '_osclib_version', __store_version__ + '\n') store_write_string(dir, '_osclib_version', __store_version__ + '\n')
return Package(dir, progress_obj=progress_obj, size_limit=size_limit) return Package(dir, progress_obj=progress_obj, size_limit=size_limit)
class ReviewState:
"""for objects to represent the review state in a request"""
def __init__(self, state=None, by_user=None, by_group=None, who=None, when=None, comment=None):
self.state = state
self.by_user = by_user
self.by_group = by_group
self.who = who
self.when = when
self.comment = comment
class RequestState: class AbstractState:
"""for objects to represent the "state" of a request""" """
def __init__(self, name=None, who=None, when=None, comment=None): Base class which represents state-like objects (<review />, <state />).
self.name = name """
self.who = who def __init__(self, tag):
self.when = when self.__tag = tag
self.comment = comment
def get_node_attrs(self):
"""return attributes for the tag/element"""
raise NotImplementedError()
def get_node_name(self):
"""return tag/element name"""
return self.__tag
def get_comment(self):
"""return data from <comment /> tag"""
raise NotImplementedError()
def to_xml(self):
"""serialize object to XML"""
root = ET.Element(self.get_node_name())
for attr in self.get_node_attrs():
val = getattr(self, attr)
if not val is None:
root.set(attr, val)
if self.get_comment():
ET.SubElement(root, 'comment').text = self.get_comment()
return root
def to_str(self):
"""return "pretty" XML data"""
root = self.to_xml()
xmlindent(root)
return ET.tostring(root)
class ReviewState(AbstractState):
"""Represents the review state in a request"""
def __init__(self, review_node):
if not review_node.get('state'):
raise oscerr.APIError('invalid review node (state attr expected): %s' % \
ET.tostring(review_node))
AbstractState.__init__(self, review_node.tag)
self.state = review_node.get('state')
self.by_user = review_node.get('by_user')
self.by_group = review_node.get('by_group')
self.who = review_node.get('who')
self.when = review_node.get('when')
self.comment = ''
if not review_node.find('comment') is None and \
review_node.find('comment').text:
self.comment = review_node.find('comment').text.strip()
def get_node_attrs(self):
return ('state', 'by_user', 'by_group', 'who', 'when')
def get_comment(self):
return self.comment
class RequestState(AbstractState):
"""Represents the state of a request"""
def __init__(self, state_node):
if not state_node.get('name'):
raise oscerr.APIError('invalid request state node (name attr expected): %s' % \
ET.tostring(state_node))
AbstractState.__init__(self, state_node.tag)
self.name = state_node.get('name')
self.who = state_node.get('who')
self.when = state_node.get('when')
self.comment = ''
if not state_node.find('comment') is None and \
state_node.find('comment').text:
self.comment = state_node.find('comment').text.strip()
def get_node_attrs(self):
return ('name', 'who', 'when')
def get_comment(self):
return self.comment
class Action: class Action:
"""represents an action""" """
def __init__(self, type, src_project, src_package, src_rev, dst_project, dst_package, src_update, role_person, role_group, role): Represents a <action /> element of a Request.
This class is quite common so that it can be used for all different
action types. Note: instances only provide attributes for their specific
type.
Examples:
r = Action('set_bugowner', tgt_project='foo', person_name='buguser')
# available attributes: r.type (== 'set_bugowner'), r.tgt_project (== 'foo'), r.tgt_package (== None)
r.to_str() ->
<action type="set_bugowner">
<target project="foo" />
<person name="buguser" />
</action>
##
r = Action('delete', tgt_project='foo', tgt_package='bar')
# available attributes: r.type (== 'delete'), r.tgt_project (== 'foo'), r.tgt_package (=='bar')
r.to_str() ->
<action type="delete">
<target package="bar" project="foo" />
</action>
"""
# allowed types + the corresponding (allowed) attributes
type_args = {'submit': ('src_project', 'src_package', 'src_rev', 'tgt_project', 'tgt_package', 'opt_sourceupdate', 'opt_updatelink'),
'add_role': ('tgt_project', 'tgt_package', 'person_name', 'person_role', 'group_name', 'group_role'),
'set_bugowner': ('tgt_project', 'tgt_package', 'person_name'),
'delete': ('tgt_project', 'tgt_package'),
'change_devel': ('src_project', 'src_package', 'tgt_project', 'tgt_package')}
# attribute prefix to element name map (only needed for abbreviated attributes)
prefix_to_elm = {'src': 'source', 'tgt': 'target', 'opt': 'options'}
def __init__(self, type, **kwargs):
if not type in Action.type_args.keys():
raise oscerr.WrongArgs('invalid action type: \'%s\'' % type)
self.type = type self.type = type
self.src_project = src_project for i in kwargs.keys():
self.src_package = src_package if not i in Action.type_args[type]:
self.src_rev = src_rev raise oscerr.WrongArgs('invalid argument: \'%s\'' % i)
self.dst_project = dst_project # set all type specific attributes
self.dst_package = dst_package for i in Action.type_args[type]:
self.src_update = src_update if kwargs.has_key(i):
self.role_person = role_person setattr(self, i, kwargs[i])
self.role_group = role_group else:
self.role = role setattr(self, i, None)
def to_xml(self):
"""
Serialize object to XML.
The xml tag names and attributes are constructed from the instance's attributes.
Example:
self.group_name -> tag name is "group", attribute name is "name"
self.src_project -> tag name is "source" (translated via prefix_to_elm dict),
attribute name is "project"
Attributes prefixed with "opt_" need a special handling, the resulting xml should
look like this: opt_updatelink -> <options><updatelink>value</updatelink></options>.
Attributes which are "None" will be skipped.
"""
root = ET.Element('action', type=self.type)
for i in Action.type_args[self.type]:
prefix, attr = i.split('_', 1)
val = getattr(self, i)
if val is None:
continue
elm = root.find(Action.prefix_to_elm.get(prefix, prefix))
if elm is None:
elm = ET.Element(Action.prefix_to_elm.get(prefix, prefix))
root.append(elm)
if prefix == 'opt':
ET.SubElement(elm, attr).text = val
else:
elm.set(attr, val)
return root
def to_str(self):
"""return "pretty" XML data"""
root = self.to_xml()
xmlindent(root)
return ET.tostring(root)
@staticmethod
def from_xml(action_node):
"""create action from XML"""
if action_node is None or \
not action_node.get('type') in Action.type_args.keys() or \
not action_node.tag in ('action', 'submit'):
raise oscerr.WrongArgs('invalid argument')
elm_to_prefix = dict([(i[1], i[0]) for i in Action.prefix_to_elm.items()])
kwargs = {}
for node in action_node:
prefix = elm_to_prefix.get(node.tag, node.tag)
if prefix == 'opt':
data = [('opt_%s' % opt.tag, opt.text.strip()) for opt in node if opt.text]
else:
data = [('%s_%s' % (prefix, k), v) for k, v in node.items()]
kwargs.update(dict(data))
return Action(action_node.get('type'), **kwargs)
class Request: class Request:
"""represent a request and holds its metadata """Represents a request (<request />)"""
it has methods to read in metadata from xml,
different views, ..."""
def __init__(self): def __init__(self):
self.reqid = None self.reqid = None
self.state = RequestState() self.title = ''
self.who = None self.description = ''
self.when = None self.state = None
self.last_author = None
self.descr = None
self.actions = [] self.actions = []
self.statehistory = [] self.statehistory = []
self.reviews = [] self.reviews = []
self.readonly = False
def read(self, root): def read(self, root):
self.reqid = int(root.get('id')) """read in a request"""
actions = root.findall('action') self.readonly = True
if len(actions) == 0: if not root.get('id'):
actions = [ root.find('submit') ] # for old style requests raise oscerr.APIError('invalid request: %s\n' % ET.tostring(root))
self.reqid = root.get('id')
if root.find('state') is None:
raise oscerr.APIError('invalid request (state expected): %s\n' % ET.tostring(root))
self.state = RequestState(root.find('state'))
action_nodes = root.findall('action')
if not action_nodes:
# check for old-style requests
for i in root.findall('submit'):
i.set('type', 'submit')
action_nodes.append(i)
for action in action_nodes:
self.actions.append(Action.from_xml(action))
for review in root.findall('review'):
self.reviews.append(ReviewState(review))
for hist_state in root.findall('history'):
self.statehistory.append(RequestState(hist_state))
if not root.find('title') is None:
self.title = root.find('title').text.strip()
if not root.find('description') is None and root.find('description').text:
self.description = root.find('description').text.strip()
for action in actions: def add_action(self, type, **kwargs):
action_type = action.get('type', 'submit') """add a new action to the request"""
try: self.actions.append(Action(type, **kwargs))
src_prj = src_pkg = src_rev = dst_prj = dst_pkg = src_update = role = role_person = role_group = None
if action.findall('source'):
n = action.find('source')
src_prj = n.get('project', None)
src_pkg = n.get('package', None)
src_rev = n.get('rev', None)
if action.findall('target'):
n = action.find('target')
dst_prj = n.get('project', None)
dst_pkg = n.get('package', None)
if action.findall('options'):
n = action.find('options')
if n.findall('sourceupdate'):
src_update = n.find('sourceupdate').text.strip()
if action.findall('person'):
n = action.find('person')
role_person = n.get('name', None)
role = n.get('role', None)
if action.findall('group'):
n = action.find('add_role')
role_group = n.get('name', None)
role = n.get('role', None)
self.add_action(action_type, src_prj, src_pkg, src_rev, dst_prj, dst_pkg, src_update, role_person, role_group, role)
except:
msg = 'invalid request format:\n%s' % ET.tostring(root)
raise oscerr.APIError(msg)
# read the state
n = root.find('state')
self.state.name, self.state.who, self.state.when \
= n.get('name'), n.get('who'), n.get('when')
try:
self.state.comment = n.find('comment').text.strip()
except:
self.state.comment = None
# read the review states
for r in root.findall('review'):
s = ReviewState()
s.state = r.get('state')
s.by_user = r.get('by_user')
s.by_group = r.get('by_group')
s.who = r.get('who')
s.when = r.get('when')
try:
s.comment = r.find('comment').text.strip()
except:
s.comment = None
self.reviews.append(s)
# read the state history
for h in root.findall('history'):
s = RequestState()
s.name = h.get('name')
s.who = h.get('who')
s.when = h.get('when')
try:
s.comment = h.find('comment').text.strip()
except:
s.comment = None
self.statehistory.append(s)
self.statehistory.reverse()
# read a description, if it exists
try:
n = root.find('description').text
self.descr = n
except:
pass
def add_action(self, type, src_prj, src_pkg, src_rev, dst_prj, dst_pkg, src_update, role_person, role_group, role):
self.actions.append(Action(type, src_prj, src_pkg, src_rev,
dst_prj, dst_pkg, src_update, role_person, role_group, role)
)
def get_creator(self): def get_creator(self):
"""return the creator of the request"""
if len(self.statehistory): if len(self.statehistory):
return self.statehistory[-1].who return self.statehistory[0].who
return self.state.who return self.state.who
def to_xml(self):
"""serialize object to XML"""
root = ET.Element('request')
if not self.reqid is None:
root.set('id', self.reqid)
for action in self.actions:
root.append(action.to_xml())
if not self.state is None:
root.append(self.state.to_xml())
for review in self.reviews:
root.append(review.to_xml())
for hist in self.statehistory:
root.append(hist.to_xml())
if self.title:
ET.SubElement(root, 'title').text = self.title
if self.description:
ET.SubElement(root, 'description').text = self.description
return root
def to_str(self):
"""return "pretty" XML data"""
root = self.to_xml()
xmlindent(root)
return ET.tostring(root)
def _format_action(self, action, show_srcupdate=False):
"""
format an action depending on the action's type.
A formatted str is returned.
"""
def prj_pkg_join(prj, pkg):
if not pkg:
return prj or ''
return '%s/%s' % (prj, pkg)
d = {'type': '%s:' % action.type}
if action.type == 'set_bugowner':
d['source'] = action.person_name
d['target'] = prj_pkg_join(action.tgt_project, action.tgt_package)
elif action.type == 'change_devel':
d['source'] = prj_pkg_join(action.tgt_project, action.tgt_package)
d['target'] = 'developed in %s' % prj_pkg_join(action.src_project, action.src_package)
elif action.type == 'submit':
srcupdate = ' '
if action.opt_sourceupdate and show_srcupdate:
srcupdate = '(%s)' % action.opt_sourceupdate
d['source'] = '%s%s ->' % (prj_pkg_join(action.src_project, action.src_package), srcupdate)
tgt_package = action.tgt_package
if action.src_package == action.tgt_package:
tgt_package = ''
d['target'] = prj_pkg_join(action.tgt_project, tgt_package)
elif action.type == 'add_role':
roles = []
if action.person_name and action.person_role:
roles.append('person: %s as %s' % (action.person_name, action.person_role))
if action.group_name and action.group_role:
roles.append('group: %s as %s' % (action.group_name, action.group_role))
d['source'] = ', '.join(roles)
d['target'] = prj_pkg_join(action.tgt_project, action.tgt_package)
elif action.type == 'delete':
d['source'] = ''
d['target'] = prj_pkg_join(action.tgt_project, action.tgt_package)
return d
def list_view(self): def list_view(self):
ret = '%6d State:%-7s By:%-12s When:%-12s' % (self.reqid, self.state.name, self.state.who, self.state.when) """return "list view" format"""
import textwrap, locale
lines = ['%6s State:%-10s By:%-12s When:%-19s' % (self.reqid, self.state.name, self.state.who, self.state.when)]
tmpl = ' %(type)-16s %(source)-50s %(target)s'
for action in self.actions:
lines.append(tmpl % self._format_action(action))
history = ['%s(%s)' % (hist.name, hist.who) for hist in self.statehistory]
if history:
lines.append(' From: %s' % ' -> '.join(history))
if self.description:
descr = self.description.encode(locale.getpreferredencoding(), 'replace')
lines.append(textwrap.fill(descr, width=80, initial_indent=' Descr: ',
subsequent_indent=' '))
return '\n'.join(lines)
for a in self.actions: def __str__(self):
dst = "%s/%s" % (a.dst_project, a.dst_package) """return "detailed" format"""
if a.src_package == a.dst_package: import locale
dst = a.dst_project lines = ['Request: #%s\n' % self.reqid]
for action in self.actions:
tmpl = ' %(type)-13s %(source)s %(target)s'
if action.type == 'delete':
# remove 1 whitespace because source is empty
tmpl = ' %(type)-12s %(source)s %(target)s'
lines.append(tmpl % self._format_action(action, show_srcupdate=True))
lines.append('\n\nMessage:')
if self.description:
lines.append(self.description.encode(locale.getpreferredencoding(), 'replace'))
else:
lines.append('<no message>')
lines.append('\nState: %-10s %-12s %s' % (self.state.name, self.state.when, self.state.who))
lines.append('Comment: %s' % (self.state.comment or '<no comment>'))
sr_source="" indent = '\n '
if a.type=="submit": tmpl = '%(state)-10s %(by_user)s %(by_group)s %(when)-12s %(who)s %(comment)s'
sr_source="%s/%s -> " % (a.src_project, a.src_package) reviews = []
if a.type=="add_role": for review in reversed(self.reviews):
if a.role_person is not None: d = {'state': review.state}
sr_source="%s as %s" % (a.role_person, a.role) d['by_user'] = review.by_user or '<no by_user>'
if a.role_group is not None: d['by_group'] = review.by_group or '<no by_group>'
sr_source="%s as %s" % (a.role_group, a.role) d['when'] = review.when
if a.type=="change_devel": d['who'] = review.who
dst = "developed in %s/%s" % (a.src_project, a.src_package) d['comment'] = review.comment
sr_source="%s/%s" % (a.dst_project, a.dst_package) reviews.append(tmpl % d)
if reviews:
lines.append('\nReview: %s' % indent.join(reviews))
ret += '\n %s: %-50s %-20s ' % \ tmpl = '%(name)-10s %(when)-12s %(who)s'
(a.type, sr_source, dst) histories = []
for hist in reversed(self.statehistory):
d = {'name': hist.name, 'when': hist.when,
'who': hist.who}
histories.append(tmpl % d)
if histories:
lines.append('\nHistory: %s' % indent.join(histories))
if self.statehistory and self.statehistory[0]: return '\n'.join(lines)
who = []
for h in self.statehistory:
who.append("%s(%s)" % (h.who,h.name))
who.reverse()
ret += "\n From: %s" % (' -> '.join(who))
if self.descr:
txt = re.sub(r'[^[:isprint:]]', '_', self.descr)
import textwrap
lines = txt.splitlines()
wrapper = textwrap.TextWrapper( width = 80,
initial_indent=' Descr: ',
subsequent_indent=' ')
ret += "\n" + wrapper.fill(lines[0])
wrapper.initial_indent = ' '
for line in lines[1:]:
ret += "\n" + wrapper.fill(line)
ret += "\n"
return ret
def __cmp__(self, other): def __cmp__(self, other):
return cmp(self.reqid, other.reqid) return cmp(self.reqid, other.reqid)
def __str__(self):
action_list=""
for action in self.actions:
action_list=action_list+" %s: " % (action.type)
if action.type=="submit":
r=""
if action.src_rev:
r="(r%s)" % (action.src_rev)
m=""
if action.src_update:
m="(%s)" % (action.src_update)
action_list=action_list+" %s/%s%s%s -> %s" % ( action.src_project, action.src_package, r, m, action.dst_project )
if action.dst_package:
action_list=action_list+"/%s" % ( action.dst_package )
elif action.type=="delete":
action_list=action_list+" %s" % ( action.dst_project )
if action.dst_package:
action_list=action_list+"/%s" % ( action.dst_package )
elif action.type=="change_devel":
action_list=action_list+" %s/%s developed in %s/%s" % \
( action.dst_project, action.dst_package, action.src_project, action.src_package )
action_list=action_list+"\n"
s = """\
Request #%s:
%s
Message:
%s
State: %-10s %s %s
Comment: %s
""" % (self.reqid,
action_list,
self.descr,
self.state.name, self.state.when, self.state.who,
self.state.comment)
if len(self.reviews):
reviewitems = [ '%-10s %s %s %s %s %s' \
% (i.state, i.by_user, i.by_group, i.when, i.who, i.comment) \
for i in self.reviews ]
s += '\nReview: ' + '\n '.join(reviewitems)
s += '\n'
if len(self.statehistory):
histitems = [ '%-10s %s %s' \
% (i.name, i.when, i.who) \
for i in self.statehistory ]
s += '\nHistory: ' + '\n '.join(histitems)
s += '\n'
return s
def shorttime(t): def shorttime(t):
"""format time as Apr 02 18:19 """format time as Apr 02 18:19