Accepting request 105040 from home:aspiers:branches:openSUSE:Tools

Mon Feb  13 15:52:19 GMT 2012 - aspiers@suse.com
Add test suite and fix two bugs it found:

  1. --subdir was not working
  2. --scm bzr was not working

FWIW it also works on SLE11 now.

I will issue a separate request for my enhancements to
tar_scm, since they are much more intrusive (but have
about 90% test coverage).

OBS-URL: https://build.opensuse.org/request/show/105040
OBS-URL: https://build.opensuse.org/package/show/openSUSE:Tools/obs-service-tar_scm?expand=0&rev=32
This commit is contained in:
Adrian Schröter 2012-02-15 16:34:15 +00:00 committed by Git OBS Bridge
parent 81a2059b9f
commit e7fbd5920f
19 changed files with 1009 additions and 34 deletions

30
bzrfixtures.py Normal file
View File

@ -0,0 +1,30 @@
#!/usr/bin/python
import os
from fixtures import Fixtures
from utils import mkfreshdir, run_bzr
class BzrFixtures(Fixtures):
def init(self):
self.create_repo()
self.create_commits(2)
def run(self, cmd):
return run_bzr(self.repo_path, cmd)
def create_repo(self):
os.makedirs(self.repo_path)
os.chdir(self.repo_path)
self.run('init')
self.run('whoami "%s"' % self.name_and_email)
self.wd = self.repo_path
print "created repo", self.repo_path
def do_commit(self, newly_created):
self.run('add .')
self.run('commit -m%d' % self.next_commit_rev)
def record_rev(self, rev_num):
self.revs[rev_num] = str(rev_num)
self.scmlogs.annotate("Recorded rev %d" % rev_num)

15
bzrtests.py Normal file
View File

@ -0,0 +1,15 @@
#!/usr/bin/python
from commontests import CommonTests
from bzrfixtures import BzrFixtures
from utils import run_bzr
class BzrTests(CommonTests):
scm = 'bzr'
initial_clone_command = 'bzr checkout'
update_cache_command = 'bzr update'
fixtures_class = BzrFixtures
def default_version(self):
return self.rev(2)

176
commontests.py Normal file
View File

@ -0,0 +1,176 @@
#!/usr/bin/python
from pprint import pprint, pformat
from testassertions import TestAssertions
from testenv import TestEnvironment
from utils import mkfreshdir
class CommonTests(TestEnvironment, TestAssertions):
def basename(self, name='repo', version=None):
if version is None:
version = self.default_version()
return '%s-%s' % (name, version)
def test_plain(self):
self.tar_scm_std()
self.assertTarOnly(self.basename())
def test_exclude(self):
self.tar_scm_std('--exclude', '.' + self.scm)
self.assertTarOnly(self.basename())
def test_subdir(self):
self.tar_scm_std('--subdir', self.fixtures.subdir)
self.assertTarOnly(self.basename(), tarchecker=self.assertSubdirTar)
def test_history_depth_obsolete(self):
(stdout, stderr, ret) = self.tar_scm_std('--history-depth', '1')
self.assertRegexpMatches(stdout, 'obsolete')
# self.assertTarOnly(self.basename())
# self.assertRegexpMatches(self.scmlogs.read()[0], '^%s clone --depth=1')
# def test_history_depth_full(self):
# self.tar_scm_std('--history-depth', 'full')
# self.assertTarOnly(self.basename())
# self.assertRegexpMatches(self.scmlogs.read()[0], '^git clone --depth=999999+')
def test_filename(self):
name = 'myfilename'
self.tar_scm_std('--filename', name)
self.assertTarOnly(self.basename(name=name))
def test_version(self):
version = '0.5'
self.tar_scm_std('--version', version)
self.assertTarOnly(self.basename(version=version))
def test_filename_version(self):
filename = 'myfilename'
version = '0.6'
self.tar_scm_std('--filename', filename, '--version', version)
self.assertTarOnly(self.basename(filename, version))
def test_revision_nop(self):
self.tar_scm_std('--revision', self.rev(2))
th = self.assertTarOnly(self.basename())
self.assertTarMemberContains(th, self.basename() + '/a', '2')
def test_revision(self):
self.fixtures.create_commits(2)
self.tar_scm_std('--revision', self.rev(2))
th = self.assertTarOnly(self.basename())
self.assertTarMemberContains(th, self.basename() + '/a', '2')
def test_revision_no_cache(self):
self._revision(use_cache=False)
def test_revision_subdir_no_cache(self):
self._revision(use_cache=False, use_subdir=True)
def _revision(self, use_cache=True, use_subdir=False):
version = '3.0'
args_tag2 = [
'--version', version,
'--revision', self.rev(2),
]
if use_subdir:
args_tag2 += [ '--subdir', self.fixtures.subdir ]
self._sequential_calls_with_revision(
version,
[
(0, args_tag2, '2', False),
(0, args_tag2, '2', use_cache),
(2, args_tag2, '2', use_cache),
(0, args_tag2, '2', use_cache),
(2, args_tag2, '2', use_cache),
(0, args_tag2, '2', use_cache),
],
use_cache
)
def test_revision_master_alternating_no_cache(self):
self._revision_master_alternating(use_cache=False)
def test_revision_master_alternating_subdir_no_cache(self):
self._revision_master_alternating(use_cache=False, use_subdir=True)
def _revision_master_alternating(self, use_cache=True, use_subdir=False):
version = '4.0'
args_head = [
'--version', version,
]
if use_subdir:
args_head += [ '--subdir', self.fixtures.subdir ]
args_tag2 = args_head + [ '--revision', self.rev(2) ]
self._sequential_calls_with_revision(
version,
[
(0, args_tag2, '2', False),
(0, args_head, '2', use_cache),
(2, args_tag2, '2', use_cache),
(0, args_head, '4', use_cache),
(2, args_tag2, '2', use_cache),
(0, args_head, '6', use_cache),
(0, args_tag2, '2', use_cache),
],
use_cache
)
def _sequential_calls_with_revision(self, version, calls, use_cache=True):
mkfreshdir(self.pkgdir)
basename = self.basename(version = version)
if not use_cache:
self.disableCache()
while calls:
new_commits, args, expected, expect_cache_hit = calls.pop(0)
if new_commits > 0:
self.fixtures.create_commits(new_commits)
self.scmlogs.annotate("about to run: " + pformat(args))
self.scmlogs.annotate("expecting tar to contain: " + expected)
self.tar_scm_std(*args)
logpath = self.scmlogs.current_log_path
loglines = self.scmlogs.read()
if expect_cache_hit:
self.assertRanUpdate(logpath, loglines)
else:
self.assertRanInitialClone(logpath, loglines)
if self.fixtures.subdir in args:
th = self.assertTarOnly(basename, tarchecker=self.assertSubdirTar)
tarent = 'b'
else:
th = self.assertTarOnly(basename)
tarent = 'a'
self.assertTarMemberContains(th, basename + '/' + tarent, expected)
self.scmlogs.next()
self.postRun()
def test_switch_revision_and_subdir_no_cache(self):
self._switch_revision_and_subdir(use_cache=False)
def _switch_revision_and_subdir(self, use_cache=True):
version = '5.0'
args = [
'--version', version,
]
args_subdir = args+ [ '--subdir', self.fixtures.subdir ]
args_tag2 = args + [ '--revision', self.rev(2) ]
self._sequential_calls_with_revision(
version,
[
(0, args_tag2, '2', False),
(0, args_subdir, '2', use_cache and self.scm != 'svn'),
(2, args_tag2, '2', use_cache),
(0, args_subdir, '4', use_cache),
(2, args_tag2, '2', use_cache),
(0, args_subdir, '6', use_cache),
(0, args_tag2, '2', use_cache),
],
use_cache
)

78
fixtures.py Normal file
View File

@ -0,0 +1,78 @@
#!/usr/bin/python
import os
import shutil
class Fixtures:
name = 'tar_scm test suite'
email = 'root@localhost'
name_and_email = '%s <%s>' % (name, email)
subdir = 'subdir'
subdir1 = 'subdir1'
subdir2 = 'subdir2'
next_commit_rev = 1
def __init__(self, container_dir, scmlogs):
self.container_dir = container_dir
self.scmlogs = scmlogs
self.repo_path = self.container_dir + '/repo'
self.repo_url = 'file://' + self.repo_path
# Keys are stringified integers representing commit sequence numbers;
# values can be passed to --revision
self.revs = { }
def setup(self):
print self.__class__.__name__ + ": setting up fixtures"
self.init_fixtures_dir()
self.init()
def init_fixtures_dir(self):
if os.path.exists(self.repo_path):
shutil.rmtree(self.repo_path)
def init(self):
raise NotImplementedError, \
self.__class__.__name__ + " didn't implement init()"
def create_commits(self, num_commits):
self.scmlogs.annotate("Creating %d commits ..." % num_commits)
if num_commits == 0:
return
for i in xrange(0, num_commits):
new_rev = self.create_commit()
self.record_rev(new_rev)
self.scmlogs.annotate("Created %d commits; now at %s" % (num_commits, new_rev))
def create_commit(self):
os.chdir(self.wd)
newly_created = self.prep_commit()
self.do_commit(newly_created)
new_rev = self.next_commit_rev
self.next_commit_rev += 1
return new_rev
def prep_commit(self):
"""
Caller should ensure correct cwd.
Returns list of newly created files.
"""
newly_created = [ ]
if not os.path.exists('a'):
newly_created.append('a')
if not os.path.exists(self.subdir):
os.mkdir(self.subdir)
# This will take care of adding subdir/b too
newly_created.append(self.subdir)
for fn in ('a', self.subdir + '/b'):
f = open(fn, 'w')
f.write(str(self.next_commit_rev))
f.close()
return newly_created

48
gitfixtures.py Normal file
View File

@ -0,0 +1,48 @@
#!/usr/bin/python
import os
from fixtures import Fixtures
from utils import mkfreshdir, run_git
class GitFixtures(Fixtures):
def init(self):
self.create_repo()
self.timestamps = { }
self.sha1s = { }
self.create_commits(2)
def run(self, cmd):
return run_git(self.repo_path, cmd)
def create_repo(self):
os.makedirs(self.repo_path)
os.chdir(self.repo_path)
self.run('init')
self.run('config user.name test')
self.run('config user.email test@test.com')
self.wd = self.repo_path
print "created repo", self.repo_path
def do_commit(self, newly_created):
self.run('add .')
self.run('commit -m%d' % self.next_commit_rev)
def get_metadata(self, formatstr):
return self.run('log -n1 --pretty=format:"%s"' % formatstr)[0]
def record_rev(self, rev_num):
tag = 'tag' + str(rev_num)
self.run('tag ' + tag)
self.revs[rev_num] = tag
self.timestamps[tag] = self.get_metadata('%at')
self.sha1s[tag] = self.get_metadata('%h')
self.scmlogs.annotate(
"Recorded rev %d: id %s, timestamp %s, SHA1 %s" % \
(rev_num,
tag,
self.timestamps[tag],
self.sha1s[tag])
)

15
gittests.py Normal file
View File

@ -0,0 +1,15 @@
#!/usr/bin/python
from commontests import CommonTests
from gitfixtures import GitFixtures
from utils import run_git
class GitTests(CommonTests):
scm = 'git'
initial_clone_command = 'git clone'
update_cache_command = 'git fetch'
fixtures_class = GitFixtures
def default_version(self):
return self.timestamps(self.rev(2))

48
hgfixtures.py Normal file
View File

@ -0,0 +1,48 @@
#!/usr/bin/python
import os
from fixtures import Fixtures
from utils import mkfreshdir, run_hg
class HgFixtures(Fixtures):
def init(self):
self.create_repo()
self.timestamps = { }
self.sha1s = { }
self.create_commits(2)
def run(self, cmd):
return run_hg(self.repo_path, cmd)
def create_repo(self):
os.makedirs(self.repo_path)
os.chdir(self.repo_path)
self.run('init')
c = open('.hg/hgrc', 'w')
c.write("[ui]\nusername = %s\n" % self.name_and_email)
c.close()
self.wd = self.repo_path
print "created repo", self.repo_path
def do_commit(self, newly_created):
self.run('add .')
self.run('commit -m%d' % self.next_commit_rev)
def get_metadata(self, formatstr):
return self.run('log -l1 --template "%s"' % formatstr)[0]
def record_rev(self, rev_num):
tag = str(rev_num - 1) # hg starts counting changesets at 0
self.revs[rev_num] = tag
self.timestamps[tag] = self.get_metadata('{date}')
self.sha1s[tag] = self.get_metadata('{node|short}')
self.scmlogs.annotate(
"Recorded rev %d: id %s, timestamp %s, SHA1 %s" % \
(rev_num,
tag,
self.timestamps[tag],
self.sha1s[tag])
)

14
hgtests.py Normal file
View File

@ -0,0 +1,14 @@
#!/usr/bin/python
from commontests import CommonTests
from hgfixtures import HgFixtures
from utils import run_hg
class HgTests(CommonTests):
scm = 'hg'
initial_clone_command = 'hg clone'
update_cache_command = 'hg pull'
fixtures_class = HgFixtures
def default_version(self):
return self.rev(2)

View File

@ -1,3 +1,10 @@
-------------------------------------------------------------------
Mon Feb 13 15:52:19 GMT 2012 - aspiers@suse.com
- Add test suite
- Fix --subdir with --scm svn
- Fix --scm bzr
-------------------------------------------------------------------
Mon Feb 13 10:51:19 UTC 2012 - coolo@suse.com

View File

@ -16,16 +16,20 @@
#
Name: obs-service-tar_scm
%define service tar_scm
Name: obs-service-%{service}
Summary: An OBS source service: checkout or update a tar ball from svn/git/hg
License: GPL-2.0+
Group: Development/Tools/Building
Url: https://build.opensuse.org/package/show?package=obs-service-tar_scm&project=openSUSE%3ATools
Version: 0.2.1
Url: https://build.opensuse.org/package/show?package=obs-service-%{service}&project=openSUSE%3ATools
Version: 0.2.2
Release: 0
Source: tar_scm
Source1: tar_scm.service
Requires: subversion git mercurial bzr
Source: %{service}
Source1: %{service}.service
Requires: bzr git mercurial subversion
BuildRequires: bzr git mercurial subversion
BuildRequires: python >= 2.6
BuildRoot: %{_tmppath}/%{name}-%{version}-build
BuildArch: noarch
@ -45,6 +49,11 @@ mkdir -p $RPM_BUILD_ROOT/usr/lib/obs/service
install -m 0755 %{SOURCE0} $RPM_BUILD_ROOT/usr/lib/obs/service
install -m 0644 %{SOURCE1} $RPM_BUILD_ROOT/usr/lib/obs/service
%check
chmod +x $RPM_SOURCE_DIR/scm-wrapper
: Running the test suite. Please be patient - this takes a few minutes ...
python $RPM_SOURCE_DIR/test.py
%files
%defattr(-,root,root)
%dir /usr/lib/obs

26
scm-wrapper Normal file
View File

@ -0,0 +1,26 @@
#!/bin/bash
# Wrapper around SCM to enable behaviour verification testing
# on tar_scm's repository caching code. This is cleaner than
# writing tests which look inside the cache, because then they
# become coupled to the cache's implementation, and require
# knowledge of where the cache lives etc.
me=`basename $0`
if [ -z "$SCM_INVOCATION_LOG" ]; then
cat <<EOF >&2
\$SCM_INVOCATION_LOG must be set before calling $0.
It should be invoked from the test suite, not directly.
EOF
exit 1
fi
if [ "$me" = 'scm-wrapper' ]; then
echo "$me should not be invoked directly, only via symlink" >&2
exit 1
fi
echo "$me $*" >> "$SCM_INVOCATION_LOG"
/usr/bin/$me "$@"

85
scmlogs.py Normal file
View File

@ -0,0 +1,85 @@
#!/usr/bin/python
import glob
import os
class ScmInvocationLogs:
"""
Provides log files which tracks invocations of SCM binaries. The
tracking is done via a wrapper around SCM to enable behaviour
verification testing on tar_scm's repository caching code. This
is cleaner than writing tests which look inside the cache, because
then they become coupled to the cache's implementation, and
require knowledge of where the cache lives etc.
One instance should be constructed per unit test. If the test
invokes the SCM binary multiple times, invoke next() in between
each, so that a separate log file is used for each invocation -
this allows more accurate fine-grained assertions on the
invocation log.
"""
@classmethod
def setup_bin_wrapper(cls, scm, tmp_dir):
cls.wrapper_dir = tmp_dir + '/wrappers'
if not os.path.exists(cls.wrapper_dir):
os.makedirs(cls.wrapper_dir)
wrapper = cls.wrapper_dir + '/' + scm
if not os.path.exists(wrapper):
os.symlink('../../scm-wrapper', wrapper)
path = os.getenv('PATH')
prepend = cls.wrapper_dir + ':'
if not path.startswith(prepend):
new_path = prepend + path
os.environ['PATH'] = new_path
def __init__(self, scm, test_dir):
self.scm = scm
self.test_dir = test_dir
self.counter = 0
self.unlink_existing_logs()
def get_log_file_template(self):
return '%s-invocation-%%s.log' % self.scm
def get_log_path_template(self):
return os.path.join(self.test_dir, self.get_log_file_template())
def unlink_existing_logs(self):
pat = self.get_log_path_template() % '*'
for log in glob.glob(pat):
os.unlink(log)
def get_log_file(self, identifier):
if identifier:
identifier = '-' + identifier
return self.get_log_file_template() % ('%02d%s' % (self.counter, identifier))
def get_log_path(self, identifier):
return os.path.join(self.test_dir, self.get_log_file(identifier))
def next(self, identifier=''):
self.counter += 1
self.current_log_path = self.get_log_path(identifier)
if os.path.exists(self.current_log_path):
raise RuntimeError, "%s already existed?!" % self.current_log_path
os.putenv('SCM_INVOCATION_LOG', self.current_log_path)
def annotate(self, msg):
log = open(self.current_log_path, 'a')
log.write('# ' + msg + "\n")
print msg
log.close()
def read(self):
if not os.path.exists(self.current_log_path):
return '<no %s log>' % self.scm
log = open(self.current_log_path)
loglines = log.readlines()
log.close()
return loglines

44
svnfixtures.py Normal file
View File

@ -0,0 +1,44 @@
#!/usr/bin/python
import os
from fixtures import Fixtures
from utils import mkfreshdir, quietrun, run_svn
class SvnFixtures(Fixtures):
def init(self):
self.wd_path = self.container_dir + '/wd'
self.create_repo()
self.checkout_repo()
self.added = { }
self.timestamps = { }
self.create_commits(2)
def run(self, cmd):
return run_svn(self.wd_path, cmd)
def create_repo(self):
quietrun('svnadmin create ' + self.repo_path)
print "created repo", self.repo_path
def checkout_repo(self):
mkfreshdir(self.wd_path)
quietrun('svn checkout %s %s' % (self.repo_url, self.wd_path))
self.wd = self.wd_path
def do_commit(self, newly_created):
for new in newly_created:
if not new in self.added:
self.run('add ' + new)
self.added[new] = True
self.run('commit -m%d' % self.next_commit_rev)
def get_metadata(self, formatstr):
return self.run('log -n1' % formatstr)[0]
def record_rev(self, rev_num):
self.revs[rev_num] = str(rev_num)
self.scmlogs.annotate("Recorded rev %d" % rev_num)

14
svntests.py Normal file
View File

@ -0,0 +1,14 @@
#!/usr/bin/python
from commontests import CommonTests
from svnfixtures import SvnFixtures
from utils import run_svn
class SvnTests(CommonTests):
scm = 'svn'
initial_clone_command = 'svn (co|checkout) '
update_cache_command = 'svn up(date)?'
fixtures_class = SvnFixtures
def default_version(self):
return self.rev(2)

42
tar_scm
View File

@ -87,11 +87,11 @@ done
FILE="$MYFILENAME"
VERSION="$MYVERSION"
if [ -z "$MYPACKAGEMETA" ]; then
EXCLUDES="$EXCLUDES --exclude-vcs"
EXCLUDES="$EXCLUDES --exclude=.$MYSCM"
fi
if [ -z "$MYSCM" ]; then
echo "ERROR: no scm is given via --scm parameter (git/svn/hg)!"
echo "ERROR: no scm is given via --scm parameter (git/svn/hg/bzr)!"
exit 1
fi
if [ -z "$MYURL" ]; then
@ -106,19 +106,19 @@ fi
SRCDIR=$(pwd)
cd "$MYOUTDIR"
if [ -z "$FILE" -a "$MYSCM" == "git" ]; then
if [ -z "$FILE" ]; then
case "$MYSCM" in
git)
FILE="${MYURL%/}"
FILE="${FILE##*/}"
FILE="${FILE%.git}"
FILE="${FILE#*@*:}"
fi
if [ -z "$FILE" -a "$MYSCM" == "svn" ]; then
FILE="${MYURL%/}"
FILE="${FILE##*/}"
fi
if [ -z "$FILE" -a "$MYSCM" == "hg" ]; then
;;
svn|hg|bzr)
FILE="${MYURL%/}"
FILE="${FILE##*/}"
;;
esac
fi
# Try to find an existing tar ball, which can be upgraded instead of complete full download.
@ -152,35 +152,39 @@ if [ "$MYSCM" == "svn" ]; then
if [ -z "$MYSUBDIR" -a -d "$TAR_DIRECTORY/.svn" ]; then
# update existing content for speed/bandwidth reasons
cd "$TAR_DIRECTORY"
OLDVERSION=`LC_ALL=C svn info | sed -n 's,^Last Changed Rev: \(.*\),\1,p'`
OLDVERSION=`LC_ALL=C svn info | sed -n 's,^Last Changed Rev: \(.*\),\1,p'` || exit 1
if [ -n "$MYREVISION" ]; then
svn up -r"$MYREVISION" || exit 1
else
svn up || exit 1
fi
NEWVERSION=`LC_ALL=C svn info | sed -n 's,^Last Changed Rev: \(.*\),\1,p'`
NEWVERSION=`LC_ALL=C svn info | sed -n 's,^Last Changed Rev: \(.*\),\1,p'` || exit 1
cd -
mv "$TAR_DIRECTORY" "${FILE}" || exit 1
else
# new checkout
if [ -n "$MYSUBDIR" ]; then
# just checkout the subdir
mkdir -p "$MYSUBDIR"
cd "$MYSUBDIR"
mkdir -p "$FILE"
cd "$FILE"
if [ "$MYSUBDIR" != "${MYSUBDIR#/}" ]; then
echo "ERROR: Absolute paths not permitted for --subdir"
exit 1
fi
fi
if [ -n "$MYREVISION" ]; then
svn co --non-interactive $TRUST_SERVER_CERT -r"$MYREVISION" "$MYURL/$MYSUBDIR" "${FILE}" || exit 1
svn co --non-interactive $TRUST_SERVER_CERT -r"$MYREVISION" "$MYURL/$MYSUBDIR" "${MYSUBDIR:-$FILE}" || exit 1
else
svn co --non-interactive $TRUST_SERVER_CERT "$MYURL/$MYSUBDIR" "${FILE}" || exit 1
svn co --non-interactive $TRUST_SERVER_CERT "$MYURL/$MYSUBDIR" "${MYSUBDIR:-$FILE}" || exit 1
fi
if [ -n "$MYSUBDIR" ]; then
cd -
fi
fi
if [ "$VERSION" == "_auto_" ]; then
cd "$FILE"
cd "$FILE/$MYSUBDIR"
[ -n "$MYPREFIX" ] && MYPREFIX="$MYPREFIX.rev"
VERSION="$MYPREFIX"`LC_ALL=C svn info | sed -n 's,^Last Changed Rev: \(.*\),\1,p'`
VERSION="$MYPREFIX"`LC_ALL=C svn info | sed -n 's,^Last Changed Rev: \(.*\),\1,p'` || exit 1
cd -
fi
elif [ "$MYSCM" == "git" ]; then
@ -290,7 +294,9 @@ if [ -z "$MYINCLUDES" ]; then
MYINCLUDES="$FILENAME"
fi
mv "$FILE/$MYSUBDIR" "${FILENAME}" || exit 1
if [ "$FILE" != "$FILENAME" ]; then
mv "$FILE/$MYSUBDIR" "${FILENAME}" || exit 1
fi
tar cf "$MYOUTDIR/${FILENAME}.tar" $EXCLUDES $MYINCLUDES || exit 1
rm -rf "${FILENAME}" "$FILE"

37
test.py Normal file
View File

@ -0,0 +1,37 @@
#!/usr/bin/python
import sys
import unittest
from gittests import GitTests
from svntests import SvnTests
from hgtests import HgTests
from bzrtests import BzrTests
if __name__ == '__main__':
suite = unittest.TestSuite()
testclasses = [
SvnTests,
GitTests,
HgTests,
BzrTests,
]
for testclass in testclasses:
suite.addTests(unittest.TestLoader().loadTestsFromTestCase(testclass))
runner_args = {
#'verbosity' : 2,
}
major, minor, micro, releaselevel, serial = sys.version_info
if major > 2 or (major == 2 and minor >= 7):
# New in 2.7
runner_args['buffer'] = True
#runner_args['failfast'] = True
runner = unittest.TextTestRunner(**runner_args)
result = runner.run(suite)
if result.wasSuccessful():
sys.exit(0)
else:
sys.exit(1)

134
testassertions.py Normal file
View File

@ -0,0 +1,134 @@
#!/usr/bin/python
import os
from pprint import pprint, pformat
import re
import tarfile
import unittest
line_start = '(^|\n)'
class TestAssertions(unittest.TestCase):
######################################################################
# backported from 2.7 just in case we're running on an older Python
def assertRegexpMatches(self, text, expected_regexp, msg=None):
"""Fail the test unless the text matches the regular expression."""
if isinstance(expected_regexp, basestring):
expected_regexp = re.compile(expected_regexp)
if not expected_regexp.search(text):
msg = msg or "Regexp didn't match"
msg = '%s: %r not found in %r' % (msg, expected_regexp.pattern, text)
raise self.failureException(msg)
def assertNotRegexpMatches(self, text, unexpected_regexp, msg=None):
"""Fail the test if the text matches the regular expression."""
if isinstance(unexpected_regexp, basestring):
unexpected_regexp = re.compile(unexpected_regexp)
match = unexpected_regexp.search(text)
if match:
msg = msg or "Regexp matched"
msg = '%s: %r matches %r in %r' % (msg,
text[match.start():match.end()],
unexpected_regexp.pattern,
text)
raise self.failureException(msg)
######################################################################
def assertNumDirents(self, dir, expected, msg = ''):
dirents = os.listdir(dir)
got = len(dirents)
if len(msg) > 0: msg += "\n"
msg += 'expected %d file(s), got %d: %s' % (expected, got, pformat(dirents))
self.assertEqual(expected, got, msg)
return dirents
def assertNumTarEnts(self, tar, expected, msg = ''):
self.assertTrue(tarfile.is_tarfile(tar))
th = tarfile.open(tar)
tarents = th.getmembers()
got = len(tarents)
if len(msg) > 0: msg += "\n"
msg += 'expected %s to have %d entries, got %d:\n%s' % \
(tar, expected, got, pformat(tarents))
self.assertEqual(expected, got, msg)
return th, tarents
def assertStandardTar(self, tar, top):
th, entries = self.assertNumTarEnts(tar, 4)
entries.sort(lambda x, y: cmp(x.name, y.name))
self.assertEqual(entries[0].name, top)
self.assertEqual(entries[1].name, top + '/a')
self.assertEqual(entries[2].name, top + '/subdir')
self.assertEqual(entries[3].name, top + '/subdir/b')
return th
def assertSubdirTar(self, tar, top):
th, entries = self.assertNumTarEnts(tar, 2)
entries.sort(lambda x, y: cmp(x.name, y.name))
self.assertEqual(entries[0].name, top)
self.assertEqual(entries[1].name, top + '/b')
return th
def checkTar(self, tar, tarbasename, toptardir=None, tarchecker=None):
if not toptardir:
toptardir = tarbasename
if not tarchecker:
tarchecker = self.assertStandardTar
self.assertEqual(tar, '%s.tar' % tarbasename)
tarpath = os.path.join(self.outdir, tar)
return tarchecker(tarpath, toptardir)
def assertTarOnly(self, tarbasename, **kwargs):
dirents = self.assertNumDirents(self.outdir, 1)
return self.checkTar(dirents[0], tarbasename, **kwargs)
def assertTarAndDir(self, tarbasename, dirname=None, **kwargs):
if not dirname:
dirname = tarbasename
dirents = self.assertNumDirents(self.outdir, 2)
pprint(dirents)
if dirents[0][-4:] == '.tar':
tar = dirents[0]
wd = dirents[1]
elif dirents[1][-4:] == '.tar':
tar = dirents[1]
wd = dirents[0]
else:
self.fail('no .tar found in ' + self.outdir)
self.assertEqual(wd, dirname)
self.assertTrue(os.path.isdir(os.path.join(self.outdir, wd)),
dirname + ' should be directory')
return self.checkTar(tar, tarbasename, **kwargs)
def assertTarMemberContains(self, th, tarmember, contents):
f = th.extractfile(tarmember)
self.assertEqual(contents, "\n".join(f.readlines()))
def assertRanInitialClone(self, logpath, loglines):
self._find(logpath, loglines, self.initial_clone_command, self.update_cache_command)
def assertRanUpdate(self, logpath, loglines):
self._find(logpath, loglines, self.update_cache_command, self.initial_clone_command)
def _find(self, logpath, loglines, should_find, should_not_find):
print "####", should_find
found = False
regexp = re.compile('^' + should_find)
for line in loglines:
msg = \
"Shouldn't find /%s/ in %s; log was:\n" \
"----\n%s\n----\n" \
% (should_not_find, logpath, "".join(loglines))
self.assertNotRegexpMatches(line, should_not_find, msg)
if regexp.search(line):
found = True
msg = \
"Didn't find /%s/ in %s; log was:\n" \
"----\n%s\n----\n" \
% (regexp.pattern, logpath, "".join(loglines))
self.assertTrue(found, msg)

143
testenv.py Normal file
View File

@ -0,0 +1,143 @@
#!/usr/bin/python
import os
import shutil
from utils import mkfreshdir, run_cmd
from scmlogs import ScmInvocationLogs
class TestEnvironment:
tests_dir = os.path.abspath(os.path.dirname(__file__)) # os.getcwd()
tmp_dir = tests_dir + '/tmp'
is_setup = False
@classmethod
def tar_scm_bin(cls):
tar_scm = cls.tests_dir + '/tar_scm'
if not os.path.isfile(tar_scm):
raise RuntimeError, "Failed to find tar_scm executable at " + tar_scm
return tar_scm
@classmethod
def setupClass(cls):
# deliberately not setUpClass - we emulate the behaviour
# to support Python < 2.7
if cls.is_setup:
return
print "++++++ setupClass ++++++"
ScmInvocationLogs.setup_bin_wrapper(cls.scm, cls.tmp_dir)
cls.is_setup = True
def calcPaths(self):
if not self._testMethodName.startswith('test_'):
raise RuntimeError, "unexpected test method name: " + self._testMethodName
self.test_name = self._testMethodName[5:]
self.test_dir = os.path.join(self.tmp_dir, self.scm, self.test_name)
self.pkgdir = os.path.join(self.test_dir, 'pkg')
self.outdir = os.path.join(self.test_dir, 'out')
self.cachedir = os.path.join(self.test_dir, 'cache')
def setUp(self):
self.setupClass()
print "++++++ setUp ++++++"
self.calcPaths()
self.scmlogs = ScmInvocationLogs(self.scm, self.test_dir)
self.scmlogs.next('fixtures')
self.initDirs()
self.fixtures = self.fixtures_class(self.test_dir, self.scmlogs)
self.fixtures.setup()
self.scmlogs.next('start-test')
self.scmlogs.annotate('Starting %s test' % self.test_name)
os.putenv('CACHEDIRECTORY', self.cachedir)
# osc launches source services with cwd as pkg dir
os.chdir(self.pkgdir)
def initDirs(self):
# pkgdir persists between tests to simulate real world use
# (although a test can choose to invoke mkfreshdir)
if not os.path.exists(self.pkgdir):
os.makedirs(self.pkgdir)
for subdir in ('repo', 'repourl', 'incoming'):
mkfreshdir(os.path.join(self.cachedir, subdir))
def disableCache(self):
os.unsetenv('CACHEDIRECTORY')
def tearDown(self):
print "++++++ tearDown ++++++"
self.postRun()
def postRun(self):
print "++++++ postRun +++++++"
self.service = { 'mode' : 'disabled' }
if os.path.exists(self.outdir):
self.simulate_osc_postrun()
def simulate_osc_postrun(self):
"""
Simulate osc copying files from temporary --outdir back to
package source directory, so our tests can catch any
potential side-effects due to the persistent nature of the
package source directory.
"""
temp_dir = self.outdir
dir = self.pkgdir
service = self.service
# This code copied straight out of osc/core.py Serviceinfo.execute():
if service['mode'] == "disabled" or service['mode'] == "trylocal" or service['mode'] == "localonly" or callmode == "local" or callmode == "trylocal":
for filename in os.listdir(temp_dir):
shutil.move( os.path.join(temp_dir, filename), os.path.join(dir, filename) )
else:
for filename in os.listdir(temp_dir):
shutil.move( os.path.join(temp_dir, filename), os.path.join(dir, "_service:"+name+":"+filename) )
def tar_scm_std(self, *args, **kwargs):
return self.tar_scm(self.stdargs(*args), **kwargs)
def tar_scm_std_fail(self, *args):
return self.tar_scm(self.stdargs(*args), should_succeed=False)
def stdargs(self, *args):
return [ '--url', self.fixtures.repo_url, '--scm', self.scm ] + list(args)
def tar_scm(self, args, should_succeed=True):
# simulate new temporary outdir for each tar_scm invocation
mkfreshdir(self.outdir)
cmdargs = args + [ '--outdir', self.outdir ]
quotedargs = [ "'%s'" % arg for arg in cmdargs ]
cmdstr = 'bash %s %s 2>&1' % (self.tar_scm_bin(), " ".join(quotedargs))
print "\n"
print "-" * 70
print "Running", cmdstr
(stdout, stderr, ret) = run_cmd(cmdstr)
if stdout:
print "STDOUT:"
print "------"
print stdout,
if stderr:
print "STDERR:"
print "------"
print stderr,
print "-" * 70
succeeded = ret == 0
self.assertEqual(succeeded, should_succeed)
return (stdout, stderr, ret)
def rev(self, rev):
return self.fixtures.revs[rev]
def timestamps(self, rev):
return self.fixtures.timestamps[rev]
def sha1s(self, rev):
return self.fixtures.sha1s[rev]

46
utils.py Normal file
View File

@ -0,0 +1,46 @@
#!/usr/bin/python
import os
import re
import shutil
import subprocess
def mkfreshdir(path):
if not re.search('.{10}/tmp(/|$)', path):
raise RuntimeError, 'unsafe call: mkfreshdir(%s)' % path
cwd = os.getcwd()
os.chdir('/')
if os.path.exists(path):
shutil.rmtree(path)
os.makedirs(path)
def run_cmd(cmd):
p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = p.communicate()
return (stdout, stderr, p.returncode)
def quietrun(cmd):
(stdout, stderr, ret) = run_cmd(cmd)
if ret != 0:
print cmd, " failed!"
print stdout
print stderr
return (stdout, stderr, ret)
def run_scm(scm, repo, opts):
cmd = 'cd %s && %s %s' % (repo, scm, opts)
#return subprocess.check_output(cmd, shell=True)
return quietrun(cmd)
def run_git(repo, opts):
return run_scm('git', repo, opts)
def run_svn(repo, opts):
return run_scm('svn', repo, opts)
def run_hg(repo, opts):
return run_scm('hg', repo, opts)
def run_bzr(repo, opts):
return run_scm('bzr', repo, opts)