Merge pull request #2214 from jberry-suse/origin-manager-update-listener-remote
osclib/origin_listener: start additional listeners for remote origins.
This commit is contained in:
commit
9c45fc378b
@ -1,8 +1,10 @@
|
|||||||
import json
|
import json
|
||||||
from osclib.core import package_kind
|
from osclib.core import package_kind
|
||||||
|
from osclib.core import project_remote_list
|
||||||
from osclib.origin import origin_updatable_map
|
from osclib.origin import origin_updatable_map
|
||||||
from osclib.origin import origin_update
|
from osclib.origin import origin_update
|
||||||
from osclib.PubSubConsumer import PubSubConsumer
|
from osclib.PubSubConsumer import PubSubConsumer
|
||||||
|
import threading
|
||||||
|
|
||||||
|
|
||||||
class OriginSourceChangeListener(PubSubConsumer):
|
class OriginSourceChangeListener(PubSubConsumer):
|
||||||
@ -10,10 +12,28 @@ class OriginSourceChangeListener(PubSubConsumer):
|
|||||||
self.apiurl = apiurl
|
self.apiurl = apiurl
|
||||||
self.project = project
|
self.project = project
|
||||||
self.dry = dry
|
self.dry = dry
|
||||||
|
self.listeners = {}
|
||||||
|
|
||||||
amqp_prefix = 'suse' if self.apiurl.endswith('suse.de') else 'opensuse'
|
amqp_prefix = 'suse' if self.apiurl.endswith('suse.de') else 'opensuse'
|
||||||
super().__init__(amqp_prefix, logger)
|
super().__init__(amqp_prefix, logger)
|
||||||
|
|
||||||
|
def run(self, runtime=None):
|
||||||
|
super().run(runtime=runtime)
|
||||||
|
|
||||||
|
for listener in self.listeners.values():
|
||||||
|
listener.run(runtime=runtime)
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
super().stop()
|
||||||
|
|
||||||
|
for listener in self.listeners.values():
|
||||||
|
listener.stop()
|
||||||
|
|
||||||
|
def start_consuming(self):
|
||||||
|
super().start_consuming()
|
||||||
|
|
||||||
|
self.check_remotes()
|
||||||
|
|
||||||
def routing_keys(self):
|
def routing_keys(self):
|
||||||
return [self._prefix + k for k in [
|
return [self._prefix + k for k in [
|
||||||
'.obs.package.update',
|
'.obs.package.update',
|
||||||
@ -32,11 +52,11 @@ class OriginSourceChangeListener(PubSubConsumer):
|
|||||||
raise Exception('Unrequested message: {}'.format(method.routing_key))
|
raise Exception('Unrequested message: {}'.format(method.routing_key))
|
||||||
|
|
||||||
def on_message_package_update(self, payload):
|
def on_message_package_update(self, payload):
|
||||||
origins = origin_updatable_map(self.apiurl)
|
origins = self.origin_updatable_map()
|
||||||
self.update_consider(origins, payload['project'], payload['package'])
|
self.update_consider(origins, payload['project'], payload['package'])
|
||||||
|
|
||||||
def on_message_request_create(self, payload):
|
def on_message_request_create(self, payload):
|
||||||
origins = origin_updatable_map(self.apiurl, pending=True)
|
origins = self.origin_updatable_map(pending=True)
|
||||||
for action in payload['actions']:
|
for action in payload['actions']:
|
||||||
# The following code demonstrates the quality of the data structure.
|
# The following code demonstrates the quality of the data structure.
|
||||||
# The base structure is inconsistent enough and yet the event data
|
# The base structure is inconsistent enough and yet the event data
|
||||||
@ -68,9 +88,22 @@ class OriginSourceChangeListener(PubSubConsumer):
|
|||||||
|
|
||||||
self.update_consider(origins, project, package)
|
self.update_consider(origins, project, package)
|
||||||
|
|
||||||
|
def check_remotes(self):
|
||||||
|
origins = self.origin_updatable_map()
|
||||||
|
remotes = project_remote_list(self.apiurl)
|
||||||
|
for remote, apiurl in remotes.items():
|
||||||
|
for origin in origins:
|
||||||
|
if origin.startswith(remote + ':') and apiurl not in self.listeners:
|
||||||
|
self.logger.info('starting remote listener due to {} origin'.format(origin))
|
||||||
|
self.listeners[apiurl] = OriginSourceChangeListenerRemote(apiurl, self, remote)
|
||||||
|
threading.Thread(target=self.listeners[apiurl].run).start()
|
||||||
|
|
||||||
|
def origin_updatable_map(self, pending=None):
|
||||||
|
return origin_updatable_map(self.apiurl, pending=pending)
|
||||||
|
|
||||||
def update_consider(self, origins, origin_project, package):
|
def update_consider(self, origins, origin_project, package):
|
||||||
if origin_project not in origins:
|
if origin_project not in origins:
|
||||||
self.logger.info('skipped irrelevant project: {}'.format(origin_project))
|
self.logger.info('skipped irrelevant origin: {}'.format(origin_project))
|
||||||
return
|
return
|
||||||
|
|
||||||
for project in origins[origin_project]:
|
for project in origins[origin_project]:
|
||||||
@ -86,3 +119,21 @@ class OriginSourceChangeListener(PubSubConsumer):
|
|||||||
else:
|
else:
|
||||||
# This eliminates the possibility for deletes by listener.
|
# This eliminates the possibility for deletes by listener.
|
||||||
self.logger.info('skipped updating non-existant package {}/{}'.format(project, package))
|
self.logger.info('skipped updating non-existant package {}/{}'.format(project, package))
|
||||||
|
|
||||||
|
class OriginSourceChangeListenerRemote(OriginSourceChangeListener):
|
||||||
|
def __init__(self, apiurl, parent, prefix):
|
||||||
|
self.parent = parent
|
||||||
|
self.prefix = prefix
|
||||||
|
|
||||||
|
super().__init__(apiurl, self.parent.logger)
|
||||||
|
self._run_until = self.parent._run_until
|
||||||
|
|
||||||
|
def check_remotes(self):
|
||||||
|
pass
|
||||||
|
|
||||||
|
def origin_updatable_map(self, pending=None):
|
||||||
|
return self.parent.origin_updatable_map(pending=pending)
|
||||||
|
|
||||||
|
def update_consider(self, origins, origin_project, package):
|
||||||
|
origin_project = '{}:{}'.format(self.prefix, origin_project)
|
||||||
|
self.parent.update_consider(origins, origin_project, package)
|
||||||
|
Loading…
x
Reference in New Issue
Block a user