From 28326a185d4fdcf396d0cae9cbc7d35b0f169d16 Mon Sep 17 00:00:00 2001 From: Daniel Mach Date: Mon, 15 May 2023 21:54:14 +0200 Subject: [PATCH] Add XPathQuery class for translating keyword arguments to an xpath query --- osc/util/xpath.py | 153 ++++++++++++++++++++++++++++++++++++++++++++ tests/test_xpath.py | 109 +++++++++++++++++++++++++++++++ 2 files changed, 262 insertions(+) create mode 100644 osc/util/xpath.py create mode 100644 tests/test_xpath.py diff --git a/osc/util/xpath.py b/osc/util/xpath.py new file mode 100644 index 00000000..2aebb5b8 --- /dev/null +++ b/osc/util/xpath.py @@ -0,0 +1,153 @@ +from . import xml + + +class XPathQuery: + """ + A query object that translates keyword arguments into a xpath query. + The query objects can combined using `&` and `|` operators. + + Inspired with: + https://docs.djangoproject.com/en/dev/topics/db/queries/#complex-lookups-with-q-objects + """ + + VALID_OPS = ["eq", "contains"] + + def __init__(self, **kwargs): + self.xpath = "" + self.last_op = None + + for key, value in kwargs.items(): + if value is None: + continue + key, op, value, op_not = self._parse(key, value) + self._apply(key, op, value, op_not) + + def __str__(self): + return self.xpath + + def _parse(self, key, value): + op = "eq" + op_not = False + + parts = key.split("__") + for valid_op in self.VALID_OPS: + # there must always be a field name followed by 0+ operators + # in this case there's only the name + if len(parts) == 1: + continue + + if parts[-2:] == ["not", valid_op]: + op = parts[-1] + op_not = True + parts = parts[:-2] + break + elif parts[-1] == valid_op: + op = parts[-1] + parts = parts[:-1] + break + elif parts[-1] == "not": + op_not = True + parts = parts[:-1] + break + + key = "__".join(parts) + return key, op, value, op_not + + def _apply(self, key, op, value, op_not=False): + if "__" in key: + prefix, key = key.rsplit("__", 1) + prefix = prefix.replace("__", "/") + else: + prefix = "" + + if isinstance(value, (list, tuple)): + q = XPathQuery() + for i in value: + if op_not: + # translate name__not=["foo", "bar"] into XPathQuery(name__not="foo") & XPathQuery(name__not="bar") + q &= XPathQuery()._apply(key, op, i, op_not) + else: + # translate name=["foo", "bar"] into XPathQuery(name="foo") | XPathQuery(name="bar") + q |= XPathQuery()._apply(key, op, i, op_not) + + if prefix: + q.xpath = f"{prefix}[{q.xpath}]" + + self &= q + return self + + if isinstance(value, bool): + value = str(int(value)) + + prefix = xml.xml_escape(prefix) + key = xml.xml_escape(key) + key = f"@{key}" + value = xml.xml_escape(value) + value = f"'{value}'" + + q = XPathQuery() + if op == "eq": + q.xpath = f"{key}={value}" + elif op == "contains": + q.xpath = f"contains({key}, {value})" + else: + raise ValueError(f"Invalid operator: {op}") + + if op_not: + q.xpath = f"not({q.xpath})" + + if prefix: + q.xpath = f"{prefix}[{q.xpath}]" + + self &= q + return self + + @staticmethod + def _imerge(q1, op, q2): + """ + Merge `q2` into `q1`. + """ + if not q1.xpath and not q2.xpath: + return + + if not q1.xpath: + q1.xpath = q2.xpath + q1.last_op = q2.last_op + return + + if not q2.xpath: + return + + assert op is not None + + if q1.last_op not in (None, op): + q1.xpath = f"({q1.xpath})" + + q1.xpath += f" {op} " + + if q2.last_op in (None, op): + q1.xpath += f"{q2.xpath}" + else: + q1.xpath += f"({q2.xpath})" + + q1.last_op = op + + def __and__(self, other): + result = XPathQuery() + self._imerge(result, None, self) + self._imerge(result, "and", other) + return result + + def __iand__(self, other): + self._imerge(self, "and", other) + return self + + def __or__(self, other): + result = XPathQuery() + self._imerge(result, None, self) + self._imerge(result, "or", other) + return result + + def __ior__(self, other): + self._imerge(self, "or", other) + return self diff --git a/tests/test_xpath.py b/tests/test_xpath.py new file mode 100644 index 00000000..91a57b29 --- /dev/null +++ b/tests/test_xpath.py @@ -0,0 +1,109 @@ +import unittest + +from osc.util.xpath import XPathQuery as Q + + +class TestQuery(unittest.TestCase): + def test_noop(self): + q = Q(name="foo") + self.assertEqual(str(q), "@name='foo'") + + def test_not(self): + q = Q(name__not="foo") + self.assertEqual(str(q), "not(@name='foo')") + + def test_eq(self): + q = Q(name__eq="foo") + self.assertEqual(str(q), "@name='foo'") + + def test_not_eq(self): + q = Q(name__not__eq="foo") + self.assertEqual(str(q), "not(@name='foo')") + + def test_contains(self): + q = Q(name__contains="foo") + self.assertEqual(str(q), "contains(@name, 'foo')") + + def test_and(self): + q1 = Q(name="foo") + q2 = Q(name="bar") + q = q1 & q2 + self.assertEqual(str(q), "@name='foo' and @name='bar'") + + q3 = Q(name="baz") + q = q & q3 + self.assertEqual(str(q), "@name='foo' and @name='bar' and @name='baz'") + + def test_or(self): + q1 = Q(name="foo") + q2 = Q(name="bar") + q = q1 | q2 + self.assertEqual(str(q), "@name='foo' or @name='bar'") + + q3 = Q(name="baz") + q = q | q3 + self.assertEqual(str(q), "@name='foo' or @name='bar' or @name='baz'") + + def test_and_or(self): + q1 = Q(name="foo") + q2 = Q(name="bar") + q = q1 & q2 + self.assertEqual(str(q), "@name='foo' and @name='bar'") + + q3 = Q(name="baz") + q = q | q3 + self.assertEqual(str(q), "(@name='foo' and @name='bar') or @name='baz'") + + q4 = Q(name="xyz") + q = q | q4 + self.assertEqual(str(q), "(@name='foo' and @name='bar') or @name='baz' or @name='xyz'") + + def test_or_and(self): + q1 = Q(name="foo") + q2 = Q(name="bar") + q = q1 | q2 + self.assertEqual(str(q), "@name='foo' or @name='bar'") + + q3 = Q(name="baz") + q = q & q3 + self.assertEqual(str(q), "(@name='foo' or @name='bar') and @name='baz'") + + q4 = Q(name="xyz") + q = q & q4 + self.assertEqual(str(q), "(@name='foo' or @name='bar') and @name='baz' and @name='xyz'") + + def test_and_or_and(self): + q1 = Q(name="foo") + q2 = Q(name="bar") + q3 = Q(name="baz") + q4 = Q(name="xyz") + q = (q1 & q2) | (q3 & q4) + self.assertEqual(str(q), "(@name='foo' and @name='bar') or (@name='baz' and @name='xyz')") + + def test_or_and_or(self): + q1 = Q(name="foo") + q2 = Q(name="bar") + q3 = Q(name="baz") + q4 = Q(name="xyz") + q = (q1 | q2) & (q3 | q4) + self.assertEqual(str(q), "(@name='foo' or @name='bar') and (@name='baz' or @name='xyz')") + + def test_multiple_kwargs(self): + q = Q(name1="foo", name2="bar") + self.assertEqual(str(q), "@name1='foo' and @name2='bar'") + + def test_eq_list(self): + q = Q(name=["foo", "bar", "baz"]) + self.assertEqual(str(q), "@name='foo' or @name='bar' or @name='baz'") + + def test_not_eq_list(self): + q = Q(name__not=["foo", "bar", "baz"]) + self.assertEqual(str(q), "not(@name='foo') and not(@name='bar') and not(@name='baz')") + + def test_review_state(self): + q = Q(state__name=["new"]) + self.assertEqual(str(q), "state[@name='new']") + + +if __name__ == "__main__": + unittest.main()