1
0
mirror of https://github.com/openSUSE/osc.git synced 2025-01-22 13:06:15 +01:00
github.com_openSUSE_osc/behave/features/steps/common.py

321 lines
8.9 KiB
Python

import errno
import os
import re
import shutil
import subprocess
import behave
def debug(context, *args):
if not context.config.userdata.get("DEBUG", False):
return
msg = " ".join((str(i).strip() for i in args))
print(f"DEBUG: {msg}")
def makedirs(path):
try:
os.makedirs(path)
except OSError as e:
if e.errno != errno.EEXIST:
raise
def run(cmd, shell=True, cwd=None, env=None):
"""
Run a command.
Return exitcode, stdout, stderr
"""
proc = subprocess.Popen(
cmd,
shell=shell,
cwd=cwd,
env=env,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
encoding="utf-8",
universal_newlines=True,
errors="surrogateescape",
)
stdout, stderr = proc.communicate()
return proc.returncode, stdout, stderr
def check_exit_code(context):
# check if the previous command finished successfully
# or if the exit code was tested in a scenario
if not getattr(context, "cmd", None):
return
if context.cmd_exitcode_checked:
return
the_exit_code_is(context, 0)
def run_in_context(context, cmd, can_fail=False, **run_args):
check_exit_code(context)
context.cmd = cmd
if hasattr(context.scenario, "working_dir") and 'cwd' not in run_args:
run_args['cwd'] = context.scenario.working_dir
if hasattr(context.scenario, "PATH"):
env = os.environ.copy()
path = context.scenario.PATH
env["PATH"] = path.replace("$PATH", env["PATH"])
run_args["env"] = env
debug(context, "Running command:", cmd)
context.cmd_exitcode, context.cmd_stdout, context.cmd_stderr = run(cmd, **run_args)
context.cmd_exitcode_checked = False
debug(context, "> return code:", context.cmd_exitcode)
debug(context, "> stdout:", context.cmd_stdout)
debug(context, "> stderr:", context.cmd_stderr)
if not can_fail and context.cmd_exitcode != 0:
raise AssertionError('Running command "%s" failed: %s' % (cmd, context.cmd_exitcode))
@behave.step("stdout contains \"{text}\"")
def step_impl(context, text):
if re.search(text.format(context=context), context.cmd_stdout):
return
raise AssertionError("Stdout doesn't contain expected pattern: %s" % text)
@behave.step("stdout doesn't contain \"{text}\"")
def step_impl(context, text):
if not re.search(text.format(context=context), context.cmd_stdout):
return
raise AssertionError("Stdout is not supposed to contain pattern: %s" % text)
@behave.step("stderr contains \"{text}\"")
def step_impl(context, text):
if re.search(text.format(context=context), context.cmd_stderr):
return
raise AssertionError("Stderr doesn't contain expected pattern: %s" % text)
@behave.step("stderr doesn't contain \"{text}\"")
def step_impl(context, text):
if not re.search(text.format(context=context), context.cmd_stderr):
return
raise AssertionError("Stderr is not supposed to contain pattern: %s" % text)
@behave.step("stdout is")
def step_impl(context):
expected = context.text.format(context=context).rstrip().split("\n")
found = context.cmd_stdout.rstrip().split("\n")
if found == expected:
return
expected_str = "\n".join(expected)
found_str = "\n".join(found)
raise AssertionError(f"Stdout is not:\n{expected_str}\n\nActual stdout:\n{found_str}")
@behave.step("stdout matches")
def step_impl(context):
expected = context.text.format(context=context).rstrip()
found = context.cmd_stdout.rstrip()
if re.match(expected, found, re.MULTILINE):
return
raise AssertionError(f"Stdout doesn't match:\n{expected}\n\nActual stdout:\n{found}")
@behave.step("I search '{pattern}' in stdout and store named groups in '{context_attribute}'")
def step_impl(context, pattern, context_attribute):
pattern = r"{}".format(pattern)
result = []
for match in re.finditer(pattern, context.cmd_stdout):
result.append(match.groupdict())
setattr(context, context_attribute, result)
@behave.step("stderr is")
def step_impl(context):
expected = context.text.format(context=context).rstrip().split("\n")
found = context.cmd_stderr.rstrip().split("\n")
if found == expected:
return
expected_str = "\n".join(expected)
found_str = "\n".join(found)
raise AssertionError(f"Stderr is not:\n{expected_str}\n\nActual stderr:\n{found_str}")
@behave.step('I set working directory to "{path}"')
def step_impl(context, path):
path = path.format(context=context)
context.scenario.working_dir = path
@behave.step('I set PATH to "{path}"')
def step_impl(context, path):
path = path.format(context=context)
context.scenario.PATH = path
@behave.step('I copy file "{source}" to "{destination}"')
def step_impl(context, source, destination):
# substitutions
source = source.format(context=context)
destination = destination.format(context=context)
# if destination is a directory, append the source filename
if destination.endswith("/"):
destination = os.path.join(destination, os.path.basename(source))
# copy file without attributes
makedirs(os.path.dirname(destination))
shutil.copyfile(source, destination)
@behave.step('file "{path}" exists')
def step_impl(context, path):
path = path.format(context=context)
if not os.path.isfile(path):
raise AssertionError(f"File doesn't exist: {path}")
@behave.step('file "{path}" does not exist')
def step_impl(context, path):
path = path.format(context=context)
if os.path.isfile(path):
raise AssertionError(f"File exists: {path}")
@behave.step('file "{one}" is identical to "{two}"')
def step_impl(context, one, two):
one = one.format(context=context)
two = two.format(context=context)
data_one = open(one, "r").read()
data_two = open(two, "r").read()
if data_one != data_two:
raise AssertionError(f"Files differ: {one} != {two}")
@behave.step('directory "{path}" exists')
def step_impl(context, path):
path = path.format(context=context)
if not os.path.isdir(path):
raise AssertionError(f"Directory doesn't exist: {path}")
@behave.step('directory "{path}" does not exist')
def step_impl(context, path):
path = path.format(context=context)
if os.path.isdir(path):
raise AssertionError(f"Directory exists: {path}")
@behave.step('I create file "{path}" with perms "{mode}"')
def step_impl(context, path, mode):
path = path.format(context=context)
mode = int(mode, 8)
content = context.text.format(context=context).rstrip()
makedirs(os.path.dirname(path))
open(path, "w").write(content)
os.chmod(path, mode)
@behave.step("the exit code is {exitcode}")
def the_exit_code_is(context, exitcode):
if context.cmd_exitcode != int(exitcode):
lines = [
f"Command has exited with code {context.cmd_exitcode}: {context.cmd}",
"> stdout:",
context.cmd_stdout.strip(),
"",
"> stderr:",
context.cmd_stderr.strip(),
]
raise AssertionError("\n".join(lines))
context.cmd_exitcode_checked = True
@behave.step('directory listing of "{path}" is')
def step_impl(context, path):
path = path.format(context=context)
expected = context.text.format(context=context).rstrip().split('\n')
expected = [i for i in expected if i.strip()]
found = os.listdir(path)
expected = set(expected)
found = set(found)
if found == expected:
return
extra = sorted(set(found) - set(expected))
missing = sorted(set(expected) - set(found))
msg = []
if extra:
msg.append("Unexpected files found on disk:")
for fn in extra:
msg.append(f" {fn}")
if missing:
msg.append("Files missing on disk:")
for fn in missing:
msg.append(f" {fn}")
msg = "\n".join(msg)
raise AssertionError(f"Directory listing does not match:\n{msg}")
@behave.step('directory tree in "{path}" is')
def step_impl(context, path):
path = path.format(context=context)
path = os.path.abspath(path)
expected = context.text.format(context=context).rstrip().split('\n')
expected = [i for i in expected if i.strip()]
found = []
for root, dirs, files in os.walk(path):
for fn in files:
file_abspath = os.path.join(root, fn)
file_relpath = file_abspath[len(path) + 1:]
found.append(file_relpath)
# found = os.listdir(path)
expected = set(expected)
found = set(found)
if found == expected:
return
extra = sorted(set(found) - set(expected))
missing = sorted(set(expected) - set(found))
msg = []
if extra:
msg.append("Unexpected files found on disk:")
for fn in extra:
msg.append(f" {fn}")
if missing:
msg.append("Files missing on disk:")
for fn in missing:
msg.append(f" {fn}")
msg = "\n".join(msg)
raise AssertionError(f"Directory listing does not match:\n{msg}")