forked from pool/python-mohawk
852 lines
33 KiB
Diff
852 lines
33 KiB
Diff
|
|
Index: mohawk-1.1.0/mohawk/tests.py
|
||
|
|
===================================================================
|
||
|
|
--- mohawk-1.1.0.orig/mohawk/tests.py
|
||
|
|
+++ mohawk-1.1.0/mohawk/tests.py
|
||
|
|
@@ -4,7 +4,6 @@ from unittest import TestCase
|
||
|
|
from base64 import b64decode, urlsafe_b64encode
|
||
|
|
|
||
|
|
import mock
|
||
|
|
-from nose.tools import eq_, raises
|
||
|
|
import six
|
||
|
|
|
||
|
|
from . import Receiver, Sender
|
||
|
|
@@ -60,27 +59,23 @@ class Base(TestCase):
|
||
|
|
|
||
|
|
class TestConfig(Base):
|
||
|
|
|
||
|
|
- @raises(InvalidCredentials)
|
||
|
|
def test_no_id(self):
|
||
|
|
c = self.credentials.copy()
|
||
|
|
del c['id']
|
||
|
|
- validate_credentials(c)
|
||
|
|
+ self.assertRaises(InvalidCredentials, validate_credentials, c)
|
||
|
|
|
||
|
|
- @raises(InvalidCredentials)
|
||
|
|
def test_no_key(self):
|
||
|
|
c = self.credentials.copy()
|
||
|
|
del c['key']
|
||
|
|
- validate_credentials(c)
|
||
|
|
+ self.assertRaises(InvalidCredentials, validate_credentials, c)
|
||
|
|
|
||
|
|
- @raises(InvalidCredentials)
|
||
|
|
def test_no_algo(self):
|
||
|
|
c = self.credentials.copy()
|
||
|
|
del c['algorithm']
|
||
|
|
- validate_credentials(c)
|
||
|
|
+ self.assertRaises(InvalidCredentials, validate_credentials, c)
|
||
|
|
|
||
|
|
- @raises(InvalidCredentials)
|
||
|
|
def test_no_credentials(self):
|
||
|
|
- validate_credentials(None)
|
||
|
|
+ self.assertRaises(InvalidCredentials, validate_credentials, None)
|
||
|
|
|
||
|
|
def test_non_dict_credentials(self):
|
||
|
|
class WeirdThing(object):
|
||
|
|
@@ -144,10 +139,11 @@ class TestSender(Base):
|
||
|
|
self.receive(sn.request_header, method=method, content=content,
|
||
|
|
content_type='application/json; charset=other')
|
||
|
|
|
||
|
|
- @raises(MissingContent)
|
||
|
|
def test_missing_payload_details(self):
|
||
|
|
- self.Sender(method='POST', content=EmptyValue,
|
||
|
|
- content_type=EmptyValue)
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MissingContent, self.Sender, method='POST', content=EmptyValue,
|
||
|
|
+ content_type=EmptyValue
|
||
|
|
+ )
|
||
|
|
|
||
|
|
def test_skip_payload_hashing(self):
|
||
|
|
method = 'POST'
|
||
|
|
@@ -193,65 +189,77 @@ class TestSender(Base):
|
||
|
|
content_type=content_type,
|
||
|
|
accept_untrusted_content=True)
|
||
|
|
|
||
|
|
- @raises(MissingContent)
|
||
|
|
def test_cannot_skip_content_only(self):
|
||
|
|
- self.Sender(method='POST', content=EmptyValue,
|
||
|
|
- content_type='application/json')
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MissingContent, self.Sender, method='POST', content=EmptyValue,
|
||
|
|
+ content_type='application/json'
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(MissingContent)
|
||
|
|
def test_cannot_skip_content_type_only(self):
|
||
|
|
- self.Sender(method='POST', content='{"foo": "bar"}',
|
||
|
|
- content_type=EmptyValue)
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MissingContent, self.Sender, method='POST',
|
||
|
|
+ content='{"foo": "bar"}', content_type=EmptyValue
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_tamper_with_host(self):
|
||
|
|
sn = self.Sender()
|
||
|
|
- self.receive(sn.request_header, url='http://TAMPERED-WITH.com')
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MacMismatch, self.receive, sn.request_header,
|
||
|
|
+ url='http://TAMPERED-WITH.com'
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_tamper_with_method(self):
|
||
|
|
sn = self.Sender(method='GET')
|
||
|
|
- self.receive(sn.request_header, method='POST')
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MacMismatch, self.receive, sn.request_header, method='POST')
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_tamper_with_path(self):
|
||
|
|
sn = self.Sender()
|
||
|
|
- self.receive(sn.request_header,
|
||
|
|
- url='http://site.com/TAMPERED?bar=1')
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MacMismatch, self.receive, sn.request_header,
|
||
|
|
+ url='http://site.com/TAMPERED?bar=1'
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_tamper_with_query(self):
|
||
|
|
sn = self.Sender()
|
||
|
|
- self.receive(sn.request_header,
|
||
|
|
- url='http://site.com/foo?bar=TAMPERED')
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MacMismatch, self.receive, sn.request_header,
|
||
|
|
+ url='http://site.com/foo?bar=TAMPERED'
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_tamper_with_scheme(self):
|
||
|
|
sn = self.Sender()
|
||
|
|
- self.receive(sn.request_header, url='https://site.com/foo?bar=1')
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MacMismatch, self.receive, sn.request_header,
|
||
|
|
+ url='https://site.com/foo?bar=1'
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_tamper_with_port(self):
|
||
|
|
sn = self.Sender()
|
||
|
|
- self.receive(sn.request_header,
|
||
|
|
- url='http://site.com:8000/foo?bar=1')
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MacMismatch, self.receive, sn.request_header,
|
||
|
|
+ url='http://site.com:8000/foo?bar=1'
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(MisComputedContentHash)
|
||
|
|
def test_tamper_with_content(self):
|
||
|
|
sn = self.Sender()
|
||
|
|
- self.receive(sn.request_header, content='stuff=nope')
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MisComputedContentHash, self.receive, sn.request_header,
|
||
|
|
+ content='stuff=nope'
|
||
|
|
+ )
|
||
|
|
|
||
|
|
def test_non_ascii_content(self):
|
||
|
|
content = u'Ivan Kristi\u0107'
|
||
|
|
sn = self.Sender(content=content)
|
||
|
|
self.receive(sn.request_header, content=content)
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_tamper_with_content_type(self):
|
||
|
|
sn = self.Sender(method='POST')
|
||
|
|
- self.receive(sn.request_header, content_type='application/json')
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MacMismatch, self.receive, sn.request_header,
|
||
|
|
+ content_type='application/json'
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(AlreadyProcessed)
|
||
|
|
def test_nonce_fail(self):
|
||
|
|
|
||
|
|
def seen_nonce(id, nonce, ts):
|
||
|
|
@@ -259,7 +267,10 @@ class TestSender(Base):
|
||
|
|
|
||
|
|
sn = self.Sender()
|
||
|
|
|
||
|
|
- self.receive(sn.request_header, seen_nonce=seen_nonce)
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ AlreadyProcessed, self.receive, sn.request_header,
|
||
|
|
+ seen_nonce=seen_nonce
|
||
|
|
+ )
|
||
|
|
|
||
|
|
def test_nonce_ok(self):
|
||
|
|
|
||
|
|
@@ -269,11 +280,10 @@ class TestSender(Base):
|
||
|
|
sn = self.Sender(seen_nonce=seen_nonce)
|
||
|
|
self.receive(sn.request_header)
|
||
|
|
|
||
|
|
- @raises(TokenExpired)
|
||
|
|
def test_expired_ts(self):
|
||
|
|
now = utc_now() - 120
|
||
|
|
sn = self.Sender(_timestamp=now)
|
||
|
|
- self.receive(sn.request_header)
|
||
|
|
+ self.assertRaises(TokenExpired, self.receive, sn.request_header)
|
||
|
|
|
||
|
|
def test_expired_exception_reports_localtime(self):
|
||
|
|
now = utc_now()
|
||
|
|
@@ -288,8 +298,8 @@ class TestSender(Base):
|
||
|
|
except:
|
||
|
|
etype, exc, tb = sys.exc_info()
|
||
|
|
|
||
|
|
- eq_(type(exc), TokenExpired)
|
||
|
|
- eq_(exc.localtime_in_seconds, now)
|
||
|
|
+ self.assertEqual(type(exc), TokenExpired)
|
||
|
|
+ self.assertEqual(exc.localtime_in_seconds, now)
|
||
|
|
|
||
|
|
def test_localtime_offset(self):
|
||
|
|
now = utc_now() - 120
|
||
|
|
@@ -303,13 +313,11 @@ class TestSender(Base):
|
||
|
|
# Without an offset this will raise an expired exception.
|
||
|
|
self.receive(sn.request_header, timestamp_skew_in_seconds=120)
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_hash_tampering(self):
|
||
|
|
sn = self.Sender()
|
||
|
|
header = sn.request_header.replace('hash="', 'hash="nope')
|
||
|
|
- self.receive(header)
|
||
|
|
+ self.assertRaises(MacMismatch, self.receive, header)
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_bad_secret(self):
|
||
|
|
cfg = {
|
||
|
|
'id': 'my-hawk-id',
|
||
|
|
@@ -317,79 +325,77 @@ class TestSender(Base):
|
||
|
|
'algorithm': 'sha256',
|
||
|
|
}
|
||
|
|
sn = self.Sender(credentials=cfg)
|
||
|
|
- self.receive(sn.request_header)
|
||
|
|
+ self.assertRaises(MacMismatch, self.receive, sn.request_header)
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_unexpected_algorithm(self):
|
||
|
|
cr = self.credentials.copy()
|
||
|
|
cr['algorithm'] = 'sha512'
|
||
|
|
sn = self.Sender(credentials=cr)
|
||
|
|
|
||
|
|
# Validate with mismatched credentials (sha256).
|
||
|
|
- self.receive(sn.request_header)
|
||
|
|
+ self.assertRaises(MacMismatch, self.receive, sn.request_header)
|
||
|
|
|
||
|
|
- @raises(InvalidCredentials)
|
||
|
|
def test_invalid_credentials(self):
|
||
|
|
cfg = self.credentials.copy()
|
||
|
|
# Create an invalid credentials.
|
||
|
|
del cfg['algorithm']
|
||
|
|
|
||
|
|
- self.Sender(credentials=cfg)
|
||
|
|
+ self.assertRaises(InvalidCredentials, self.Sender, credentials=cfg)
|
||
|
|
|
||
|
|
- @raises(CredentialsLookupError)
|
||
|
|
def test_unknown_id(self):
|
||
|
|
cr = self.credentials.copy()
|
||
|
|
cr['id'] = 'someone-else'
|
||
|
|
sn = self.Sender(credentials=cr)
|
||
|
|
|
||
|
|
- self.receive(sn.request_header)
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ CredentialsLookupError, self.receive, sn.request_header)
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_bad_ext(self):
|
||
|
|
sn = self.Sender(ext='my external data')
|
||
|
|
|
||
|
|
header = sn.request_header.replace('my external data', 'TAMPERED')
|
||
|
|
- self.receive(header)
|
||
|
|
+ self.assertRaises(MacMismatch, self.receive, header)
|
||
|
|
|
||
|
|
- @raises(BadHeaderValue)
|
||
|
|
def test_duplicate_keys(self):
|
||
|
|
sn = self.Sender(ext='someext')
|
||
|
|
header = sn.request_header + ', ext="otherext"'
|
||
|
|
- self.receive(header)
|
||
|
|
+ self.assertRaises(BadHeaderValue, self.receive, header)
|
||
|
|
|
||
|
|
- @raises(BadHeaderValue)
|
||
|
|
def test_ext_with_quotes(self):
|
||
|
|
- sn = self.Sender(ext='quotes=""')
|
||
|
|
- self.receive(sn.request_header)
|
||
|
|
+ with self.assertRaises(BadHeaderValue):
|
||
|
|
+ sn = self.Sender(ext='quotes=""')
|
||
|
|
+ self.receive(sn.request_header)
|
||
|
|
|
||
|
|
- @raises(BadHeaderValue)
|
||
|
|
def test_ext_with_new_line(self):
|
||
|
|
- sn = self.Sender(ext="new line \n in the middle")
|
||
|
|
- self.receive(sn.request_header)
|
||
|
|
+ with self.assertRaises(BadHeaderValue):
|
||
|
|
+ sn = self.Sender(ext="new line \n in the middle")
|
||
|
|
+ self.receive(sn.request_header)
|
||
|
|
|
||
|
|
def test_ext_with_equality_sign(self):
|
||
|
|
sn = self.Sender(ext="foo=bar&foo2=bar2;foo3=bar3")
|
||
|
|
self.receive(sn.request_header)
|
||
|
|
parsed = parse_authorization_header(sn.request_header)
|
||
|
|
- eq_(parsed['ext'], "foo=bar&foo2=bar2;foo3=bar3")
|
||
|
|
+ self.assertEqual(parsed['ext'], "foo=bar&foo2=bar2;foo3=bar3")
|
||
|
|
|
||
|
|
- @raises(HawkFail)
|
||
|
|
def test_non_hawk_scheme(self):
|
||
|
|
- parse_authorization_header('Basic user:base64pw')
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ HawkFail, parse_authorization_header, 'Basic user:base64pw')
|
||
|
|
|
||
|
|
- @raises(HawkFail)
|
||
|
|
def test_invalid_key(self):
|
||
|
|
- parse_authorization_header('Hawk mac="validmac" unknownkey="value"')
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ HawkFail, parse_authorization_header,
|
||
|
|
+ 'Hawk mac="validmac" unknownkey="value"'
|
||
|
|
+ )
|
||
|
|
|
||
|
|
def test_ext_with_all_valid_characters(self):
|
||
|
|
valid_characters = "!#$%&'()*+,-./:;<=>?@[]^_`{|}~ azAZ09_"
|
||
|
|
sender = self.Sender(ext=valid_characters)
|
||
|
|
parsed = parse_authorization_header(sender.request_header)
|
||
|
|
- eq_(parsed['ext'], valid_characters)
|
||
|
|
+ self.assertEqual(parsed['ext'], valid_characters)
|
||
|
|
|
||
|
|
- @raises(BadHeaderValue)
|
||
|
|
def test_ext_with_illegal_chars(self):
|
||
|
|
- self.Sender(ext="something like \t is illegal")
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ BadHeaderValue, self.Sender, ext="something like \t is illegal")
|
||
|
|
|
||
|
|
def test_unparseable_header(self):
|
||
|
|
try:
|
||
|
|
@@ -401,48 +407,47 @@ class TestSender(Base):
|
||
|
|
else:
|
||
|
|
self.fail('should raise')
|
||
|
|
|
||
|
|
- @raises(BadHeaderValue)
|
||
|
|
def test_ext_with_illegal_unicode(self):
|
||
|
|
- self.Sender(ext=u'Ivan Kristi\u0107')
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ BadHeaderValue, self.Sender, ext=u'Ivan Kristi\u0107')
|
||
|
|
|
||
|
|
- @raises(BadHeaderValue)
|
||
|
|
def test_too_long_header(self):
|
||
|
|
sn = self.Sender(ext='a'*5000)
|
||
|
|
- self.receive(sn.request_header)
|
||
|
|
+ self.assertRaises(BadHeaderValue, self.receive, sn.request_header)
|
||
|
|
|
||
|
|
- @raises(BadHeaderValue)
|
||
|
|
def test_ext_with_illegal_utf8(self):
|
||
|
|
# This isn't allowed because the escaped byte chars are out of
|
||
|
|
# range.
|
||
|
|
- self.Sender(ext=u'Ivan Kristi\u0107'.encode('utf8'))
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ BadHeaderValue, self.Sender,
|
||
|
|
+ ext=u'Ivan Kristi\u0107'.encode('utf8')
|
||
|
|
+ )
|
||
|
|
|
||
|
|
def test_app_ok(self):
|
||
|
|
app = 'custom-app'
|
||
|
|
sn = self.Sender(app=app)
|
||
|
|
self.receive(sn.request_header)
|
||
|
|
parsed = parse_authorization_header(sn.request_header)
|
||
|
|
- eq_(parsed['app'], app)
|
||
|
|
+ self.assertEqual(parsed['app'], app)
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_tampered_app(self):
|
||
|
|
app = 'custom-app'
|
||
|
|
sn = self.Sender(app=app)
|
||
|
|
header = sn.request_header.replace(app, 'TAMPERED-WITH')
|
||
|
|
- self.receive(header)
|
||
|
|
+ self.assertRaises(MacMismatch, self.receive, header)
|
||
|
|
|
||
|
|
def test_dlg_ok(self):
|
||
|
|
dlg = 'custom-dlg'
|
||
|
|
sn = self.Sender(dlg=dlg)
|
||
|
|
self.receive(sn.request_header)
|
||
|
|
parsed = parse_authorization_header(sn.request_header)
|
||
|
|
- eq_(parsed['dlg'], dlg)
|
||
|
|
+ self.assertEqual(parsed['dlg'], dlg)
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_tampered_dlg(self):
|
||
|
|
dlg = 'custom-dlg'
|
||
|
|
sn = self.Sender(dlg=dlg, app='some-app')
|
||
|
|
header = sn.request_header.replace(dlg, 'TAMPERED-WITH')
|
||
|
|
- self.receive(header)
|
||
|
|
+ self.assertRaises(MacMismatch, self.receive, header)
|
||
|
|
|
||
|
|
def test_file_content(self):
|
||
|
|
method = "POST"
|
||
|
|
@@ -456,12 +461,14 @@ class TestSender(Base):
|
||
|
|
sn = self.Sender(method, content=content)
|
||
|
|
self.receive(sn.request_header, method=method, content=content.getvalue())
|
||
|
|
|
||
|
|
- @raises(MisComputedContentHash)
|
||
|
|
def test_bad_file_content(self):
|
||
|
|
method = "POST"
|
||
|
|
content = six.BytesIO(b"FILE CONTENT")
|
||
|
|
sn = self.Sender(method, content=content)
|
||
|
|
- self.receive(sn.request_header, method=method, content="BAD FILE CONTENT")
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MisComputedContentHash, self.receive, sn.request_header,
|
||
|
|
+ method=method, content="BAD FILE CONTENT"
|
||
|
|
+ )
|
||
|
|
|
||
|
|
|
||
|
|
class TestReceiver(Base):
|
||
|
|
@@ -508,10 +515,10 @@ class TestReceiver(Base):
|
||
|
|
|
||
|
|
return receiver.response_header
|
||
|
|
|
||
|
|
- @raises(InvalidCredentials)
|
||
|
|
def test_invalid_credentials_lookup(self):
|
||
|
|
# Return invalid credentials.
|
||
|
|
- self.receive(credentials_map=lambda *a: {})
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ InvalidCredentials, self.receive, credentials_map=lambda *a: {})
|
||
|
|
|
||
|
|
def test_get_ok(self):
|
||
|
|
method = 'GET'
|
||
|
|
@@ -523,41 +530,42 @@ class TestReceiver(Base):
|
||
|
|
self.receive(method=method)
|
||
|
|
self.respond()
|
||
|
|
|
||
|
|
- @raises(MisComputedContentHash)
|
||
|
|
def test_respond_with_wrong_content(self):
|
||
|
|
self.receive()
|
||
|
|
- self.respond(content='real content',
|
||
|
|
- accept_kw=dict(content='TAMPERED WITH'))
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MisComputedContentHash, self.respond, content='real content',
|
||
|
|
+ accept_kw=dict(content='TAMPERED WITH')
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(MisComputedContentHash)
|
||
|
|
def test_respond_with_wrong_content_type(self):
|
||
|
|
self.receive()
|
||
|
|
- self.respond(content_type='text/html',
|
||
|
|
- accept_kw=dict(content_type='application/json'))
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MisComputedContentHash, self.respond, content_type='text/html',
|
||
|
|
+ accept_kw=dict(content_type='application/json')
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(MissingAuthorization)
|
||
|
|
def test_missing_authorization(self):
|
||
|
|
- Receiver(lambda id: self.credentials, None, '/', 'GET')
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MissingAuthorization, Receiver, lambda id: self.credentials,
|
||
|
|
+ None, '/', 'GET'
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_respond_with_wrong_url(self):
|
||
|
|
self.receive(url='http://fakesite.com')
|
||
|
|
wrong_receiver = self.receiver
|
||
|
|
|
||
|
|
self.receive(url='http://realsite.com')
|
||
|
|
|
||
|
|
- self.respond(receiver=wrong_receiver)
|
||
|
|
+ self.assertRaises(MacMismatch, self.respond, receiver=wrong_receiver)
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_respond_with_wrong_method(self):
|
||
|
|
self.receive(method='GET')
|
||
|
|
wrong_receiver = self.receiver
|
||
|
|
|
||
|
|
self.receive(method='POST')
|
||
|
|
|
||
|
|
- self.respond(receiver=wrong_receiver)
|
||
|
|
+ self.assertRaises(MacMismatch, self.respond, receiver=wrong_receiver)
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_respond_with_wrong_nonce(self):
|
||
|
|
self.receive(sender_kw=dict(nonce='another-nonce'))
|
||
|
|
wrong_receiver = self.receiver
|
||
|
|
@@ -565,7 +573,7 @@ class TestReceiver(Base):
|
||
|
|
self.receive()
|
||
|
|
|
||
|
|
# The nonce must match the one sent in the original request.
|
||
|
|
- self.respond(receiver=wrong_receiver)
|
||
|
|
+ self.assertRaises(MacMismatch, self.respond, receiver=wrong_receiver)
|
||
|
|
|
||
|
|
def test_respond_with_unhashed_content(self):
|
||
|
|
self.receive()
|
||
|
|
@@ -574,7 +582,6 @@ class TestReceiver(Base):
|
||
|
|
content_type=None,
|
||
|
|
accept_kw=dict(accept_untrusted_content=True))
|
||
|
|
|
||
|
|
- @raises(TokenExpired)
|
||
|
|
def test_respond_with_expired_ts(self):
|
||
|
|
self.receive()
|
||
|
|
hdr = self.receiver.respond(content='', content_type='')
|
||
|
|
@@ -589,8 +596,7 @@ class TestReceiver(Base):
|
||
|
|
calculated = calculate_ts_mac(fn(), self.credentials)
|
||
|
|
if isinstance(calculated, six.binary_type):
|
||
|
|
calculated = calculated.decode('ascii')
|
||
|
|
- eq_(hdr['tsm'], calculated)
|
||
|
|
- raise
|
||
|
|
+ self.assertEqual(hdr['tsm'], calculated)
|
||
|
|
|
||
|
|
def test_respond_with_bad_ts_skew_ok(self):
|
||
|
|
now = utc_now() - 120
|
||
|
|
@@ -611,9 +617,8 @@ class TestReceiver(Base):
|
||
|
|
ext = 'custom-ext'
|
||
|
|
self.respond(ext=ext)
|
||
|
|
header = parse_authorization_header(self.receiver.response_header)
|
||
|
|
- eq_(header['ext'], ext)
|
||
|
|
+ self.assertEqual(header['ext'], ext)
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_respond_with_wrong_app(self):
|
||
|
|
self.receive(sender_kw=dict(app='TAMPERED-WITH', dlg='delegation'))
|
||
|
|
self.receiver.respond(content='', content_type='')
|
||
|
|
@@ -621,10 +626,11 @@ class TestReceiver(Base):
|
||
|
|
|
||
|
|
self.receive(sender_kw=dict(app='real-app', dlg='delegation'))
|
||
|
|
|
||
|
|
- self.sender.accept_response(wrong_receiver.response_header,
|
||
|
|
- content='', content_type='')
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MacMismatch, self.sender.accept_response,
|
||
|
|
+ wrong_receiver.response_header, content='', content_type=''
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_respond_with_wrong_dlg(self):
|
||
|
|
self.receive(sender_kw=dict(app='app', dlg='TAMPERED-WITH'))
|
||
|
|
self.receiver.respond(content='', content_type='')
|
||
|
|
@@ -632,27 +638,33 @@ class TestReceiver(Base):
|
||
|
|
|
||
|
|
self.receive(sender_kw=dict(app='app', dlg='real-dlg'))
|
||
|
|
|
||
|
|
- self.sender.accept_response(wrong_receiver.response_header,
|
||
|
|
- content='', content_type='')
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MacMismatch, self.sender.accept_response,
|
||
|
|
+ wrong_receiver.response_header, content='', content_type=''
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_receive_wrong_method(self):
|
||
|
|
self.receive(method='GET')
|
||
|
|
wrong_sender = self.sender
|
||
|
|
- self.receive(method='POST', sender=wrong_sender)
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MacMismatch, self.receive, method='POST', sender=wrong_sender)
|
||
|
|
|
||
|
|
- @raises(MacMismatch)
|
||
|
|
def test_receive_wrong_url(self):
|
||
|
|
self.receive(url='http://fakesite.com/')
|
||
|
|
wrong_sender = self.sender
|
||
|
|
- self.receive(url='http://realsite.com/', sender=wrong_sender)
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MacMismatch, self.receive, url='http://realsite.com/',
|
||
|
|
+ sender=wrong_sender
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(MisComputedContentHash)
|
||
|
|
def test_receive_wrong_content(self):
|
||
|
|
self.receive(sender_kw=dict(content='real request'),
|
||
|
|
content='real request')
|
||
|
|
wrong_sender = self.sender
|
||
|
|
- self.receive(content='TAMPERED WITH', sender=wrong_sender)
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MisComputedContentHash, self.receive, content='TAMPERED WITH',
|
||
|
|
+ sender=wrong_sender
|
||
|
|
+ )
|
||
|
|
|
||
|
|
def test_expected_unhashed_empty_content(self):
|
||
|
|
# This test sets up a scenario where the receiver will receive empty
|
||
|
|
@@ -669,7 +681,6 @@ class TestReceiver(Base):
|
||
|
|
content_type=EmptyValue,
|
||
|
|
always_hash_content=False))
|
||
|
|
|
||
|
|
- @raises(MisComputedContentHash)
|
||
|
|
def test_expected_unhashed_empty_content_with_content_type(self):
|
||
|
|
# This test sets up a scenario where the receiver will receive an
|
||
|
|
# empty content string and no content hash in the auth header, but
|
||
|
|
@@ -677,13 +688,13 @@ class TestReceiver(Base):
|
||
|
|
# This is to confirm that the hash is calculated and compared (to the
|
||
|
|
# hash of mock empty payload, which should fail) when it appears that
|
||
|
|
# the sender has sent a 0-length payload body.
|
||
|
|
- self.receive(content='',
|
||
|
|
- content_type='text/plain',
|
||
|
|
- sender_kw=dict(content=EmptyValue,
|
||
|
|
- content_type=EmptyValue,
|
||
|
|
- always_hash_content=False))
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MisComputedContentHash, self.receive, content='',
|
||
|
|
+ content_type='text/plain',
|
||
|
|
+ sender_kw=dict(content=EmptyValue,
|
||
|
|
+ content_type=EmptyValue,
|
||
|
|
+ always_hash_content=False))
|
||
|
|
|
||
|
|
- @raises(MisComputedContentHash)
|
||
|
|
def test_expected_unhashed_content_with_empty_content_type(self):
|
||
|
|
# This test sets up a scenario where the receiver will receive some
|
||
|
|
# content but the empty string for the content_type and no content hash
|
||
|
|
@@ -691,11 +702,12 @@ class TestReceiver(Base):
|
||
|
|
# This is to confirm that the hash is calculated and compared (to the
|
||
|
|
# hash of mock empty payload, which should fail) when the sender has
|
||
|
|
# sent unhashed content.
|
||
|
|
- self.receive(content='some content',
|
||
|
|
- content_type='',
|
||
|
|
- sender_kw=dict(content=EmptyValue,
|
||
|
|
- content_type=EmptyValue,
|
||
|
|
- always_hash_content=False))
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MisComputedContentHash, self.receive, content='some content',
|
||
|
|
+ content_type='',
|
||
|
|
+ sender_kw=dict(content=EmptyValue,
|
||
|
|
+ content_type=EmptyValue,
|
||
|
|
+ always_hash_content=False))
|
||
|
|
|
||
|
|
def test_empty_content_with_content_type(self):
|
||
|
|
# This test sets up a scenario where the receiver will receive an
|
||
|
|
@@ -719,7 +731,6 @@ class TestReceiver(Base):
|
||
|
|
content_type=EmptyValue,
|
||
|
|
always_hash_content=False))
|
||
|
|
|
||
|
|
- @raises(MisComputedContentHash)
|
||
|
|
def test_expected_unhashed_no_content_with_content_type(self):
|
||
|
|
# This test sets up a scenario where the receiver will receive None for
|
||
|
|
# content and no content hash in the auth header, but some value for
|
||
|
|
@@ -729,13 +740,13 @@ class TestReceiver(Base):
|
||
|
|
# hash in the request to compare against. While this may not be in
|
||
|
|
# accordance with the js reference spec, it's the safest (ie. most
|
||
|
|
# secure) way of handling this bizarre set of circumstances.
|
||
|
|
- self.receive(content=None,
|
||
|
|
- content_type='text/plain',
|
||
|
|
- sender_kw=dict(content=EmptyValue,
|
||
|
|
- content_type=EmptyValue,
|
||
|
|
- always_hash_content=False))
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MisComputedContentHash, self.receive, content=None,
|
||
|
|
+ content_type='text/plain',
|
||
|
|
+ sender_kw=dict(content=EmptyValue,
|
||
|
|
+ content_type=EmptyValue,
|
||
|
|
+ always_hash_content=False))
|
||
|
|
|
||
|
|
- @raises(MisComputedContentHash)
|
||
|
|
def test_expected_unhashed_content_with_no_content_type(self):
|
||
|
|
# This test sets up a scenario where the receiver will receive some
|
||
|
|
# content but no value for the content_type and no content hash in
|
||
|
|
@@ -743,11 +754,12 @@ class TestReceiver(Base):
|
||
|
|
# This is to confirm that the hash is calculated and compared (to the
|
||
|
|
# hash of mock empty payload, which should fail) when the sender has
|
||
|
|
# sent unhashed content.
|
||
|
|
- self.receive(content='some content',
|
||
|
|
- content_type=None,
|
||
|
|
- sender_kw=dict(content=EmptyValue,
|
||
|
|
- content_type=EmptyValue,
|
||
|
|
- always_hash_content=False))
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MisComputedContentHash, self.receive, content='some content',
|
||
|
|
+ content_type=None,
|
||
|
|
+ sender_kw=dict(content=EmptyValue,
|
||
|
|
+ content_type=EmptyValue,
|
||
|
|
+ always_hash_content=False))
|
||
|
|
|
||
|
|
def test_no_content_with_content_type(self):
|
||
|
|
# This test sets up a scenario where the receiver will receive None for
|
||
|
|
@@ -760,28 +772,31 @@ class TestReceiver(Base):
|
||
|
|
sender_kw=dict(content='',
|
||
|
|
content_type='text/plain'))
|
||
|
|
|
||
|
|
- @raises(MissingContent)
|
||
|
|
def test_cannot_receive_empty_content_only(self):
|
||
|
|
content_type = 'text/plain'
|
||
|
|
- self.receive(sender_kw=dict(content='<content>',
|
||
|
|
- content_type=content_type),
|
||
|
|
- content=EmptyValue, content_type=content_type)
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MissingContent, self.receive,
|
||
|
|
+ sender_kw=dict(content='<content>', content_type=content_type),
|
||
|
|
+ content=EmptyValue, content_type=content_type
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(MissingContent)
|
||
|
|
def test_cannot_receive_empty_content_type_only(self):
|
||
|
|
content = '<content>'
|
||
|
|
- self.receive(sender_kw=dict(content=content,
|
||
|
|
- content_type='text/plain'),
|
||
|
|
- content=content, content_type=EmptyValue)
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MissingContent, self.receive,
|
||
|
|
+ sender_kw=dict(content=content, content_type='text/plain'),
|
||
|
|
+ content=content, content_type=EmptyValue
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(MisComputedContentHash)
|
||
|
|
def test_receive_wrong_content_type(self):
|
||
|
|
self.receive(sender_kw=dict(content_type='text/html'),
|
||
|
|
content_type='text/html')
|
||
|
|
wrong_sender = self.sender
|
||
|
|
|
||
|
|
- self.receive(content_type='application/json',
|
||
|
|
- sender=wrong_sender)
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ MisComputedContentHash, self.receive,
|
||
|
|
+ content_type='application/json', sender=wrong_sender
|
||
|
|
+ )
|
||
|
|
|
||
|
|
|
||
|
|
class TestSendAndReceive(Base):
|
||
|
|
@@ -854,7 +869,7 @@ class TestBewit(Base):
|
||
|
|
bewit = get_bewit(res)
|
||
|
|
|
||
|
|
expected = '123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
|
||
|
|
- eq_(b64decode(bewit).decode('ascii'), expected)
|
||
|
|
+ self.assertEqual(b64decode(bewit).decode('ascii'), expected)
|
||
|
|
|
||
|
|
def test_bewit_with_binary_id(self):
|
||
|
|
# Check for exceptions in get_bewit call with binary id
|
||
|
|
@@ -877,26 +892,26 @@ class TestBewit(Base):
|
||
|
|
bewit = get_bewit(res)
|
||
|
|
|
||
|
|
expected = '123456\\1356420707\\kscxwNR2tJpP1T1zDLNPbB5UiKIU9tOSJXTUdG7X9h8=\\xandyandz'
|
||
|
|
- eq_(b64decode(bewit).decode('ascii'), expected)
|
||
|
|
+ self.assertEqual(b64decode(bewit).decode('ascii'), expected)
|
||
|
|
|
||
|
|
- @raises(BadHeaderValue)
|
||
|
|
def test_bewit_with_invalid_ext(self):
|
||
|
|
res = Resource(url='https://example.com/somewhere/over/the/rainbow',
|
||
|
|
method='GET', credentials=self.credentials,
|
||
|
|
timestamp=1356420407 + 300,
|
||
|
|
nonce='',
|
||
|
|
ext='xand\\yandz')
|
||
|
|
- get_bewit(res)
|
||
|
|
+ self.assertRaises(BadHeaderValue, get_bewit, res)
|
||
|
|
|
||
|
|
- @raises(BadHeaderValue)
|
||
|
|
def test_bewit_with_backslashes_in_id(self):
|
||
|
|
credentials = self.credentials
|
||
|
|
credentials['id'] = '123\\456'
|
||
|
|
- res = Resource(url='https://example.com/somewhere/over/the/rainbow',
|
||
|
|
- method='GET', credentials=self.credentials,
|
||
|
|
- timestamp=1356420407 + 300,
|
||
|
|
- nonce='')
|
||
|
|
- get_bewit(res)
|
||
|
|
+ with self.assertRaises(BadHeaderValue):
|
||
|
|
+ res = Resource(
|
||
|
|
+ url='https://example.com/somewhere/over/the/rainbow',
|
||
|
|
+ method='GET', credentials=self.credentials,
|
||
|
|
+ timestamp=1356420407 + 300,
|
||
|
|
+ nonce='')
|
||
|
|
+ get_bewit(res)
|
||
|
|
|
||
|
|
def test_bewit_with_port(self):
|
||
|
|
res = Resource(url='https://example.com:8080/somewhere/over/the/rainbow',
|
||
|
|
@@ -905,22 +920,20 @@ class TestBewit(Base):
|
||
|
|
bewit = get_bewit(res)
|
||
|
|
|
||
|
|
expected = '123456\\1356420707\\hZbJ3P2cKEo4ky0C8jkZAkRyCZueg4WSNbxV7vq3xHU=\\xandyandz'
|
||
|
|
- eq_(b64decode(bewit).decode('ascii'), expected)
|
||
|
|
+ self.assertEqual(b64decode(bewit).decode('ascii'), expected)
|
||
|
|
|
||
|
|
- @raises(ValueError)
|
||
|
|
def test_bewit_with_nonce(self):
|
||
|
|
res = Resource(url='https://example.com/somewhere/over/the/rainbow',
|
||
|
|
method='GET', credentials=self.credentials,
|
||
|
|
timestamp=1356420407 + 300,
|
||
|
|
nonce='n1')
|
||
|
|
- get_bewit(res)
|
||
|
|
+ self.assertRaises(ValueError, get_bewit, res)
|
||
|
|
|
||
|
|
- @raises(ValueError)
|
||
|
|
def test_bewit_invalid_method(self):
|
||
|
|
res = Resource(url='https://example.com:8080/somewhere/over/the/rainbow',
|
||
|
|
method='POST', credentials=self.credentials,
|
||
|
|
timestamp=1356420407 + 300, nonce='')
|
||
|
|
- get_bewit(res)
|
||
|
|
+ self.assertRaises(ValueError, get_bewit, res)
|
||
|
|
|
||
|
|
def test_strip_bewit(self):
|
||
|
|
bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
|
||
|
|
@@ -931,10 +944,9 @@ class TestBewit(Base):
|
||
|
|
self.assertEqual(raw_bewit, bewit)
|
||
|
|
self.assertEqual(stripped_url, "https://example.com/somewhere/over/the/rainbow")
|
||
|
|
|
||
|
|
- @raises(InvalidBewit)
|
||
|
|
def test_strip_url_without_bewit(self):
|
||
|
|
url = "https://example.com/somewhere/over/the/rainbow"
|
||
|
|
- strip_bewit(url)
|
||
|
|
+ self.assertRaises(InvalidBewit, strip_bewit, url)
|
||
|
|
|
||
|
|
def test_parse_bewit(self):
|
||
|
|
bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
|
||
|
|
@@ -954,23 +966,20 @@ class TestBewit(Base):
|
||
|
|
self.assertEqual(bewit.mac, 'IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=')
|
||
|
|
self.assertEqual(bewit.ext, 'xandyandz')
|
||
|
|
|
||
|
|
- @raises(InvalidBewit)
|
||
|
|
def test_parse_bewit_with_ext_and_backslashes(self):
|
||
|
|
bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\xand\\yandz'
|
||
|
|
bewit = urlsafe_b64encode(bewit).decode('ascii')
|
||
|
|
- parse_bewit(bewit)
|
||
|
|
+ self.assertRaises(InvalidBewit, parse_bewit, bewit)
|
||
|
|
|
||
|
|
- @raises(InvalidBewit)
|
||
|
|
def test_parse_invalid_bewit_with_only_one_part(self):
|
||
|
|
bewit = b'12345'
|
||
|
|
bewit = urlsafe_b64encode(bewit).decode('ascii')
|
||
|
|
- bewit = parse_bewit(bewit)
|
||
|
|
+ self.assertRaises(InvalidBewit, parse_bewit, bewit)
|
||
|
|
|
||
|
|
- @raises(InvalidBewit)
|
||
|
|
def test_parse_invalid_bewit_with_only_two_parts(self):
|
||
|
|
bewit = b'1\\2'
|
||
|
|
bewit = urlsafe_b64encode(bewit).decode('ascii')
|
||
|
|
- bewit = parse_bewit(bewit)
|
||
|
|
+ self.assertRaises(InvalidBewit, parse_bewit, bewit)
|
||
|
|
|
||
|
|
def test_validate_bewit(self):
|
||
|
|
bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
|
||
|
|
@@ -990,7 +999,6 @@ class TestBewit(Base):
|
||
|
|
})
|
||
|
|
self.assertTrue(check_bewit(url, credential_lookup=credential_lookup, now=1356420407 + 10))
|
||
|
|
|
||
|
|
- @raises(InvalidBewit)
|
||
|
|
def test_validate_bewit_with_ext_and_backslashes(self):
|
||
|
|
bewit = b'123456\\1356420707\\b82LLIxG5UDkaChLU953mC+SMrbniV1sb8KiZi9cSsc=\\xand\\yandz'
|
||
|
|
bewit = urlsafe_b64encode(bewit).decode('ascii')
|
||
|
|
@@ -998,9 +1006,11 @@ class TestBewit(Base):
|
||
|
|
credential_lookup = self.make_credential_lookup({
|
||
|
|
self.credentials['id']: self.credentials,
|
||
|
|
})
|
||
|
|
- check_bewit(url, credential_lookup=credential_lookup, now=1356420407 + 10)
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ InvalidBewit, check_bewit, url,
|
||
|
|
+ credential_lookup=credential_lookup, now=1356420407 + 10
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(TokenExpired)
|
||
|
|
def test_validate_expired_bewit(self):
|
||
|
|
bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
|
||
|
|
bewit = urlsafe_b64encode(bewit).decode('ascii')
|
||
|
|
@@ -1008,9 +1018,11 @@ class TestBewit(Base):
|
||
|
|
credential_lookup = self.make_credential_lookup({
|
||
|
|
self.credentials['id']: self.credentials,
|
||
|
|
})
|
||
|
|
- check_bewit(url, credential_lookup=credential_lookup, now=1356420407 + 1000)
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ TokenExpired, check_bewit, url,
|
||
|
|
+ credential_lookup=credential_lookup, now=1356420407 + 1000
|
||
|
|
+ )
|
||
|
|
|
||
|
|
- @raises(CredentialsLookupError)
|
||
|
|
def test_validate_bewit_with_unknown_credentials(self):
|
||
|
|
bewit = b'123456\\1356420707\\IGYmLgIqLrCe8CxvKPs4JlWIA+UjWJJouwgARiVhCAg=\\'
|
||
|
|
bewit = urlsafe_b64encode(bewit).decode('ascii')
|
||
|
|
@@ -1018,7 +1030,10 @@ class TestBewit(Base):
|
||
|
|
credential_lookup = self.make_credential_lookup({
|
||
|
|
'other_id': self.credentials,
|
||
|
|
})
|
||
|
|
- check_bewit(url, credential_lookup=credential_lookup, now=1356420407 + 10)
|
||
|
|
+ self.assertRaises(
|
||
|
|
+ CredentialsLookupError, check_bewit, url,
|
||
|
|
+ credential_lookup=credential_lookup, now=1356420407 + 10
|
||
|
|
+ )
|
||
|
|
|
||
|
|
|
||
|
|
class TestPayloadHash(Base):
|