osclib/origin_listener: start additional listeners for remote origins.
For example, SLE-15-SP2 has openSUSE.org:openSUSE:Factory as an origin. The events for that project are not included on the IBS message bus and thus package updates to that project will be missed. When origins contain a remote prefix another listener needs to be started pointing at the remote OBS instance message bus. The resulting messages need to be prefixed before being considered by the primary listener.
This commit is contained in:
parent
791d449777
commit
ac6e61a15e
@ -1,8 +1,10 @@
|
||||
import json
|
||||
from osclib.core import package_kind
|
||||
from osclib.core import project_remote_list
|
||||
from osclib.origin import origin_updatable_map
|
||||
from osclib.origin import origin_update
|
||||
from osclib.PubSubConsumer import PubSubConsumer
|
||||
import threading
|
||||
|
||||
|
||||
class OriginSourceChangeListener(PubSubConsumer):
|
||||
@ -10,10 +12,28 @@ class OriginSourceChangeListener(PubSubConsumer):
|
||||
self.apiurl = apiurl
|
||||
self.project = project
|
||||
self.dry = dry
|
||||
self.listeners = {}
|
||||
|
||||
amqp_prefix = 'suse' if self.apiurl.endswith('suse.de') else 'opensuse'
|
||||
super().__init__(amqp_prefix, logger)
|
||||
|
||||
def run(self, runtime=None):
|
||||
super().run(runtime=runtime)
|
||||
|
||||
for listener in self.listeners.values():
|
||||
listener.run(runtime=runtime)
|
||||
|
||||
def stop(self):
|
||||
super().stop()
|
||||
|
||||
for listener in self.listeners.values():
|
||||
listener.stop()
|
||||
|
||||
def start_consuming(self):
|
||||
super().start_consuming()
|
||||
|
||||
self.check_remotes()
|
||||
|
||||
def routing_keys(self):
|
||||
return [self._prefix + k for k in [
|
||||
'.obs.package.update',
|
||||
@ -68,6 +88,16 @@ class OriginSourceChangeListener(PubSubConsumer):
|
||||
|
||||
self.update_consider(origins, project, package)
|
||||
|
||||
def check_remotes(self):
|
||||
origins = self.origin_updatable_map()
|
||||
remotes = project_remote_list(self.apiurl)
|
||||
for remote, apiurl in remotes.items():
|
||||
for origin in origins:
|
||||
if origin.startswith(remote + ':') and apiurl not in self.listeners:
|
||||
self.logger.info('starting remote listener due to {} origin'.format(origin))
|
||||
self.listeners[apiurl] = OriginSourceChangeListenerRemote(apiurl, self, remote)
|
||||
threading.Thread(target=self.listeners[apiurl].run).start()
|
||||
|
||||
def origin_updatable_map(self, pending=None):
|
||||
return origin_updatable_map(self.apiurl, pending=pending)
|
||||
|
||||
@ -89,3 +119,21 @@ class OriginSourceChangeListener(PubSubConsumer):
|
||||
else:
|
||||
# This eliminates the possibility for deletes by listener.
|
||||
self.logger.info('skipped updating non-existant package {}/{}'.format(project, package))
|
||||
|
||||
class OriginSourceChangeListenerRemote(OriginSourceChangeListener):
|
||||
def __init__(self, apiurl, parent, prefix):
|
||||
self.parent = parent
|
||||
self.prefix = prefix
|
||||
|
||||
super().__init__(apiurl, self.parent.logger)
|
||||
self._run_until = self.parent._run_until
|
||||
|
||||
def check_remotes(self):
|
||||
pass
|
||||
|
||||
def origin_updatable_map(self, pending=None):
|
||||
return self.parent.origin_updatable_map(pending=pending)
|
||||
|
||||
def update_consider(self, origins, origin_project, package):
|
||||
origin_project = '{}:{}'.format(self.prefix, origin_project)
|
||||
self.parent.update_consider(origins, origin_project, package)
|
||||
|
Loading…
x
Reference in New Issue
Block a user