155 lines
5.1 KiB
155 lines
5.1 KiB
from datetime import datetime
import time
import warnings
from lxml import etree as ET
from osc import conf
from osc.core import makeurl
from osc.core import http_GET
from osc.core import http_POST
from urllib.error import HTTPError
class OBSLock(object):
"""Implement a distributed lock using a shared OBS resource."""
def __init__(self, apiurl, project, ttl=3600, reason=None, needed=True):
self.apiurl = apiurl
self.project = project
self.lock = conf.config[project]['lock']
self.ns = conf.config[project]['lock-ns']
# TTL is measured in seconds
self.ttl = ttl
self.user = conf.config['api_host_options'][apiurl]['user']
self.reason = reason
self.reason_sub = None
self.locked = False
self.needed = needed
def _signature(self):
"""Create a signature with a timestamp."""
reason = str(self.reason)
if self.reason_sub:
reason += f' ({self.reason_sub})'
reason = reason.replace('@', 'at').replace('#', 'hash')
return f"{self.user}#{reason}@{datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.%f')}"
def _parse(self, signature):
"""Parse a signature into an user and a timestamp."""
user, reason, reason_sub, ts = None, None, None, None
rest, ts_str = signature.split('@')
user, reason = rest.split('#')
if ' (hold' in reason:
reason, reason_sub = reason.split(' (', 1)
reason_sub = reason_sub.rstrip(')')
ts = datetime.strptime(ts_str, '%Y-%m-%dT%H:%M:%S.%f')
except (AttributeError, ValueError):
return user, reason, reason_sub, ts
def _read(self):
url = makeurl(self.apiurl, ['source', self.lock, '_attribute', f'{self.ns}:LockedBy'])
root = ET.parse(http_GET(url)).getroot()
except HTTPError as e:
if e.code == 404:
return None
raise e
signature = None
signature = root.find('.//value').text
except (AttributeError, ValueError):
return signature
def _write(self, signature):
url = makeurl(self.apiurl, ['source', self.lock, '_attribute'])
data = f"""
<attribute namespace='{self.ns}' name='LockedBy'>
http_POST(url, data=data)
def acquire(self):
if not self.needed:
return self
# If the project doesn't have locks configured, raise a
# Warning (but continue the operation)
if not self.lock:
warnings.warn('Locking attribute is not found. Create one to avoid race conditions.')
return self
user, reason, reason_sub, ts = self._parse(self._read())
if user and ts:
now = datetime.utcnow()
if now < ts:
raise Exception(f'Lock acquired from the future [{ts}] by [{user}]. Try later.')
delta = now - ts
if delta.total_seconds() < self.ttl:
# Existing lock that has not expired.
stop = True
if user == self.user:
if reason.startswith('hold'):
# Command being issued during a hold.
self.reason_sub = reason
stop = False
elif reason == 'lock':
# Second pass to acquire hold.
stop = False
if stop:
print(f'Lock acquired by [{user}] {delta} ago, reason <{reason}>. Try later.')
user, _, _, _ = self._parse(self._read())
if user != self.user:
raise Exception(f'Race condition, [{user}] wins. Try later.')
self.locked = True
return self
def release(self, force=False):
if not force and not self.needed:
# If the project do not have locks configured, simply ignore
# the operation.
if not self.lock:
user, reason, reason_sub, _ = self._parse(self._read())
clear = False
if user == self.user:
if reason_sub:
self.reason = reason_sub
self.reason_sub = None
elif not reason.startswith('hold') or force:
# Only clear a command lock as hold can only be cleared by force.
clear = True
elif user is not None and force:
# Clear if a lock is present and force.
clear = True
if clear:
self.locked = False
def hold(self, message=None):
self.reason = 'hold'
if message:
self.reason += ': ' + message
__enter__ = acquire
def __exit__(self, exc_type, exc_val, exc_tb):