mirror of
https://github.com/openSUSE/osc.git
synced 2025-09-07 21:58:41 +02:00
Merge pull request #1537 from dmach/osc-log-patch
Support osc log --patch
This commit is contained in:
114
behave/features/log.feature
Normal file
114
behave/features/log.feature
Normal file
@@ -0,0 +1,114 @@
|
||||
Feature: `osc log` command
|
||||
|
||||
|
||||
Scenario: Run `osc log` on a package
|
||||
Given I execute osc with args "log test:factory/test-pkgA"
|
||||
Then the exit code is 0
|
||||
And stdout matches
|
||||
"""
|
||||
----------------------------------------------------------------------------
|
||||
r3 | Admin | ....-..-.. ..:..:.. | dc997133b8ddfaf084b471b05c2643b3 | 3 |
|
||||
|
||||
Version 3
|
||||
----------------------------------------------------------------------------
|
||||
r2 | Admin | ....-..-.. ..:..:.. | 0ea55feb9cdd741ba7f523ed58a4f099 | 2 |
|
||||
|
||||
Version 2
|
||||
----------------------------------------------------------------------------
|
||||
r1 | Admin | ....-..-.. ..:..:.. | e675755e79e0d69483d311e96d6b719e | 1 |
|
||||
|
||||
Initial commit
|
||||
----------------------------------------------------------------------------
|
||||
"""
|
||||
|
||||
|
||||
Scenario: Run `osc log` on single revision of a package
|
||||
Given I execute osc with args "log test:factory/test-pkgA --revision=2"
|
||||
Then the exit code is 0
|
||||
And stdout matches
|
||||
"""
|
||||
----------------------------------------------------------------------------
|
||||
r2 | Admin | ....-..-.. ..:..:.. | 0ea55feb9cdd741ba7f523ed58a4f099 | 2 |
|
||||
|
||||
Version 2
|
||||
----------------------------------------------------------------------------
|
||||
"""
|
||||
|
||||
|
||||
Scenario: Run `osc log` on revision range of a package
|
||||
Given I execute osc with args "log test:factory/test-pkgA --revision=1:2"
|
||||
Then the exit code is 0
|
||||
And stdout matches
|
||||
"""
|
||||
----------------------------------------------------------------------------
|
||||
r2 | Admin | ....-..-.. ..:..:.. | 0ea55feb9cdd741ba7f523ed58a4f099 | 2 |
|
||||
|
||||
Version 2
|
||||
----------------------------------------------------------------------------
|
||||
r1 | Admin | ....-..-.. ..:..:.. | e675755e79e0d69483d311e96d6b719e | 1 |
|
||||
|
||||
Initial commit
|
||||
----------------------------------------------------------------------------
|
||||
"""
|
||||
|
||||
|
||||
@wip
|
||||
Scenario: Run `osc log --patch` on revision range of a package
|
||||
Given I execute osc with args "log test:factory/test-pkgA --revision=1:2 --patch"
|
||||
Then the exit code is 0
|
||||
And stdout matches
|
||||
"""
|
||||
----------------------------------------------------------------------------
|
||||
r2 \| Admin \| ....-..-.. ..:..:.. \| 0ea55feb9cdd741ba7f523ed58a4f099 \| 2 \|
|
||||
|
||||
Version 2
|
||||
|
||||
|
||||
changes files:
|
||||
--------------
|
||||
--- test-pkgA.changes
|
||||
\+\+\+ test-pkgA.changes
|
||||
@@ -2 \+2 @@
|
||||
-Tue Jan 4 11:22:33 UTC 2022 - Geeko Packager <email@example.com>
|
||||
\+Mon Jan 3 11:22:33 UTC 2022 - Geeko Packager <email@example.com>
|
||||
@@ -4 \+4 @@
|
||||
-- Release upstream version 2
|
||||
\+- Release upstream version 1
|
||||
|
||||
spec files:
|
||||
-----------
|
||||
--- test-pkgA.spec
|
||||
\+\+\+ test-pkgA.spec
|
||||
@@ -1,5 \+1,5 @@
|
||||
Name: test-pkgA
|
||||
-Version: 2
|
||||
\+Version: 1
|
||||
Release: 1
|
||||
License: GPL-2.0
|
||||
Summary: Test package
|
||||
|
||||
----------------------------------------------------------------------------
|
||||
r1 \| Admin \| ....-..-.. ..:..:.. \| e675755e79e0d69483d311e96d6b719e \| 1 \|
|
||||
|
||||
Initial commit
|
||||
|
||||
|
||||
changes files:
|
||||
--------------
|
||||
|
||||
\+\+\+\+\+\+ deleted changes files:
|
||||
--- test-pkgA.changes
|
||||
|
||||
old:
|
||||
----
|
||||
test-pkgA.changes
|
||||
test-pkgA.spec
|
||||
|
||||
spec files:
|
||||
-----------
|
||||
|
||||
\+\+\+\+\+\+ deleted spec files:
|
||||
--- test-pkgA.spec
|
||||
|
||||
|
||||
"""
|
@@ -39,6 +39,7 @@ from .core import *
|
||||
from .grabber import OscFileGrabber
|
||||
from .meter import create_text_meter
|
||||
from .output import get_user_input
|
||||
from .output import pipe_to_pager
|
||||
from .util import cpio, rpmquery, safewriter
|
||||
from .util.helper import _html_escape, format_table
|
||||
|
||||
@@ -7661,13 +7662,15 @@ Please submit there instead, or use --nodevelproject to force direct submission.
|
||||
|
||||
@cmdln.option('-r', '--revision', metavar='rev',
|
||||
help='show log of the specified revision')
|
||||
@cmdln.option("-p", "--patch", action="store_true",
|
||||
help='show patch for each revision; NOTE: use this option carefully because it loads patches on demand in a pager')
|
||||
@cmdln.option('', '--csv', action='store_true',
|
||||
help='generate output in CSV (separated by |)')
|
||||
help='generate output in CSV')
|
||||
@cmdln.option('', '--xml', action='store_true',
|
||||
help='generate output in XML')
|
||||
@cmdln.option('-D', '--deleted', action='store_true',
|
||||
@cmdln.option('-D', '--deleted', action='store_true', default=None,
|
||||
help='work on deleted package')
|
||||
@cmdln.option('-M', '--meta', action='store_true',
|
||||
@cmdln.option('-M', '--meta', action='store_true', default=None,
|
||||
help='checkout out meta data instead of sources')
|
||||
def do_log(self, subcmd, opts, *args):
|
||||
"""
|
||||
@@ -7695,8 +7698,8 @@ Please submit there instead, or use --nodevelproject to force direct submission.
|
||||
if opts.xml:
|
||||
format = 'xml'
|
||||
|
||||
log = '\n'.join(get_commitlog(apiurl, project, package, rev, format, opts.meta, opts.deleted, rev_upper))
|
||||
run_pager(log)
|
||||
lines = get_commitlog(apiurl, project, package, rev, format, opts.meta, opts.deleted, rev_upper, patch=opts.patch)
|
||||
pipe_to_pager(lines, add_newlines=True)
|
||||
|
||||
@cmdln.option('-v', '--verbose', action='store_true',
|
||||
help='verbose run of local services for debugging purposes')
|
||||
|
201
osc/core.py
201
osc/core.py
@@ -6,6 +6,7 @@
|
||||
|
||||
import codecs
|
||||
import copy
|
||||
import csv
|
||||
import datetime
|
||||
import difflib
|
||||
import errno
|
||||
@@ -77,6 +78,8 @@ from .obs_scm.store import store_write_initial_packages
|
||||
from .obs_scm.store import store_write_last_buildroot
|
||||
from .obs_scm.store import store_write_project
|
||||
from .obs_scm.store import store_write_string
|
||||
from .output import get_default_pager
|
||||
from .output import run_pager
|
||||
from .output import sanitize_text
|
||||
from .util import xdg
|
||||
from .util.helper import decode_list, decode_it, raw_input, _html_escape
|
||||
@@ -1915,16 +1918,6 @@ def get_default_editor():
|
||||
return 'vi'
|
||||
|
||||
|
||||
def get_default_pager():
|
||||
system = platform.system()
|
||||
if system == 'Linux':
|
||||
dist = _get_linux_distro()
|
||||
if dist == 'debian':
|
||||
return 'pager'
|
||||
return 'less'
|
||||
return 'more'
|
||||
|
||||
|
||||
def format_diff_line(line):
|
||||
if line.startswith(b"+++") or line.startswith(b"---") or line.startswith(b"Index:"):
|
||||
line = b"\x1b[1m" + line + b"\x1b[0m"
|
||||
@@ -1943,41 +1936,6 @@ def highlight_diff(diff):
|
||||
return diff
|
||||
|
||||
|
||||
def run_pager(message, tmp_suffix=''):
|
||||
if not message:
|
||||
return
|
||||
|
||||
if not sys.stdout.isatty():
|
||||
if isinstance(message, str):
|
||||
print(message)
|
||||
else:
|
||||
sys.stdout.buffer.write(message)
|
||||
else:
|
||||
tmpfile = tempfile.NamedTemporaryFile(suffix=tmp_suffix)
|
||||
if isinstance(message, str):
|
||||
tmpfile.write(bytes(message, 'utf-8'))
|
||||
else:
|
||||
tmpfile.write(message)
|
||||
tmpfile.flush()
|
||||
|
||||
env = os.environ.copy()
|
||||
|
||||
pager = os.getenv("PAGER", default="").strip()
|
||||
pager = pager or get_default_pager()
|
||||
|
||||
# LESS env is not always set and we need -R to display escape sequences properly
|
||||
less_opts = os.getenv("LESS", default="")
|
||||
if "-R" not in less_opts:
|
||||
less_opts += " -R"
|
||||
env["LESS"] = less_opts
|
||||
|
||||
cmd = shlex.split(pager) + [tmpfile.name]
|
||||
try:
|
||||
run_external(*cmd, env=env)
|
||||
finally:
|
||||
tmpfile.close()
|
||||
|
||||
|
||||
def run_editor(filename):
|
||||
cmd = _editor_command()
|
||||
cmd.append(filename)
|
||||
@@ -4700,82 +4658,97 @@ def print_jobhistory(apiurl: str, prj: str, current_package: str, repository: st
|
||||
|
||||
|
||||
def get_commitlog(
|
||||
apiurl: str, prj: str, package: str, revision, format="text", meta=False, deleted=False, revision_upper=None
|
||||
apiurl: str,
|
||||
prj: str,
|
||||
package: str,
|
||||
revision: Optional[str],
|
||||
format: str = "text",
|
||||
meta: Optional[bool] = None,
|
||||
deleted: Optional[bool] = None,
|
||||
revision_upper: Optional[str] = None,
|
||||
patch: Optional[bool] = None,
|
||||
):
|
||||
if package is None:
|
||||
package = "_project"
|
||||
|
||||
query = {}
|
||||
if deleted:
|
||||
query['deleted'] = 1
|
||||
if meta:
|
||||
query['meta'] = 1
|
||||
from . import obs_api
|
||||
revision_list = obs_api.Package.get_revision_list(apiurl, prj, package, deleted=deleted, meta=meta)
|
||||
|
||||
u = makeurl(apiurl, ['source', prj, package, '_history'], query)
|
||||
f = http_GET(u)
|
||||
root = ET.parse(f).getroot()
|
||||
|
||||
r = []
|
||||
if format == 'xml':
|
||||
r.append('<?xml version="1.0"?>')
|
||||
r.append('<log>')
|
||||
revisions = root.findall('revision')
|
||||
revisions.reverse()
|
||||
for node in revisions:
|
||||
srcmd5 = node.find('srcmd5').text
|
||||
try:
|
||||
rev = int(node.get('rev'))
|
||||
# vrev = int(node.get('vrev')) # what is the meaning of vrev?
|
||||
try:
|
||||
if not revision_is_empty(revision) and revision_upper is not None:
|
||||
if rev > int(revision_upper) or rev < int(revision):
|
||||
continue
|
||||
elif not revision_is_empty(revision) and rev != int(revision):
|
||||
continue
|
||||
except ValueError:
|
||||
if revision != srcmd5:
|
||||
continue
|
||||
except ValueError:
|
||||
# this part should _never_ be reached but...
|
||||
return ['an unexpected error occured - please file a bug']
|
||||
version = node.find('version').text
|
||||
user = node.find('user').text
|
||||
try:
|
||||
comment = node.find('comment').text.encode(locale.getpreferredencoding(), 'replace')
|
||||
except:
|
||||
comment = b'<no message>'
|
||||
try:
|
||||
requestid = node.find('requestid').text.encode(locale.getpreferredencoding(), 'replace')
|
||||
except:
|
||||
requestid = ""
|
||||
t = time.gmtime(int(node.find('time').text))
|
||||
t = time.strftime('%Y-%m-%d %H:%M:%S', t)
|
||||
|
||||
if format == 'csv':
|
||||
s = '%s|%s|%s|%s|%s|%s|%s' % (rev, user, t, srcmd5, version,
|
||||
decode_it(comment).replace('\\', '\\\\').replace('\n', '\\n').replace('|', '\\|'), requestid)
|
||||
r.append(s)
|
||||
elif format == 'xml':
|
||||
r.append('<logentry')
|
||||
r.append(f' revision="{rev}" srcmd5="{srcmd5}">')
|
||||
r.append(f'<author>{user}</author>')
|
||||
r.append(f'<date>{t}</date>')
|
||||
r.append(f'<requestid>{requestid}</requestid>')
|
||||
r.append(f'<msg>{_private.api.xml_escape(decode_it(comment))}</msg>')
|
||||
r.append('</logentry>')
|
||||
# TODO: consider moving the following block to Package.get_revision_list()
|
||||
# keep only entries matching the specified revision
|
||||
if not revision_is_empty(revision):
|
||||
if isinstance(revision, str) and len(revision) == 32:
|
||||
# revision is srcmd5
|
||||
revision_list = [i for i in revision_list if i.srcmd5 == revision]
|
||||
else:
|
||||
if requestid:
|
||||
requestid = decode_it(b"rq" + requestid)
|
||||
s = '-' * 76 + \
|
||||
f'\nr{rev} | {user} | {t} | {srcmd5} | {version} | {requestid}\n' + \
|
||||
'\n' + decode_it(comment)
|
||||
r.append(s)
|
||||
revision = int(revision)
|
||||
if revision_is_empty(revision_upper):
|
||||
revision_list = [i for i in revision_list if i.rev == revision]
|
||||
else:
|
||||
revision_upper = int(revision_upper)
|
||||
revision_list = [i for i in revision_list if i.rev <= revision_upper and i.rev >= revision]
|
||||
|
||||
if format not in ['csv', 'xml']:
|
||||
r.append('-' * 76)
|
||||
if format == 'xml':
|
||||
r.append('</log>')
|
||||
return r
|
||||
if format == "csv":
|
||||
f = io.StringIO()
|
||||
writer = csv.writer(f, dialect="unix")
|
||||
for revision in reversed(revision_list):
|
||||
writer.writerow(
|
||||
(
|
||||
revision.rev,
|
||||
revision.user,
|
||||
revision.get_time_str(),
|
||||
revision.srcmd5,
|
||||
revision.comment,
|
||||
revision.requestid,
|
||||
)
|
||||
)
|
||||
f.seek(0)
|
||||
yield from f.read().splitlines()
|
||||
return
|
||||
|
||||
if format == "xml":
|
||||
root = ET.Element("log")
|
||||
for revision in reversed(revision_list):
|
||||
entry = ET.SubElement(root, "logentry")
|
||||
entry.attrib["revision"] = str(revision.rev)
|
||||
entry.attrib["srcmd5"] = revision.srcmd5
|
||||
ET.SubElement(entry, "author").text = revision.user
|
||||
ET.SubElement(entry, "date").text = revision.get_time_str()
|
||||
ET.SubElement(entry, "requestid").text = str(revision.requestid) if revision.requestid else ""
|
||||
ET.SubElement(entry, "msg").text = revision.comment or ""
|
||||
xmlindent(root)
|
||||
yield from ET.tostring(root, encoding="utf-8").decode("utf-8").splitlines()
|
||||
return
|
||||
|
||||
if format == "text":
|
||||
for revision in reversed(revision_list):
|
||||
entry = (
|
||||
f"r{revision.rev}",
|
||||
revision.user,
|
||||
revision.get_time_str(),
|
||||
revision.srcmd5,
|
||||
revision.version,
|
||||
f"rq{revision.requestid}" if revision.requestid else ""
|
||||
)
|
||||
yield 76 * "-"
|
||||
yield " | ".join(entry)
|
||||
yield ""
|
||||
yield revision.comment or "<no message>"
|
||||
yield ""
|
||||
if patch:
|
||||
rdiff = server_diff_noex(
|
||||
apiurl,
|
||||
prj,
|
||||
package,
|
||||
revision.rev,
|
||||
prj,
|
||||
package,
|
||||
revision.rev - 1,
|
||||
)
|
||||
yield highlight_diff(rdiff).decode("utf-8", errors="replace")
|
||||
return
|
||||
|
||||
raise ValueError(f"Invalid format: {format}")
|
||||
|
||||
|
||||
def runservice(apiurl: str, prj: str, package: str):
|
||||
|
@@ -3,6 +3,7 @@ from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-i
|
||||
from .flag import Flag
|
||||
from .group_role import GroupRole
|
||||
from .package_devel import PackageDevel
|
||||
from .package_revision import PackageRevision
|
||||
from .person_role import PersonRole
|
||||
from .simple_flag import SimpleFlag
|
||||
from .status import Status
|
||||
@@ -126,3 +127,20 @@ class Package(XmlModel):
|
||||
}
|
||||
response = cls.xml_request("POST", apiurl, url_path, url_query)
|
||||
return Status.from_file(response, apiurl=apiurl)
|
||||
|
||||
@classmethod
|
||||
def get_revision_list(cls, apiurl: str, project: str, package: str, deleted: Optional[bool] = None, meta: Optional[bool] = None):
|
||||
from xml.etree import ElementTree as ET
|
||||
|
||||
url_path = ["source", project, package, "_history"]
|
||||
url_query = {
|
||||
"meta": meta,
|
||||
"deleted": deleted,
|
||||
}
|
||||
response = cls.xml_request("GET", apiurl, url_path, url_query)
|
||||
root = ET.parse(response).getroot()
|
||||
assert root.tag == "revisionlist"
|
||||
result = []
|
||||
for node in root:
|
||||
result.append(PackageRevision.from_xml(node, apiurl=apiurl))
|
||||
return result
|
||||
|
35
osc/obs_api/package_revision.py
Normal file
35
osc/obs_api/package_revision.py
Normal file
@@ -0,0 +1,35 @@
|
||||
from ..util.models import * # pylint: disable=wildcard-import,unused-wildcard-import
|
||||
|
||||
|
||||
class PackageRevision(XmlModel):
|
||||
XML_TAG = "revision"
|
||||
|
||||
rev: int = Field(
|
||||
xml_attribute=True,
|
||||
)
|
||||
|
||||
vrev: Optional[int] = Field(
|
||||
xml_attribute=True,
|
||||
)
|
||||
|
||||
srcmd5: str = Field(
|
||||
)
|
||||
|
||||
version: str = Field(
|
||||
)
|
||||
|
||||
time: int = Field(
|
||||
)
|
||||
|
||||
user: str = Field(
|
||||
)
|
||||
|
||||
comment: Optional[str] = Field(
|
||||
)
|
||||
|
||||
requestid: Optional[int] = Field(
|
||||
)
|
||||
|
||||
def get_time_str(self):
|
||||
import time
|
||||
return time.strftime("%Y-%m-%d %H:%M:%S", time.gmtime(self.time))
|
@@ -1,6 +1,9 @@
|
||||
from .key_value_table import KeyValueTable
|
||||
from .input import get_user_input
|
||||
from .output import get_default_pager
|
||||
from .output import pipe_to_pager
|
||||
from .output import print_msg
|
||||
from .output import run_pager
|
||||
from .output import sanitize_text
|
||||
from .output import safe_print
|
||||
from .output import safe_write
|
||||
|
@@ -1,7 +1,12 @@
|
||||
import os
|
||||
import platform
|
||||
import re
|
||||
import shlex
|
||||
import subprocess
|
||||
import sys
|
||||
import tempfile
|
||||
from typing import Dict
|
||||
from typing import List
|
||||
from typing import Optional
|
||||
from typing import TextIO
|
||||
from typing import Union
|
||||
@@ -140,10 +145,90 @@ def safe_write(file: TextIO, text: Union[str, bytes], *, add_newline: bool = Fal
|
||||
"""
|
||||
text = sanitize_text(text)
|
||||
if isinstance(text, bytes):
|
||||
file.buffer.write(text)
|
||||
if add_newline:
|
||||
file.buffer.write(os.linesep.encode("utf-8"))
|
||||
if hasattr(file, "buffer"):
|
||||
file.buffer.write(text)
|
||||
if add_newline:
|
||||
file.buffer.write(os.linesep.encode("utf-8"))
|
||||
else:
|
||||
# file has no "buffer" attribute, let's try to write the bytes directly
|
||||
file.write(text)
|
||||
if add_newline:
|
||||
file.write(os.linesep.encode("utf-8"))
|
||||
else:
|
||||
file.write(text)
|
||||
if add_newline:
|
||||
file.write(os.linesep)
|
||||
|
||||
|
||||
def get_default_pager():
|
||||
from ..core import _get_linux_distro
|
||||
|
||||
system = platform.system()
|
||||
if system == 'Linux':
|
||||
dist = _get_linux_distro()
|
||||
if dist == 'debian':
|
||||
return 'pager'
|
||||
return 'less'
|
||||
return 'more'
|
||||
|
||||
|
||||
def get_pager():
|
||||
"""
|
||||
Return (pager, env) where
|
||||
``pager`` is a list with parsed pager command
|
||||
``env`` is copy of os.environ() with added variables specific to the pager
|
||||
"""
|
||||
env = os.environ.copy()
|
||||
pager = os.getenv("PAGER", default="").strip()
|
||||
pager = pager or get_default_pager()
|
||||
|
||||
# LESS env is not always set and we need -R to display escape sequences properly
|
||||
less_opts = os.getenv("LESS", default="")
|
||||
if "-R" not in less_opts:
|
||||
less_opts += " -R"
|
||||
env["LESS"] = less_opts
|
||||
|
||||
return shlex.split(pager), env
|
||||
|
||||
|
||||
def run_pager(message: Union[bytes, str], tmp_suffix: str = ""):
|
||||
from ..core import run_external
|
||||
|
||||
if not message:
|
||||
return
|
||||
|
||||
if not tty.IS_INTERACTIVE:
|
||||
safe_write(sys.stdout, message)
|
||||
return
|
||||
|
||||
mode = "w+b" if isinstance(message, bytes) else "w+"
|
||||
with tempfile.NamedTemporaryFile(mode=mode, suffix=tmp_suffix) as tmpfile:
|
||||
safe_write(tmpfile, message)
|
||||
tmpfile.flush()
|
||||
|
||||
pager, env = get_pager()
|
||||
cmd = pager + [tmpfile.name]
|
||||
run_external(*cmd, env=env)
|
||||
|
||||
|
||||
def pipe_to_pager(lines: Union[List[bytes], List[str]], *, add_newlines=False):
|
||||
"""
|
||||
Pipe ``lines`` to the pager.
|
||||
If running in a non-interactive terminal, print the data instead.
|
||||
Add a newline after each line if ``add_newlines`` is ``True``.
|
||||
"""
|
||||
if not tty.IS_INTERACTIVE:
|
||||
for line in lines:
|
||||
safe_write(sys.stdout, line, add_newline=add_newlines)
|
||||
return
|
||||
|
||||
pager, env = get_pager()
|
||||
with subprocess.Popen(pager, stdin=subprocess.PIPE, encoding="utf-8", env=env) as proc:
|
||||
try:
|
||||
for line in lines:
|
||||
safe_write(proc.stdin, line, add_newline=add_newlines)
|
||||
proc.stdin.flush()
|
||||
proc.stdin.close()
|
||||
except BrokenPipeError:
|
||||
pass
|
||||
proc.wait()
|
||||
|
@@ -594,6 +594,8 @@ class XmlModel(BaseModel):
|
||||
return value
|
||||
|
||||
if field.origin_type is int:
|
||||
if not value or not value.strip():
|
||||
return None
|
||||
value = int(value)
|
||||
return value
|
||||
|
||||
|
@@ -185,6 +185,38 @@ class TestXmlModel(unittest.TestCase):
|
||||
self.assertEqual(m.child[1]._apiurl, apiurl)
|
||||
self.assertEqual(m.child[2]._apiurl, apiurl)
|
||||
|
||||
def test_empty_int_optional(self):
|
||||
class TestModel(XmlModel):
|
||||
XML_TAG = "model"
|
||||
num_attr: Optional[int] = Field(xml_attribute=True)
|
||||
num_elem: Optional[int] = Field()
|
||||
|
||||
data = textwrap.dedent(
|
||||
"""
|
||||
<model num_attr="">
|
||||
<num_elem> </num_elem>
|
||||
</model>
|
||||
"""
|
||||
).strip()
|
||||
m = TestModel.from_string(data)
|
||||
self.assertEqual(m.num_attr, None)
|
||||
self.assertEqual(m.num_elem, None)
|
||||
|
||||
def test_empty_int(self):
|
||||
class TestModel(XmlModel):
|
||||
XML_TAG = "model"
|
||||
num_attr: int = Field(xml_attribute=True)
|
||||
num_elem: int = Field()
|
||||
|
||||
data = textwrap.dedent(
|
||||
"""
|
||||
<model num_attr="">
|
||||
<num_elem> </num_elem>
|
||||
</model>
|
||||
"""
|
||||
).strip()
|
||||
self.assertRaises(TypeError, TestModel.from_string, data)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
@@ -1,10 +1,12 @@
|
||||
import contextlib
|
||||
import io
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
import osc.conf
|
||||
from osc.output import KeyValueTable
|
||||
from osc.output import print_msg
|
||||
from osc.output import safe_write
|
||||
from osc.output import sanitize_text
|
||||
from osc.output import tty
|
||||
|
||||
@@ -238,5 +240,23 @@ class TestSanitization(unittest.TestCase):
|
||||
self.assertEqual(sanitized, b"0;this is the window title")
|
||||
|
||||
|
||||
class TestSafeWrite(unittest.TestCase):
|
||||
def test_string_to_file(self):
|
||||
with tempfile.NamedTemporaryFile(mode="w+") as f:
|
||||
safe_write(f, "string")
|
||||
|
||||
def test_bytes_to_file(self):
|
||||
with tempfile.NamedTemporaryFile(mode="wb+") as f:
|
||||
safe_write(f, b"bytes")
|
||||
|
||||
def test_string_to_stringio(self):
|
||||
with io.StringIO() as f:
|
||||
safe_write(f, "string")
|
||||
|
||||
def test_bytes_to_bytesio(self):
|
||||
with io.BytesIO() as f:
|
||||
safe_write(f, b"bytes")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
Reference in New Issue
Block a user