mirror of
https://github.com/openSUSE/osc.git
synced 2024-12-29 11:16:14 +01:00
Merge pull request #1327 from dmach/q
Add XPathQuery class for translating keyword arguments to an xpath query
This commit is contained in:
commit
dffe549742
153
osc/util/xpath.py
Normal file
153
osc/util/xpath.py
Normal file
@ -0,0 +1,153 @@
|
||||
from . import xml
|
||||
|
||||
|
||||
class XPathQuery:
|
||||
"""
|
||||
A query object that translates keyword arguments into a xpath query.
|
||||
The query objects can combined using `&` and `|` operators.
|
||||
|
||||
Inspired with:
|
||||
https://docs.djangoproject.com/en/dev/topics/db/queries/#complex-lookups-with-q-objects
|
||||
"""
|
||||
|
||||
VALID_OPS = ["eq", "contains"]
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
self.xpath = ""
|
||||
self.last_op = None
|
||||
|
||||
for key, value in kwargs.items():
|
||||
if value is None:
|
||||
continue
|
||||
key, op, value, op_not = self._parse(key, value)
|
||||
self._apply(key, op, value, op_not)
|
||||
|
||||
def __str__(self):
|
||||
return self.xpath
|
||||
|
||||
def _parse(self, key, value):
|
||||
op = "eq"
|
||||
op_not = False
|
||||
|
||||
parts = key.split("__")
|
||||
for valid_op in self.VALID_OPS:
|
||||
# there must always be a field name followed by 0+ operators
|
||||
# in this case there's only the name
|
||||
if len(parts) == 1:
|
||||
continue
|
||||
|
||||
if parts[-2:] == ["not", valid_op]:
|
||||
op = parts[-1]
|
||||
op_not = True
|
||||
parts = parts[:-2]
|
||||
break
|
||||
elif parts[-1] == valid_op:
|
||||
op = parts[-1]
|
||||
parts = parts[:-1]
|
||||
break
|
||||
elif parts[-1] == "not":
|
||||
op_not = True
|
||||
parts = parts[:-1]
|
||||
break
|
||||
|
||||
key = "__".join(parts)
|
||||
return key, op, value, op_not
|
||||
|
||||
def _apply(self, key, op, value, op_not=False):
|
||||
if "__" in key:
|
||||
prefix, key = key.rsplit("__", 1)
|
||||
prefix = prefix.replace("__", "/")
|
||||
else:
|
||||
prefix = ""
|
||||
|
||||
if isinstance(value, (list, tuple)):
|
||||
q = XPathQuery()
|
||||
for i in value:
|
||||
if op_not:
|
||||
# translate name__not=["foo", "bar"] into XPathQuery(name__not="foo") & XPathQuery(name__not="bar")
|
||||
q &= XPathQuery()._apply(key, op, i, op_not)
|
||||
else:
|
||||
# translate name=["foo", "bar"] into XPathQuery(name="foo") | XPathQuery(name="bar")
|
||||
q |= XPathQuery()._apply(key, op, i, op_not)
|
||||
|
||||
if prefix:
|
||||
q.xpath = f"{prefix}[{q.xpath}]"
|
||||
|
||||
self &= q
|
||||
return self
|
||||
|
||||
if isinstance(value, bool):
|
||||
value = str(int(value))
|
||||
|
||||
prefix = xml.xml_escape(prefix)
|
||||
key = xml.xml_escape(key)
|
||||
key = f"@{key}"
|
||||
value = xml.xml_escape(value)
|
||||
value = f"'{value}'"
|
||||
|
||||
q = XPathQuery()
|
||||
if op == "eq":
|
||||
q.xpath = f"{key}={value}"
|
||||
elif op == "contains":
|
||||
q.xpath = f"contains({key}, {value})"
|
||||
else:
|
||||
raise ValueError(f"Invalid operator: {op}")
|
||||
|
||||
if op_not:
|
||||
q.xpath = f"not({q.xpath})"
|
||||
|
||||
if prefix:
|
||||
q.xpath = f"{prefix}[{q.xpath}]"
|
||||
|
||||
self &= q
|
||||
return self
|
||||
|
||||
@staticmethod
|
||||
def _imerge(q1, op, q2):
|
||||
"""
|
||||
Merge `q2` into `q1`.
|
||||
"""
|
||||
if not q1.xpath and not q2.xpath:
|
||||
return
|
||||
|
||||
if not q1.xpath:
|
||||
q1.xpath = q2.xpath
|
||||
q1.last_op = q2.last_op
|
||||
return
|
||||
|
||||
if not q2.xpath:
|
||||
return
|
||||
|
||||
assert op is not None
|
||||
|
||||
if q1.last_op not in (None, op):
|
||||
q1.xpath = f"({q1.xpath})"
|
||||
|
||||
q1.xpath += f" {op} "
|
||||
|
||||
if q2.last_op in (None, op):
|
||||
q1.xpath += f"{q2.xpath}"
|
||||
else:
|
||||
q1.xpath += f"({q2.xpath})"
|
||||
|
||||
q1.last_op = op
|
||||
|
||||
def __and__(self, other):
|
||||
result = XPathQuery()
|
||||
self._imerge(result, None, self)
|
||||
self._imerge(result, "and", other)
|
||||
return result
|
||||
|
||||
def __iand__(self, other):
|
||||
self._imerge(self, "and", other)
|
||||
return self
|
||||
|
||||
def __or__(self, other):
|
||||
result = XPathQuery()
|
||||
self._imerge(result, None, self)
|
||||
self._imerge(result, "or", other)
|
||||
return result
|
||||
|
||||
def __ior__(self, other):
|
||||
self._imerge(self, "or", other)
|
||||
return self
|
109
tests/test_xpath.py
Normal file
109
tests/test_xpath.py
Normal file
@ -0,0 +1,109 @@
|
||||
import unittest
|
||||
|
||||
from osc.util.xpath import XPathQuery as Q
|
||||
|
||||
|
||||
class TestQuery(unittest.TestCase):
|
||||
def test_noop(self):
|
||||
q = Q(name="foo")
|
||||
self.assertEqual(str(q), "@name='foo'")
|
||||
|
||||
def test_not(self):
|
||||
q = Q(name__not="foo")
|
||||
self.assertEqual(str(q), "not(@name='foo')")
|
||||
|
||||
def test_eq(self):
|
||||
q = Q(name__eq="foo")
|
||||
self.assertEqual(str(q), "@name='foo'")
|
||||
|
||||
def test_not_eq(self):
|
||||
q = Q(name__not__eq="foo")
|
||||
self.assertEqual(str(q), "not(@name='foo')")
|
||||
|
||||
def test_contains(self):
|
||||
q = Q(name__contains="foo")
|
||||
self.assertEqual(str(q), "contains(@name, 'foo')")
|
||||
|
||||
def test_and(self):
|
||||
q1 = Q(name="foo")
|
||||
q2 = Q(name="bar")
|
||||
q = q1 & q2
|
||||
self.assertEqual(str(q), "@name='foo' and @name='bar'")
|
||||
|
||||
q3 = Q(name="baz")
|
||||
q = q & q3
|
||||
self.assertEqual(str(q), "@name='foo' and @name='bar' and @name='baz'")
|
||||
|
||||
def test_or(self):
|
||||
q1 = Q(name="foo")
|
||||
q2 = Q(name="bar")
|
||||
q = q1 | q2
|
||||
self.assertEqual(str(q), "@name='foo' or @name='bar'")
|
||||
|
||||
q3 = Q(name="baz")
|
||||
q = q | q3
|
||||
self.assertEqual(str(q), "@name='foo' or @name='bar' or @name='baz'")
|
||||
|
||||
def test_and_or(self):
|
||||
q1 = Q(name="foo")
|
||||
q2 = Q(name="bar")
|
||||
q = q1 & q2
|
||||
self.assertEqual(str(q), "@name='foo' and @name='bar'")
|
||||
|
||||
q3 = Q(name="baz")
|
||||
q = q | q3
|
||||
self.assertEqual(str(q), "(@name='foo' and @name='bar') or @name='baz'")
|
||||
|
||||
q4 = Q(name="xyz")
|
||||
q = q | q4
|
||||
self.assertEqual(str(q), "(@name='foo' and @name='bar') or @name='baz' or @name='xyz'")
|
||||
|
||||
def test_or_and(self):
|
||||
q1 = Q(name="foo")
|
||||
q2 = Q(name="bar")
|
||||
q = q1 | q2
|
||||
self.assertEqual(str(q), "@name='foo' or @name='bar'")
|
||||
|
||||
q3 = Q(name="baz")
|
||||
q = q & q3
|
||||
self.assertEqual(str(q), "(@name='foo' or @name='bar') and @name='baz'")
|
||||
|
||||
q4 = Q(name="xyz")
|
||||
q = q & q4
|
||||
self.assertEqual(str(q), "(@name='foo' or @name='bar') and @name='baz' and @name='xyz'")
|
||||
|
||||
def test_and_or_and(self):
|
||||
q1 = Q(name="foo")
|
||||
q2 = Q(name="bar")
|
||||
q3 = Q(name="baz")
|
||||
q4 = Q(name="xyz")
|
||||
q = (q1 & q2) | (q3 & q4)
|
||||
self.assertEqual(str(q), "(@name='foo' and @name='bar') or (@name='baz' and @name='xyz')")
|
||||
|
||||
def test_or_and_or(self):
|
||||
q1 = Q(name="foo")
|
||||
q2 = Q(name="bar")
|
||||
q3 = Q(name="baz")
|
||||
q4 = Q(name="xyz")
|
||||
q = (q1 | q2) & (q3 | q4)
|
||||
self.assertEqual(str(q), "(@name='foo' or @name='bar') and (@name='baz' or @name='xyz')")
|
||||
|
||||
def test_multiple_kwargs(self):
|
||||
q = Q(name1="foo", name2="bar")
|
||||
self.assertEqual(str(q), "@name1='foo' and @name2='bar'")
|
||||
|
||||
def test_eq_list(self):
|
||||
q = Q(name=["foo", "bar", "baz"])
|
||||
self.assertEqual(str(q), "@name='foo' or @name='bar' or @name='baz'")
|
||||
|
||||
def test_not_eq_list(self):
|
||||
q = Q(name__not=["foo", "bar", "baz"])
|
||||
self.assertEqual(str(q), "not(@name='foo') and not(@name='bar') and not(@name='baz')")
|
||||
|
||||
def test_review_state(self):
|
||||
q = Q(state__name=["new"])
|
||||
self.assertEqual(str(q), "state[@name='new']")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
Loading…
Reference in New Issue
Block a user