Split the work done in rabbit-openqa into smaller chunks

As the rabbitmq heartbeart interval is a hard limit, we need to make sure
we're not overloading the time in between
This commit is contained in:
Stephan Kulow 2020-03-13 15:19:07 +01:00
parent d98dd90928
commit 236cf1ab38
2 changed files with 31 additions and 6 deletions

View File

@ -32,11 +32,13 @@ class Project(object):
self.replace_string = self.api.attribute_value_load('OpenQAMapping') self.replace_string = self.api.attribute_value_load('OpenQAMapping')
def init(self): def init(self):
for p in self.api.get_staging_projects(): projects = set()
if self.api.is_adi_project(p): for project in self.api.get_staging_projects():
if self.api.is_adi_project(project):
continue continue
self.staging_projects[p] = self.initial_staging_state(p) self.staging_projects[project] = self.initial_staging_state(project)
self.update_staging_status(p) projects.add(project)
return projects
def staging_letter(self, name): def staging_letter(self, name):
return name.split(':')[-1] return name.split(':')[-1]
@ -163,6 +165,7 @@ class Listener(PubSubConsumer):
self.amqp_prefix = amqp_prefix self.amqp_prefix = amqp_prefix
self.openqa_url = openqa_url self.openqa_url = openqa_url
self.openqa = OpenQA_Client(server=openqa_url) self.openqa = OpenQA_Client(server=openqa_url)
self.projects_to_check = set()
def routing_keys(self): def routing_keys(self):
ret = [] ret = []
@ -178,12 +181,33 @@ class Listener(PubSubConsumer):
def start_consuming(self): def start_consuming(self):
# now we are (re-)connected to the bus and need to fetch the # now we are (re-)connected to the bus and need to fetch the
# initial state # initial state
self.projects_to_check = set()
for project in self.projects: for project in self.projects:
self.logger.info('Fetching ISOs of %s', project.name) self.logger.info('Fetching ISOs of %s', project.name)
project.init() for sproj in project.init():
self.projects_to_check.add((project, sproj))
self.logger.info('Finished fetching initial ISOs, listening') self.logger.info('Finished fetching initial ISOs, listening')
super(Listener, self).start_consuming() super(Listener, self).start_consuming()
def interval(self):
if len(self.projects_to_check):
return 5
return super(Listener, self).interval()
def check_some_projects(self):
count = 0
limit = 5
while len(self.projects_to_check):
project, staging = self.projects_to_check.pop()
project.update_staging_status(staging)
count += 1
if count >= limit:
return
def still_alive(self):
self.check_some_projects()
super(Listener, self).still_alive()
def jobs_for_iso(self, iso): def jobs_for_iso(self, iso):
values = { values = {
'iso': iso, 'iso': iso,

View File

@ -80,10 +80,11 @@ class Listener(PubSubConsumer):
def check_some_repos(self): def check_some_repos(self):
count = 0 count = 0
limit = 25 limit = 15
while len(self.repositories_to_check): while len(self.repositories_to_check):
project, repository = self.repositories_to_check.pop() project, repository = self.repositories_to_check.pop()
self.logger.debug(f"Check repo {project}/{repository}") self.logger.debug(f"Check repo {project}/{repository}")
self.update_repo(project, repository)
count += 1 count += 1
if count >= limit: if count >= limit:
return return