python-botocore/remove_nose.patch
Robert Schweikert 958455abf6 Accepting request 829838 from home:mcepl:branches:devel:languages:python:aws
- Add remove_nose.patch which ports the test suite from nose to
  pytest (mostly just plain unittest; pytest itself is needed mainly
  to mark tests as slow). Filed upstream as gh#boto/botocore#2134.

OBS-URL: https://build.opensuse.org/request/show/829838
OBS-URL: https://build.opensuse.org/package/show/devel:languages:python:aws/python-botocore?expand=0&rev=82
2020-09-14 13:56:56 +00:00
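
For context, a minimal sketch of the conversion pattern applied throughout
the patch below: nose's yield-based test generators become plain loops with
bare asserts, nose.tools helpers (assert_equal and friends) become plain
assert statements, and nose's @attr('slow') becomes @pytest.mark.slow with
the marker registered in setup.cfg. The test names and data here are
hypothetical, not taken from botocore; only pytest is assumed installed.

    import pytest

    CASES = [("foo", 3), ("ab", 2)]

    # nose version (what the patch removes):
    #     from nose.tools import assert_equal
    #     def test_lengths():
    #         for text, expected in CASES:
    #             yield assert_equal, len(text), expected
    #
    # pytest/unittest version (what the patch adds): iterate and assert directly.
    def test_lengths():
        for text, expected in CASES:
            assert len(text) == expected

    # nose's @attr('slow') becomes @pytest.mark.slow; the marker is declared in
    # setup.cfg under [tool:pytest] so it can be deselected at run time.
    @pytest.mark.slow
    def test_something_slow():
        assert sum(range(1000)) == 499500

With the marker registered, slow tests can be skipped via
`pytest -m "not slow"`, which is what the setup.cfg hunk below enables.
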


---
requirements.txt | 2
setup.cfg | 2
tests/__init__.py | 22 +-
tests/acceptance/features/steps/base.py | 3
tests/functional/csm/test_monitoring.py | 4
tests/functional/test_client_class_names.py | 21 +-
tests/functional/test_cognito_idp.py | 6
tests/functional/test_credentials.py | 4
tests/functional/test_endpoints.py | 9 -
tests/functional/test_history.py | 4
tests/functional/test_model_backcompat.py | 9 -
tests/functional/test_paginate.py | 44 ++---
tests/functional/test_regions.py | 115 +++++++-------
tests/functional/test_response_shadowing.py | 13 -
tests/functional/test_retry.py | 3
tests/functional/test_s3.py | 9 -
tests/functional/test_service_names.py | 21 +-
tests/functional/test_stub.py | 12 -
tests/integration/test_client.py | 6
tests/integration/test_ec2.py | 2
tests/integration/test_emr.py | 4
tests/integration/test_s3.py | 17 +-
tests/integration/test_smoke.py | 12 -
tests/integration/test_sts.py | 4
tests/integration/test_waiters.py | 4
tests/unit/auth/test_sigv4.py | 20 +-
tests/unit/docs/test_utils.py | 2
tests/unit/response_parsing/README.rst | 12 -
tests/unit/response_parsing/test_response_parsing.py | 8 -
tests/unit/retries/test_special.py | 2
tests/unit/retries/test_standard.py | 34 +---
tests/unit/test_awsrequest.py | 4
tests/unit/test_client.py | 18 +-
tests/unit/test_compat.py | 134 ++++++++--------
tests/unit/test_config_provider.py | 11 -
tests/unit/test_credentials.py | 20 +-
tests/unit/test_errorfactory.py | 3
tests/unit/test_eventstream.py | 152 ++++++++++---------
tests/unit/test_exceptions.py | 6
tests/unit/test_handlers.py | 2
tests/unit/test_http_client_exception_mapping.py | 20 --
tests/unit/test_http_session.py | 13 -
tests/unit/test_loaders.py | 9 -
tests/unit/test_model.py | 12 -
tests/unit/test_paginate.py | 2
tests/unit/test_parsers.py | 19 +-
tests/unit/test_protocols.py | 30 +--
tests/unit/test_s3_addressing.py | 3
tests/unit/test_utils.py | 2
tests/unit/test_waiters.py | 2
50 files changed, 427 insertions(+), 465 deletions(-)
--- a/setup.cfg
+++ b/setup.cfg
@@ -13,3 +13,5 @@ requires-dist =
tag_build =
tag_date = 0
+[tool:pytest]
+markers = slow: marks tests as slow
\ No newline at end of file
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -13,7 +13,10 @@
import os
import sys
-import mock
+try:
+ import mock
+except ImportError:
+ from unittest import mock
import time
import random
import shutil
@@ -29,8 +32,6 @@ from subprocess import Popen, PIPE
from dateutil.tz import tzlocal
import unittest
-from nose.tools import assert_equal
-
import botocore.loaders
import botocore.session
from botocore.awsrequest import AWSResponse
@@ -346,16 +347,16 @@ def assert_url_equal(url1, url2):
# Because the query string ordering isn't relevant, we have to parse
# every single part manually and then handle the query string.
- assert_equal(parts1.scheme, parts2.scheme)
- assert_equal(parts1.netloc, parts2.netloc)
- assert_equal(parts1.path, parts2.path)
- assert_equal(parts1.params, parts2.params)
- assert_equal(parts1.fragment, parts2.fragment)
- assert_equal(parts1.username, parts2.username)
- assert_equal(parts1.password, parts2.password)
- assert_equal(parts1.hostname, parts2.hostname)
- assert_equal(parts1.port, parts2.port)
- assert_equal(parse_qs(parts1.query), parse_qs(parts2.query))
+ assert parts1.scheme == parts2.scheme
+ assert parts1.netloc == parts2.netloc
+ assert parts1.path == parts2.path
+ assert parts1.params == parts2.params
+ assert parts1.fragment == parts2.fragment
+ assert parts1.username == parts2.username
+ assert parts1.password == parts2.password
+ assert parts1.hostname == parts2.hostname
+ assert parts1.port == parts2.port
+ assert parse_qs(parts1.query) == parse_qs(parts2.query)
class HTTPStubberException(Exception):
--- a/tests/acceptance/features/steps/base.py
+++ b/tests/acceptance/features/steps/base.py
@@ -4,7 +4,6 @@ from botocore import xform_name
from botocore.exceptions import ClientError
from behave import when, then
-from nose.tools import assert_equal
def _params_from_table(table):
@@ -72,7 +71,7 @@ def api_call_with_json_and_error(context
@then(u'I expect the response error code to be "{}"')
def then_expected_error(context, code):
- assert_equal(context.error_response.response['Error']['Code'], code)
+ assert context.error_response.response['Error']['Code'] == code
@then(u'the value at "{}" should be a list')
--- a/tests/functional/csm/test_monitoring.py
+++ b/tests/functional/csm/test_monitoring.py
@@ -18,8 +18,7 @@ import os
import socket
import threading
-import mock
-from nose.tools import assert_equal
+from tests import mock
from tests import temporary_file
from tests import ClientHTTPStubber
@@ -50,7 +49,7 @@ EXPECTED_EXCEPTIONS_THROWN = (
def test_client_monitoring():
test_cases = _load_test_cases()
for case in test_cases:
- yield _run_test_case, case
+ _run_test_case(case)
def _load_test_cases():
@@ -121,8 +120,7 @@ def _run_test_case(case):
case['configuration'], listener.port) as session:
for api_call in case['apiCalls']:
_make_api_call(session, api_call)
- assert_equal(
- listener.received_events, case['expectedMonitoringEvents'])
+ assert listener.received_events == case['expectedMonitoringEvents']
def _make_api_call(session, api_call):
--- a/tests/functional/test_client_class_names.py
+++ b/tests/functional/test_client_class_names.py
@@ -10,11 +10,9 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
-from nose.tools import assert_equal
-
+from tests import unittest
import botocore.session
-
REGION = 'us-east-1'
SERVICE_TO_CLASS_NAME = {
@@ -69,13 +67,10 @@ SERVICE_TO_CLASS_NAME = {
}
-def test_client_has_correct_class_name():
- session = botocore.session.get_session()
- for service_name in SERVICE_TO_CLASS_NAME:
- client = session.create_client(service_name, REGION)
- yield (_assert_class_name_matches_ref_class_name, client,
- SERVICE_TO_CLASS_NAME[service_name])
-
-
-def _assert_class_name_matches_ref_class_name(client, ref_class_name):
- assert_equal(client.__class__.__name__, ref_class_name)
+class TestClientClassNames(unittest.TestCase):
+ def test_client_has_correct_class_name(self):
+ session = botocore.session.get_session()
+ for service_name in SERVICE_TO_CLASS_NAME:
+ client = session.create_client(service_name, REGION)
+ self.assertEqual(client.__class__.__name__,
+ SERVICE_TO_CLASS_NAME[service_name])
--- a/tests/functional/test_cognito_idp.py
+++ b/tests/functional/test_cognito_idp.py
@@ -10,9 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
-import mock
-
-from nose.tools import assert_false
+from tests import mock
from tests import create_session, ClientHTTPStubber
@@ -95,8 +93,7 @@ def test_unsigned_operations():
client = session.create_client('cognito-idp', 'us-west-2')
for operation, params in operation_params.items():
- test_case = UnsignedOperationTestCase(client, operation, params)
- yield test_case.run
+ UnsignedOperationTestCase(client, operation, params).run()
class UnsignedOperationTestCase(object):
@@ -114,7 +111,5 @@ class UnsignedOperationTestCase(object):
operation(**self._parameters)
request = self._http_stubber.requests[0]
- assert_false(
- 'authorization' in request.headers,
+ assert 'authorization' not in request.headers, \
'authorization header found in unsigned operation'
- )
--- a/tests/functional/test_endpoints.py
+++ b/tests/functional/test_endpoints.py
@@ -10,7 +10,6 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
-from nose.tools import assert_equal
from botocore.session import get_session
@@ -134,9 +133,7 @@ def test_endpoint_matches_service():
# prefix.
endpoint_prefix = ENDPOINT_PREFIX_OVERRIDE.get(endpoint_prefix,
endpoint_prefix)
- yield (_assert_known_endpoint_prefix,
- endpoint_prefix,
- known_endpoint_prefixes)
+ _assert_known_endpoint_prefix(endpoint_prefix, known_endpoint_prefixes)
def _assert_known_endpoint_prefix(endpoint_prefix, known_endpoint_prefixes):
@@ -156,7 +153,7 @@ def test_service_name_matches_endpoint_p
services = loader.list_available_services('service-2')
for service in services:
- yield _assert_service_name_matches_endpoint_prefix, session, service
+ _assert_service_name_matches_endpoint_prefix(session, service)
def _assert_service_name_matches_endpoint_prefix(session, service_name):
@@ -166,8 +163,6 @@ def _assert_service_name_matches_endpoin
# Handle known exceptions where we have renamed the service directory
# for one reason or another.
actual_service_name = SERVICE_RENAMES.get(service_name, service_name)
- assert_equal(
- computed_name, actual_service_name,
- "Actual service name `%s` does not match expected service name "
- "we computed: `%s`" % (
- actual_service_name, computed_name))
+ assert computed_name == actual_service_name, \
+ ("Actual service name `%s` does not match expected service name " +
+ "we computed: `%s`") % (actual_service_name, computed_name)
--- a/tests/functional/test_model_backcompat.py
+++ b/tests/functional/test_model_backcompat.py
@@ -12,7 +12,6 @@
# language governing permissions and limitations under the License.
import os
-from nose.tools import assert_equal
from botocore.session import Session
from tests import ClientHTTPStubber
@@ -60,8 +59,7 @@ def test_old_model_continues_to_work():
'Content-Type': 'application/x-amz-json-1.1'},
body=b'{"CertificateSummaryList":[]}')
response = client.list_certificates()
- assert_equal(
- response,
+ assert response == \
{'CertificateSummaryList': [],
'ResponseMetadata': {
'HTTPHeaders': {
@@ -73,8 +71,7 @@ def test_old_model_continues_to_work():
'RequestId': 'abcd',
'RetryAttempts': 0}
}
- )
# Also verify we can use the paginators as well.
- assert_equal(client.can_paginate('list_certificates'), True)
- assert_equal(client.waiter_names, ['certificate_validated'])
+ assert client.can_paginate('list_certificates')
+ assert client.waiter_names == ['certificate_validated']
--- a/tests/functional/test_paginate.py
+++ b/tests/functional/test_paginate.py
@@ -14,9 +14,7 @@ from __future__ import division
from math import ceil
from datetime import datetime
-from nose.tools import assert_equal
-
-from tests import random_chars
+from tests import random_chars, unittest
from tests import BaseSessionTest
from botocore.stub import Stubber, StubAssertionError
from botocore.paginate import TokenDecoder, TokenEncoder
@@ -79,7 +77,7 @@ class TestAutoscalingPagination(BaseSess
self.stubber.activate()
def _setup_scaling_pagination(self, page_size=200, max_items=100,
- total_items=600):
+ total_items=600):
"""
Add to the stubber to test paginating describe_scaling_activities.
@@ -217,22 +215,22 @@ class TestCloudwatchLogsPagination(BaseS
self.assertEqual(len(result['events']), 1)
-def test_token_encoding():
- cases = [
- {'foo': 'bar'},
- {'foo': b'bar'},
- {'foo': {'bar': b'baz'}},
- {'foo': ['bar', b'baz']},
- {'foo': b'\xff'},
- {'foo': {'bar': b'baz', 'bin': [b'bam']}},
- ]
-
- for token_dict in cases:
- yield assert_token_encodes_and_decodes, token_dict
-
-
-def assert_token_encodes_and_decodes(token_dict):
- encoded = TokenEncoder().encode(token_dict)
- assert isinstance(encoded, six.string_types)
- decoded = TokenDecoder().decode(encoded)
- assert_equal(decoded, token_dict)
+class TestTokenEncoding(unittest.TestCase):
+ def test_token_encoding(self):
+ cases = [
+ {'foo': 'bar'},
+ {'foo': b'bar'},
+ {'foo': {'bar': b'baz'}},
+ {'foo': ['bar', b'baz']},
+ {'foo': b'\xff'},
+ {'foo': {'bar': b'baz', 'bin': [b'bam']}},
+ ]
+
+ for token_dict in cases:
+ self.assert_token_encodes_and_decodes(token_dict)
+
+ def assert_token_encodes_and_decodes(self, token_dict):
+ encoded = TokenEncoder().encode(token_dict)
+ assert isinstance(encoded, six.string_types)
+ decoded = TokenDecoder().decode(encoded)
+ self.assertEqual(decoded, token_dict)
--- a/tests/functional/test_regions.py
+++ b/tests/functional/test_regions.py
@@ -10,10 +10,9 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
-from tests import create_session
+from tests import create_session, unittest
-import mock
-from nose.tools import assert_equal, assert_raises
+from tests import mock
from botocore.client import ClientEndpointBridge
from botocore.exceptions import NoRegionError
@@ -448,64 +447,62 @@ def _get_patched_session():
return session
-def test_known_endpoints():
- # Verify the actual values from the partition files. While
- # TestEndpointHeuristics verified the generic functionality given any
- # endpoints file, this test actually verifies the partition data against a
- # fixed list of known endpoints. This list doesn't need to be kept 100% up
- # to date, but serves as a basis for regressions as the endpoint data
- # logic evolves.
- resolver = _get_patched_session()._get_internal_component(
- 'endpoint_resolver')
- for region_name, service_dict in KNOWN_REGIONS.items():
- for service_name, endpoint in service_dict.items():
- yield (_test_single_service_region, service_name,
- region_name, endpoint, resolver)
-
-
-def _test_single_service_region(service_name, region_name,
- expected_endpoint, resolver):
- bridge = ClientEndpointBridge(resolver, None, None)
- result = bridge.resolve(service_name, region_name)
- expected = 'https://%s' % expected_endpoint
- assert_equal(result['endpoint_url'], expected)
-
-
-# Ensure that all S3 regions use s3v4 instead of v4
-def test_all_s3_endpoints_have_s3v4():
- session = _get_patched_session()
- partitions = session.get_available_partitions()
- resolver = session._get_internal_component('endpoint_resolver')
- for partition_name in partitions:
- for endpoint in session.get_available_regions('s3', partition_name):
- resolved = resolver.construct_endpoint('s3', endpoint)
- assert 's3v4' in resolved['signatureVersions']
- assert 'v4' not in resolved['signatureVersions']
-
-
-def test_known_endpoints():
- resolver = _get_patched_session()._get_internal_component(
- 'endpoint_resolver')
- for service_name, endpoint in KNOWN_AWS_PARTITION_WIDE.items():
- yield (_test_single_service_partition_endpoint, service_name,
- endpoint, resolver)
-
-
-def _test_single_service_partition_endpoint(service_name, expected_endpoint,
- resolver):
- bridge = ClientEndpointBridge(resolver)
- result = bridge.resolve(service_name)
- assert_equal(result['endpoint_url'], expected_endpoint)
-
-
-def test_non_partition_endpoint_requires_region():
- resolver = _get_patched_session()._get_internal_component(
- 'endpoint_resolver')
- assert_raises(NoRegionError, resolver.construct_endpoint, 'ec2')
+class TestRegions(unittest.TestCase):
+ def test_known_endpoints(self):
+ # Verify the actual values from the partition files. While
+ # TestEndpointHeuristics verified the generic functionality given
+ # any endpoints file, this test actually verifies the partition
+ # data against a fixed list of known endpoints. This list doesn't
+ # need to be kept 100% up to date, but serves as a basis for
+ # regressions as the endpoint data logic evolves.
+ resolver = _get_patched_session()._get_internal_component(
+ 'endpoint_resolver')
+ for region_name, service_dict in KNOWN_REGIONS.items():
+ for service_name, endpoint in service_dict.items():
+ self._test_single_service_region(service_name,
+ region_name, endpoint,
+ resolver)
+
+ def _test_single_service_region(self, service_name, region_name,
+ expected_endpoint, resolver):
+ bridge = ClientEndpointBridge(resolver, None, None)
+ result = bridge.resolve(service_name, region_name)
+ expected = 'https://%s' % expected_endpoint
+ self.assertEqual(result['endpoint_url'], expected)
+
+ # Ensure that all S3 regions use s3v4 instead of v4
+ def test_all_s3_endpoints_have_s3v4(self):
+ session = _get_patched_session()
+ partitions = session.get_available_partitions()
+ resolver = session._get_internal_component('endpoint_resolver')
+ for partition_name in partitions:
+ for endpoint in session.get_available_regions('s3', partition_name):
+ resolved = resolver.construct_endpoint('s3', endpoint)
+ assert 's3v4' in resolved['signatureVersions']
+ assert 'v4' not in resolved['signatureVersions']
+
+ def _test_single_service_partition_endpoint(self, service_name,
+ expected_endpoint,
+ resolver):
+ bridge = ClientEndpointBridge(resolver)
+ result = bridge.resolve(service_name)
+ assert result['endpoint_url'] == expected_endpoint
+
+ def test_known_endpoints_other(self):
+ resolver = _get_patched_session()._get_internal_component(
+ 'endpoint_resolver')
+ for service_name, endpoint in KNOWN_AWS_PARTITION_WIDE.items():
+ self._test_single_service_partition_endpoint(service_name,
+ endpoint, resolver)
+
+ def test_non_partition_endpoint_requires_region(self):
+ resolver = _get_patched_session()._get_internal_component(
+ 'endpoint_resolver')
+ with self.assertRaises(NoRegionError):
+ resolver.construct_endpoint('ec2')
class TestEndpointResolution(BaseSessionTest):
-
def setUp(self):
super(TestEndpointResolution, self).setUp()
self.xml_response = (
@@ -526,7 +523,7 @@ class TestEndpointResolution(BaseSession
client, stubber = self.create_stubbed_client('s3', 'us-east-2')
stubber.add_response()
client.list_buckets()
- self.assertEquals(
+ self.assertEqual(
stubber.requests[0].url,
'https://s3.us-east-2.amazonaws.com/'
)
@@ -537,7 +534,7 @@ class TestEndpointResolution(BaseSession
client.list_buckets()
# Validate we don't fall back to partition endpoint for
# regionalized services.
- self.assertEquals(
+ self.assertEqual(
stubber.requests[0].url,
'https://s3.not-real.amazonaws.com/'
)
--- a/tests/functional/test_response_shadowing.py
+++ b/tests/functional/test_response_shadowing.py
@@ -11,7 +11,6 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from botocore.session import Session
-from nose.tools import assert_false
def _all_services():
@@ -33,17 +32,17 @@ def _assert_not_shadowed(key, shape):
msg = (
'Found shape "%s" that shadows the botocore response key "%s"'
)
- assert_false(key in shape.members, msg % (shape.name, key))
+ assert key not in shape.members, msg % (shape.name, key)
def test_response_metadata_is_not_shadowed():
for operation_model in _all_operations():
shape = operation_model.output_shape
- yield _assert_not_shadowed, 'ResponseMetadata', shape
+ _assert_not_shadowed('ResponseMetadata', shape)
def test_exceptions_do_not_shadow():
for service_model in _all_services():
for shape in service_model.error_shapes:
- yield _assert_not_shadowed, 'ResponseMetadata', shape
- yield _assert_not_shadowed, 'Error', shape
+ _assert_not_shadowed('ResponseMetadata', shape)
+ _assert_not_shadowed('Error', shape)
--- a/tests/functional/test_s3.py
+++ b/tests/functional/test_s3.py
@@ -14,7 +14,6 @@ import re
from tests import temporary_file
from tests import unittest, mock, BaseSessionTest, create_session, ClientHTTPStubber
-from nose.tools import assert_equal
import botocore.session
from botocore.config import Config
@@ -447,8 +446,8 @@ class TestS3Copy(BaseS3OperationTest):
)
# Validate we retried and got second body
- self.assertEquals(len(self.http_stubber.requests), 2)
- self.assertEquals(response['ResponseMetadata']['HTTPStatusCode'], 200)
+ self.assertEqual(len(self.http_stubber.requests), 2)
+ self.assertEqual(response['ResponseMetadata']['HTTPStatusCode'], 200)
self.assertTrue('CopyObjectResult' in response)
def test_s3_copy_object_with_incomplete_response(self):
@@ -1193,48 +1192,49 @@ class TestGeneratePresigned(BaseS3Operat
'get_object', {'Bucket': 'mybucket', 'Key': 'mykey'})
self.assert_is_v2_presigned_url(url)
+
def test_checksums_included_in_expected_operations():
"""Validate expected calls include Content-MD5 header"""
t = S3ChecksumCases(_verify_checksum_in_headers)
- yield t.case('put_bucket_tagging',
- {"Bucket": "foo", "Tagging":{"TagSet":[]}})
- yield t.case('put_bucket_lifecycle',
- {"Bucket": "foo", "LifecycleConfiguration":{"Rules":[]}})
- yield t.case('put_bucket_lifecycle_configuration',
- {"Bucket": "foo", "LifecycleConfiguration":{"Rules":[]}})
- yield t.case('put_bucket_cors',
- {"Bucket": "foo", "CORSConfiguration":{"CORSRules": []}})
- yield t.case('delete_objects',
- {"Bucket": "foo", "Delete": {"Objects": [{"Key": "bar"}]}})
- yield t.case('put_bucket_replication',
- {"Bucket": "foo",
- "ReplicationConfiguration": {"Role":"", "Rules": []}})
- yield t.case('put_bucket_acl',
- {"Bucket": "foo", "AccessControlPolicy":{}})
- yield t.case('put_bucket_logging',
- {"Bucket": "foo",
- "BucketLoggingStatus":{}})
- yield t.case('put_bucket_notification',
- {"Bucket": "foo", "NotificationConfiguration":{}})
- yield t.case('put_bucket_policy',
- {"Bucket": "foo", "Policy": "<bucket-policy>"})
- yield t.case('put_bucket_request_payment',
- {"Bucket": "foo", "RequestPaymentConfiguration":{"Payer": ""}})
- yield t.case('put_bucket_versioning',
- {"Bucket": "foo", "VersioningConfiguration":{}})
- yield t.case('put_bucket_website',
- {"Bucket": "foo",
- "WebsiteConfiguration":{}})
- yield t.case('put_object_acl',
- {"Bucket": "foo", "Key": "bar", "AccessControlPolicy":{}})
- yield t.case('put_object_legal_hold',
- {"Bucket": "foo", "Key": "bar", "LegalHold":{"Status": "ON"}})
- yield t.case('put_object_retention',
- {"Bucket": "foo", "Key": "bar",
- "Retention":{"RetainUntilDate":"2020-11-05"}})
- yield t.case('put_object_lock_configuration',
- {"Bucket": "foo", "ObjectLockConfiguration":{}})
+ t.case('put_bucket_tagging',
+ {"Bucket": "foo", "Tagging": {"TagSet": []}})
+ t.case('put_bucket_lifecycle',
+ {"Bucket": "foo", "LifecycleConfiguration": {"Rules": []}})
+ t.case('put_bucket_lifecycle_configuration',
+ {"Bucket": "foo", "LifecycleConfiguration": {"Rules": []}})
+ t.case('put_bucket_cors',
+ {"Bucket": "foo", "CORSConfiguration": {"CORSRules": []}})
+ t.case('delete_objects',
+ {"Bucket": "foo", "Delete": {"Objects": [{"Key": "bar"}]}})
+ t.case('put_bucket_replication',
+ {"Bucket": "foo",
+ "ReplicationConfiguration": {"Role": "", "Rules": []}})
+ t.case('put_bucket_acl',
+ {"Bucket": "foo", "AccessControlPolicy": {}})
+ t.case('put_bucket_logging',
+ {"Bucket": "foo",
+ "BucketLoggingStatus": {}})
+ t.case('put_bucket_notification',
+ {"Bucket": "foo", "NotificationConfiguration": {}})
+ t.case('put_bucket_policy',
+ {"Bucket": "foo", "Policy": "<bucket-policy>"})
+ t.case('put_bucket_request_payment',
+ {"Bucket": "foo", "RequestPaymentConfiguration": {"Payer": ""}})
+ t.case('put_bucket_versioning',
+ {"Bucket": "foo", "VersioningConfiguration": {}})
+ t.case('put_bucket_website',
+ {"Bucket": "foo",
+ "WebsiteConfiguration": {}})
+ t.case('put_object_acl',
+ {"Bucket": "foo", "Key": "bar", "AccessControlPolicy": {}})
+ t.case('put_object_legal_hold',
+ {"Bucket": "foo", "Key": "bar", "LegalHold": {"Status": "ON"}})
+ t.case('put_object_retention',
+ {"Bucket": "foo", "Key": "bar",
+ "Retention": {"RetainUntilDate": "2020-11-05"}})
+ t.case('put_object_lock_configuration',
+ {"Bucket": "foo", "ObjectLockConfiguration": {}})
def _verify_checksum_in_headers(operation, operation_kwargs):
@@ -1259,36 +1259,36 @@ def test_correct_url_used_for_s3():
t = S3AddressingCases(_verify_expected_endpoint_url)
# The default behavior for sigv2. DNS compatible buckets
- yield t.case(region='us-west-2', bucket='bucket', key='key',
- signature_version='s3',
- expected_url='https://bucket.s3.us-west-2.amazonaws.com/key')
- yield t.case(region='us-east-1', bucket='bucket', key='key',
- signature_version='s3',
- expected_url='https://bucket.s3.amazonaws.com/key')
- yield t.case(region='us-west-1', bucket='bucket', key='key',
- signature_version='s3',
- expected_url='https://bucket.s3.us-west-1.amazonaws.com/key')
- yield t.case(region='us-west-1', bucket='bucket', key='key',
- signature_version='s3', is_secure=False,
- expected_url='http://bucket.s3.us-west-1.amazonaws.com/key')
+ t.case(region='us-west-2', bucket='bucket', key='key',
+ signature_version='s3',
+ expected_url='https://bucket.s3.us-west-2.amazonaws.com/key')
+ t.case(region='us-east-1', bucket='bucket', key='key',
+ signature_version='s3',
+ expected_url='https://bucket.s3.amazonaws.com/key')
+ t.case(region='us-west-1', bucket='bucket', key='key',
+ signature_version='s3',
+ expected_url='https://bucket.s3.us-west-1.amazonaws.com/key')
+ t.case(region='us-west-1', bucket='bucket', key='key',
+ signature_version='s3', is_secure=False,
+ expected_url='http://bucket.s3.us-west-1.amazonaws.com/key')
# Virtual host addressing is independent of signature version.
- yield t.case(region='us-west-2', bucket='bucket', key='key',
- signature_version='s3v4',
- expected_url=(
- 'https://bucket.s3.us-west-2.amazonaws.com/key'))
- yield t.case(region='us-east-1', bucket='bucket', key='key',
- signature_version='s3v4',
- expected_url='https://bucket.s3.amazonaws.com/key')
- yield t.case(region='us-west-1', bucket='bucket', key='key',
- signature_version='s3v4',
- expected_url=(
- 'https://bucket.s3.us-west-1.amazonaws.com/key'))
- yield t.case(region='us-west-1', bucket='bucket', key='key',
- signature_version='s3v4', is_secure=False,
- expected_url=(
- 'http://bucket.s3.us-west-1.amazonaws.com/key'))
- yield t.case(
+ t.case(region='us-west-2', bucket='bucket', key='key',
+ signature_version='s3v4',
+ expected_url=(
+ 'https://bucket.s3.us-west-2.amazonaws.com/key'))
+ t.case(region='us-east-1', bucket='bucket', key='key',
+ signature_version='s3v4',
+ expected_url='https://bucket.s3.amazonaws.com/key')
+ t.case(region='us-west-1', bucket='bucket', key='key',
+ signature_version='s3v4',
+ expected_url=(
+ 'https://bucket.s3.us-west-1.amazonaws.com/key'))
+ t.case(region='us-west-1', bucket='bucket', key='key',
+ signature_version='s3v4', is_secure=False,
+ expected_url=(
+ 'http://bucket.s3.us-west-1.amazonaws.com/key'))
+ t.case(
region='us-west-1', bucket='bucket-with-num-1', key='key',
signature_version='s3v4', is_secure=False,
expected_url='http://bucket-with-num-1.s3.us-west-1.amazonaws.com/key')
@@ -1296,189 +1296,188 @@ def test_correct_url_used_for_s3():
# Regions outside of the 'aws' partition.
# These should still default to virtual hosted addressing
# unless explicitly configured otherwise.
- yield t.case(region='cn-north-1', bucket='bucket', key='key',
- signature_version='s3v4',
- expected_url=(
- 'https://bucket.s3.cn-north-1.amazonaws.com.cn/key'))
+ t.case(region='cn-north-1', bucket='bucket', key='key',
+ signature_version='s3v4',
+ expected_url=(
+ 'https://bucket.s3.cn-north-1.amazonaws.com.cn/key'))
# This isn't actually supported because cn-north-1 is sigv4 only,
# but we'll still double check that our internal logic is correct
# when building the expected url.
- yield t.case(region='cn-north-1', bucket='bucket', key='key',
- signature_version='s3',
- expected_url=(
- 'https://bucket.s3.cn-north-1.amazonaws.com.cn/key'))
+ t.case(region='cn-north-1', bucket='bucket', key='key',
+ signature_version='s3',
+ expected_url=(
+ 'https://bucket.s3.cn-north-1.amazonaws.com.cn/key'))
# If the request is unsigned, we should have the default
# fix_s3_host behavior which is to use virtual hosting where
# possible but fall back to path style when needed.
- yield t.case(region='cn-north-1', bucket='bucket', key='key',
- signature_version=UNSIGNED,
- expected_url=(
- 'https://bucket.s3.cn-north-1.amazonaws.com.cn/key'))
- yield t.case(region='cn-north-1', bucket='bucket.dot', key='key',
- signature_version=UNSIGNED,
- expected_url=(
- 'https://s3.cn-north-1.amazonaws.com.cn/bucket.dot/key'))
+ t.case(region='cn-north-1', bucket='bucket', key='key',
+ signature_version=UNSIGNED,
+ expected_url=(
+ 'https://bucket.s3.cn-north-1.amazonaws.com.cn/key'))
+ t.case(region='cn-north-1', bucket='bucket.dot', key='key',
+ signature_version=UNSIGNED,
+ expected_url=(
+ 'https://s3.cn-north-1.amazonaws.com.cn/bucket.dot/key'))
# And of course you can explicitly specify which style to use.
virtual_hosting = {'addressing_style': 'virtual'}
- yield t.case(region='cn-north-1', bucket='bucket', key='key',
- signature_version=UNSIGNED,
- s3_config=virtual_hosting,
- expected_url=(
- 'https://bucket.s3.cn-north-1.amazonaws.com.cn/key'))
+ t.case(region='cn-north-1', bucket='bucket', key='key',
+ signature_version=UNSIGNED,
+ s3_config=virtual_hosting,
+ expected_url=(
+ 'https://bucket.s3.cn-north-1.amazonaws.com.cn/key'))
path_style = {'addressing_style': 'path'}
- yield t.case(region='cn-north-1', bucket='bucket', key='key',
- signature_version=UNSIGNED,
- s3_config=path_style,
- expected_url=(
- 'https://s3.cn-north-1.amazonaws.com.cn/bucket/key'))
+ t.case(region='cn-north-1', bucket='bucket', key='key',
+ signature_version=UNSIGNED,
+ s3_config=path_style,
+ expected_url=(
+ 'https://s3.cn-north-1.amazonaws.com.cn/bucket/key'))
# If you don't have a DNS compatible bucket, we use path style.
- yield t.case(
+ t.case(
region='us-west-2', bucket='bucket.dot', key='key',
expected_url='https://s3.us-west-2.amazonaws.com/bucket.dot/key')
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket.dot', key='key',
expected_url='https://s3.amazonaws.com/bucket.dot/key')
- yield t.case(
+ t.case(
region='us-east-1', bucket='BucketName', key='key',
expected_url='https://s3.amazonaws.com/BucketName/key')
- yield t.case(
+ t.case(
region='us-west-1', bucket='bucket_name', key='key',
expected_url='https://s3.us-west-1.amazonaws.com/bucket_name/key')
- yield t.case(
+ t.case(
region='us-west-1', bucket='-bucket-name', key='key',
expected_url='https://s3.us-west-1.amazonaws.com/-bucket-name/key')
- yield t.case(
+ t.case(
region='us-west-1', bucket='bucket-name-', key='key',
expected_url='https://s3.us-west-1.amazonaws.com/bucket-name-/key')
- yield t.case(
+ t.case(
region='us-west-1', bucket='aa', key='key',
expected_url='https://s3.us-west-1.amazonaws.com/aa/key')
- yield t.case(
+ t.case(
region='us-west-1', bucket='a'*64, key='key',
expected_url=('https://s3.us-west-1.amazonaws.com/%s/key' % ('a' * 64))
)
# Custom endpoint url should always be used.
- yield t.case(
+ t.case(
customer_provided_endpoint='https://my-custom-s3/',
bucket='foo', key='bar',
expected_url='https://my-custom-s3/foo/bar')
- yield t.case(
+ t.case(
customer_provided_endpoint='https://my-custom-s3/',
bucket='bucket.dots', key='bar',
expected_url='https://my-custom-s3/bucket.dots/bar')
# Doesn't matter what region you specify, a custom endpoint url always
# wins.
- yield t.case(
+ t.case(
customer_provided_endpoint='https://my-custom-s3/',
region='us-west-2', bucket='foo', key='bar',
expected_url='https://my-custom-s3/foo/bar')
# Explicitly configuring "virtual" addressing_style.
virtual_hosting = {'addressing_style': 'virtual'}
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=virtual_hosting,
expected_url='https://bucket.s3.amazonaws.com/key')
- yield t.case(
+ t.case(
region='us-west-2', bucket='bucket', key='key',
s3_config=virtual_hosting,
expected_url='https://bucket.s3.us-west-2.amazonaws.com/key')
- yield t.case(
+ t.case(
region='eu-central-1', bucket='bucket', key='key',
s3_config=virtual_hosting,
expected_url='https://bucket.s3.eu-central-1.amazonaws.com/key')
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=virtual_hosting,
customer_provided_endpoint='https://foo.amazonaws.com',
expected_url='https://bucket.foo.amazonaws.com/key')
- yield t.case(
+ t.case(
region='unknown', bucket='bucket', key='key',
s3_config=virtual_hosting,
expected_url='https://bucket.s3.unknown.amazonaws.com/key')
# Test us-gov with virtual addressing.
- yield t.case(
+ t.case(
region='us-gov-west-1', bucket='bucket', key='key',
s3_config=virtual_hosting,
expected_url='https://bucket.s3.us-gov-west-1.amazonaws.com/key')
- yield t.case(
+ t.case(
region='us-gov-west-1', bucket='bucket', key='key',
signature_version='s3',
expected_url='https://bucket.s3.us-gov-west-1.amazonaws.com/key')
- yield t.case(
+ t.case(
region='fips-us-gov-west-1', bucket='bucket', key='key',
signature_version='s3',
expected_url='https://bucket.s3-fips-us-gov-west-1.amazonaws.com/key')
-
# Test path style addressing.
path_style = {'addressing_style': 'path'}
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=path_style,
expected_url='https://s3.amazonaws.com/bucket/key')
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=path_style,
customer_provided_endpoint='https://foo.amazonaws.com/',
expected_url='https://foo.amazonaws.com/bucket/key')
- yield t.case(
+ t.case(
region='unknown', bucket='bucket', key='key',
s3_config=path_style,
expected_url='https://s3.unknown.amazonaws.com/bucket/key')
# S3 accelerate
use_accelerate = {'use_accelerate_endpoint': True}
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=use_accelerate,
expected_url='https://bucket.s3-accelerate.amazonaws.com/key')
- yield t.case(
+ t.case(
# region is ignored with S3 accelerate.
region='us-west-2', bucket='bucket', key='key',
s3_config=use_accelerate,
expected_url='https://bucket.s3-accelerate.amazonaws.com/key')
# Provided endpoints still get recognized as accelerate endpoints.
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
customer_provided_endpoint='https://s3-accelerate.amazonaws.com',
expected_url='https://bucket.s3-accelerate.amazonaws.com/key')
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
customer_provided_endpoint='http://s3-accelerate.amazonaws.com',
expected_url='http://bucket.s3-accelerate.amazonaws.com/key')
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=use_accelerate, is_secure=False,
# Note we're using http:// because is_secure=False.
expected_url='http://bucket.s3-accelerate.amazonaws.com/key')
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
# s3-accelerate must be the first part of the url.
customer_provided_endpoint='https://foo.s3-accelerate.amazonaws.com',
expected_url='https://foo.s3-accelerate.amazonaws.com/bucket/key')
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
# The endpoint must be an Amazon endpoint.
customer_provided_endpoint='https://s3-accelerate.notamazon.com',
expected_url='https://s3-accelerate.notamazon.com/bucket/key')
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
# Extra components must be whitelisted.
customer_provided_endpoint='https://s3-accelerate.foo.amazonaws.com',
expected_url='https://s3-accelerate.foo.amazonaws.com/bucket/key')
- yield t.case(
+ t.case(
region='unknown', bucket='bucket', key='key',
s3_config=use_accelerate,
expected_url='https://bucket.s3-accelerate.amazonaws.com/key')
# Use virtual even if path is specified for s3 accelerate because
# path style will not work with S3 accelerate.
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config={'use_accelerate_endpoint': True,
'addressing_style': 'path'},
@@ -1486,17 +1485,17 @@ def test_correct_url_used_for_s3():
# S3 dual stack endpoints.
use_dualstack = {'use_dualstack_endpoint': True}
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=use_dualstack, signature_version='s3',
# Still default to virtual hosted when possible on sigv2.
expected_url='https://bucket.s3.dualstack.us-east-1.amazonaws.com/key')
- yield t.case(
+ t.case(
region=None, bucket='bucket', key='key',
s3_config=use_dualstack,
# Uses us-east-1 for no region set.
expected_url='https://bucket.s3.dualstack.us-east-1.amazonaws.com/key')
- yield t.case(
+ t.case(
region='aws-global', bucket='bucket', key='key',
s3_config=use_dualstack,
# Pseudo-regions should not have any special resolving logic even when
@@ -1505,32 +1504,32 @@ def test_correct_url_used_for_s3():
# region name.
expected_url=(
'https://bucket.s3.dualstack.aws-global.amazonaws.com/key'))
- yield t.case(
+ t.case(
region='us-west-2', bucket='bucket', key='key',
s3_config=use_dualstack, signature_version='s3',
# Still default to virtual hosted when possible on sigv2.
expected_url='https://bucket.s3.dualstack.us-west-2.amazonaws.com/key')
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=use_dualstack, signature_version='s3v4',
expected_url='https://bucket.s3.dualstack.us-east-1.amazonaws.com/key')
- yield t.case(
+ t.case(
region='us-west-2', bucket='bucket', key='key',
s3_config=use_dualstack, signature_version='s3v4',
expected_url='https://bucket.s3.dualstack.us-west-2.amazonaws.com/key')
- yield t.case(
+ t.case(
region='unknown', bucket='bucket', key='key',
s3_config=use_dualstack, signature_version='s3v4',
expected_url='https://bucket.s3.dualstack.unknown.amazonaws.com/key')
# Non DNS compatible buckets use path style for dual stack.
- yield t.case(
+ t.case(
region='us-west-2', bucket='bucket.dot', key='key',
s3_config=use_dualstack,
# Still default to virtual hosted when possible.
expected_url=(
'https://s3.dualstack.us-west-2.amazonaws.com/bucket.dot/key'))
# Supports is_secure (use_ssl=False in create_client()).
- yield t.case(
+ t.case(
region='us-west-2', bucket='bucket.dot', key='key', is_secure=False,
s3_config=use_dualstack,
# Still default to virtual hosted when possible.
@@ -1543,7 +1542,7 @@ def test_correct_url_used_for_s3():
'use_dualstack_endpoint': True,
'addressing_style': 'path',
}
- yield t.case(
+ t.case(
region='us-west-2', bucket='bucket', key='key',
s3_config=force_path_style,
# Still default to virtual hosted when possible.
@@ -1554,32 +1553,32 @@ def test_correct_url_used_for_s3():
'use_accelerate_endpoint': True,
'use_dualstack_endpoint': True,
}
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=use_accelerate_dualstack,
expected_url=(
'https://bucket.s3-accelerate.dualstack.amazonaws.com/key'))
- yield t.case(
+ t.case(
# Region is ignored with S3 accelerate.
region='us-west-2', bucket='bucket', key='key',
s3_config=use_accelerate_dualstack,
expected_url=(
'https://bucket.s3-accelerate.dualstack.amazonaws.com/key'))
# Only s3-accelerate overrides a customer endpoint.
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=use_dualstack,
customer_provided_endpoint='https://s3-accelerate.amazonaws.com',
expected_url=(
'https://bucket.s3-accelerate.amazonaws.com/key'))
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
# Dualstack is whitelisted.
customer_provided_endpoint=(
'https://s3-accelerate.dualstack.amazonaws.com'),
expected_url=(
'https://bucket.s3-accelerate.dualstack.amazonaws.com/key'))
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
# Even whitelisted parts cannot be duplicated.
customer_provided_endpoint=(
@@ -1587,7 +1586,7 @@ def test_correct_url_used_for_s3():
expected_url=(
'https://s3-accelerate.dualstack.dualstack'
'.amazonaws.com/bucket/key'))
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
# More than two extra parts is not allowed.
customer_provided_endpoint=(
@@ -1596,12 +1595,12 @@ def test_correct_url_used_for_s3():
expected_url=(
'https://s3-accelerate.dualstack.dualstack.dualstack.amazonaws.com'
'/bucket/key'))
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
# Extra components must be whitelisted.
customer_provided_endpoint='https://s3-accelerate.foo.amazonaws.com',
expected_url='https://s3-accelerate.foo.amazonaws.com/bucket/key')
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=use_accelerate_dualstack, is_secure=False,
# Note we're using http:// because is_secure=False.
@@ -1610,7 +1609,7 @@ def test_correct_url_used_for_s3():
# Use virtual even if path is specified for s3 accelerate because
# path style will not work with S3 accelerate.
use_accelerate_dualstack['addressing_style'] = 'path'
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=use_accelerate_dualstack,
expected_url=(
@@ -1620,14 +1619,14 @@ def test_correct_url_used_for_s3():
accesspoint_arn = (
'arn:aws:s3:us-west-2:123456789012:accesspoint:myendpoint'
)
- yield t.case(
+ t.case(
region='us-west-2', bucket=accesspoint_arn, key='key',
expected_url=(
'https://myendpoint-123456789012.s3-accesspoint.'
'us-west-2.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='us-west-2', bucket=accesspoint_arn, key='key',
s3_config={'use_arn_region': True},
expected_url=(
@@ -1635,21 +1634,21 @@ def test_correct_url_used_for_s3():
'us-west-2.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='us-west-2', bucket=accesspoint_arn, key='myendpoint/key',
expected_url=(
'https://myendpoint-123456789012.s3-accesspoint.'
'us-west-2.amazonaws.com/myendpoint/key'
)
)
- yield t.case(
+ t.case(
region='us-west-2', bucket=accesspoint_arn, key='foo/myendpoint/key',
expected_url=(
'https://myendpoint-123456789012.s3-accesspoint.'
'us-west-2.amazonaws.com/foo/myendpoint/key'
)
)
- yield t.case(
+ t.case(
# Note: The access-point arn has us-west-2 and the client's region is
# us-east-1, for the default case the access-point arn region is used.
region='us-east-1', bucket=accesspoint_arn, key='key',
@@ -1658,7 +1657,7 @@ def test_correct_url_used_for_s3():
'us-west-2.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='us-east-1', bucket=accesspoint_arn, key='key',
s3_config={'use_arn_region': False},
expected_url=(
@@ -1666,14 +1665,14 @@ def test_correct_url_used_for_s3():
'us-east-1.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='s3-external-1', bucket=accesspoint_arn, key='key',
expected_url=(
'https://myendpoint-123456789012.s3-accesspoint.'
'us-west-2.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='s3-external-1', bucket=accesspoint_arn, key='key',
s3_config={'use_arn_region': False},
expected_url=(
@@ -1681,14 +1680,14 @@ def test_correct_url_used_for_s3():
's3-external-1.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='aws-global', bucket=accesspoint_arn, key='key',
expected_url=(
'https://myendpoint-123456789012.s3-accesspoint.'
'us-west-2.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='aws-global', bucket=accesspoint_arn, key='key',
s3_config={'use_arn_region': False},
expected_url=(
@@ -1696,7 +1695,7 @@ def test_correct_url_used_for_s3():
'aws-global.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='unknown', bucket=accesspoint_arn, key='key',
s3_config={'use_arn_region': False},
expected_url=(
@@ -1704,7 +1703,7 @@ def test_correct_url_used_for_s3():
'unknown.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='unknown', bucket=accesspoint_arn, key='key',
s3_config={'use_arn_region': True},
expected_url=(
@@ -1715,21 +1714,21 @@ def test_correct_url_used_for_s3():
accesspoint_arn_cn = (
'arn:aws-cn:s3:cn-north-1:123456789012:accesspoint:myendpoint'
)
- yield t.case(
+ t.case(
region='cn-north-1', bucket=accesspoint_arn_cn, key='key',
expected_url=(
'https://myendpoint-123456789012.s3-accesspoint.'
'cn-north-1.amazonaws.com.cn/key'
)
)
- yield t.case(
+ t.case(
region='cn-northwest-1', bucket=accesspoint_arn_cn, key='key',
expected_url=(
'https://myendpoint-123456789012.s3-accesspoint.'
'cn-north-1.amazonaws.com.cn/key'
)
)
- yield t.case(
+ t.case(
region='cn-northwest-1', bucket=accesspoint_arn_cn, key='key',
s3_config={'use_arn_region': False},
expected_url=(
@@ -1740,21 +1739,21 @@ def test_correct_url_used_for_s3():
accesspoint_arn_gov = (
'arn:aws-us-gov:s3:us-gov-east-1:123456789012:accesspoint:myendpoint'
)
- yield t.case(
+ t.case(
region='us-gov-east-1', bucket=accesspoint_arn_gov, key='key',
expected_url=(
'https://myendpoint-123456789012.s3-accesspoint.'
'us-gov-east-1.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='fips-us-gov-west-1', bucket=accesspoint_arn_gov, key='key',
expected_url=(
'https://myendpoint-123456789012.s3-accesspoint.'
'us-gov-east-1.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='fips-us-gov-west-1', bucket=accesspoint_arn_gov, key='key',
s3_config={'use_arn_region': False},
expected_url=(
@@ -1763,7 +1762,7 @@ def test_correct_url_used_for_s3():
)
)
- yield t.case(
+ t.case(
region='us-west-2', bucket=accesspoint_arn, key='key', is_secure=False,
expected_url=(
'http://myendpoint-123456789012.s3-accesspoint.'
@@ -1771,7 +1770,7 @@ def test_correct_url_used_for_s3():
)
)
# Dual-stack with access-point arn
- yield t.case(
+ t.case(
# Note: The access-point arn has us-west-2 and the client's region is
# us-east-1, for the default case the access-point arn region is used.
region='us-east-1', bucket=accesspoint_arn, key='key',
@@ -1783,7 +1782,7 @@ def test_correct_url_used_for_s3():
'us-west-2.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='us-east-1', bucket=accesspoint_arn, key='key',
s3_config={
'use_dualstack_endpoint': True,
@@ -1794,7 +1793,7 @@ def test_correct_url_used_for_s3():
'us-east-1.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='us-gov-east-1', bucket=accesspoint_arn_gov, key='key',
s3_config={
'use_dualstack_endpoint': True,
@@ -1807,7 +1806,7 @@ def test_correct_url_used_for_s3():
# None of the various s3 settings related to paths should affect what
# endpoint to use when an access-point is provided.
- yield t.case(
+ t.case(
region='us-west-2', bucket=accesspoint_arn, key='key',
s3_config={'adressing_style': 'auto'},
expected_url=(
@@ -1815,7 +1814,7 @@ def test_correct_url_used_for_s3():
'us-west-2.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='us-west-2', bucket=accesspoint_arn, key='key',
s3_config={'adressing_style': 'virtual'},
expected_url=(
@@ -1823,7 +1822,7 @@ def test_correct_url_used_for_s3():
'us-west-2.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='us-west-2', bucket=accesspoint_arn, key='key',
s3_config={'adressing_style': 'path'},
expected_url=(
@@ -1836,27 +1835,27 @@ def test_correct_url_used_for_s3():
us_east_1_regional_endpoint = {
'us_east_1_regional_endpoint': 'regional'
}
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=us_east_1_regional_endpoint,
expected_url=(
'https://bucket.s3.us-east-1.amazonaws.com/key'))
- yield t.case(
+ t.case(
region='us-west-2', bucket='bucket', key='key',
s3_config=us_east_1_regional_endpoint,
expected_url=(
'https://bucket.s3.us-west-2.amazonaws.com/key'))
- yield t.case(
+ t.case(
region=None, bucket='bucket', key='key',
s3_config=us_east_1_regional_endpoint,
expected_url=(
'https://bucket.s3.amazonaws.com/key'))
- yield t.case(
+ t.case(
region='unknown', bucket='bucket', key='key',
s3_config=us_east_1_regional_endpoint,
expected_url=(
'https://bucket.s3.unknown.amazonaws.com/key'))
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config={
'us_east_1_regional_endpoint': 'regional',
@@ -1864,7 +1863,7 @@ def test_correct_url_used_for_s3():
},
expected_url=(
'https://bucket.s3.dualstack.us-east-1.amazonaws.com/key'))
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config={
'us_east_1_regional_endpoint': 'regional',
@@ -1872,7 +1871,7 @@ def test_correct_url_used_for_s3():
},
expected_url=(
'https://bucket.s3-accelerate.amazonaws.com/key'))
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config={
'us_east_1_regional_endpoint': 'regional',
@@ -1886,19 +1885,19 @@ def test_correct_url_used_for_s3():
us_east_1_regional_endpoint_legacy = {
'us_east_1_regional_endpoint': 'legacy'
}
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=us_east_1_regional_endpoint_legacy,
expected_url=(
'https://bucket.s3.amazonaws.com/key'))
- yield t.case(
+ t.case(
region=None, bucket='bucket', key='key',
s3_config=us_east_1_regional_endpoint_legacy,
expected_url=(
'https://bucket.s3.amazonaws.com/key'))
- yield t.case(
+ t.case(
region='unknown', bucket='bucket', key='key',
s3_config=us_east_1_regional_endpoint_legacy,
expected_url=(
@@ -1950,7 +1949,7 @@ def _verify_expected_endpoint_url(region
with ClientHTTPStubber(s3) as http_stubber:
http_stubber.add_response()
s3.put_object(Bucket=bucket, Key=key, Body=b'bar')
- assert_equal(http_stubber.requests[0].url, expected_url)
+ assert http_stubber.requests[0].url == expected_url
def _create_s3_client(region, is_secure, endpoint_url, s3_config,
@@ -1983,96 +1982,96 @@ def test_addressing_for_presigned_urls()
# us-east-1, or the "global" endpoint. A signature version of
# None means the user doesn't have signature version configured.
- yield t.case(region='us-east-1', bucket='bucket', key='key',
- signature_version=None,
- expected_url='https://bucket.s3.amazonaws.com/key')
- yield t.case(region='us-east-1', bucket='bucket', key='key',
- signature_version='s3',
- expected_url='https://bucket.s3.amazonaws.com/key')
- yield t.case(region='us-east-1', bucket='bucket', key='key',
- signature_version='s3v4',
- expected_url='https://bucket.s3.amazonaws.com/key')
- yield t.case(region='us-east-1', bucket='bucket', key='key',
- signature_version='s3v4',
- s3_config={'addressing_style': 'path'},
- expected_url='https://s3.amazonaws.com/bucket/key')
+ t.case(region='us-east-1', bucket='bucket', key='key',
+ signature_version=None,
+ expected_url='https://bucket.s3.amazonaws.com/key')
+ t.case(region='us-east-1', bucket='bucket', key='key',
+ signature_version='s3',
+ expected_url='https://bucket.s3.amazonaws.com/key')
+ t.case(region='us-east-1', bucket='bucket', key='key',
+ signature_version='s3v4',
+ expected_url='https://bucket.s3.amazonaws.com/key')
+ t.case(region='us-east-1', bucket='bucket', key='key',
+ signature_version='s3v4',
+ s3_config={'addressing_style': 'path'},
+ expected_url='https://s3.amazonaws.com/bucket/key')
# A region that supports both 's3' and 's3v4'.
- yield t.case(region='us-west-2', bucket='bucket', key='key',
- signature_version=None,
- expected_url='https://bucket.s3.amazonaws.com/key')
- yield t.case(region='us-west-2', bucket='bucket', key='key',
- signature_version='s3',
- expected_url='https://bucket.s3.amazonaws.com/key')
- yield t.case(region='us-west-2', bucket='bucket', key='key',
- signature_version='s3v4',
- expected_url='https://bucket.s3.amazonaws.com/key')
- yield t.case(region='us-west-2', bucket='bucket', key='key',
- signature_version='s3v4',
- s3_config={'addressing_style': 'path'},
- expected_url='https://s3.us-west-2.amazonaws.com/bucket/key')
+ t.case(region='us-west-2', bucket='bucket', key='key',
+ signature_version=None,
+ expected_url='https://bucket.s3.amazonaws.com/key')
+ t.case(region='us-west-2', bucket='bucket', key='key',
+ signature_version='s3',
+ expected_url='https://bucket.s3.amazonaws.com/key')
+ t.case(region='us-west-2', bucket='bucket', key='key',
+ signature_version='s3v4',
+ expected_url='https://bucket.s3.amazonaws.com/key')
+ t.case(region='us-west-2', bucket='bucket', key='key',
+ signature_version='s3v4',
+ s3_config={'addressing_style': 'path'},
+ expected_url='https://s3.us-west-2.amazonaws.com/bucket/key')
# An 's3v4' only region.
- yield t.case(region='us-east-2', bucket='bucket', key='key',
- signature_version=None,
- expected_url='https://bucket.s3.amazonaws.com/key')
- yield t.case(region='us-east-2', bucket='bucket', key='key',
- signature_version='s3',
- expected_url='https://bucket.s3.amazonaws.com/key')
- yield t.case(region='us-east-2', bucket='bucket', key='key',
- signature_version='s3v4',
- expected_url='https://bucket.s3.amazonaws.com/key')
- yield t.case(region='us-east-2', bucket='bucket', key='key',
- signature_version='s3v4',
- s3_config={'addressing_style': 'path'},
- expected_url='https://s3.us-east-2.amazonaws.com/bucket/key')
+ t.case(region='us-east-2', bucket='bucket', key='key',
+ signature_version=None,
+ expected_url='https://bucket.s3.amazonaws.com/key')
+ t.case(region='us-east-2', bucket='bucket', key='key',
+ signature_version='s3',
+ expected_url='https://bucket.s3.amazonaws.com/key')
+ t.case(region='us-east-2', bucket='bucket', key='key',
+ signature_version='s3v4',
+ expected_url='https://bucket.s3.amazonaws.com/key')
+ t.case(region='us-east-2', bucket='bucket', key='key',
+ signature_version='s3v4',
+ s3_config={'addressing_style': 'path'},
+ expected_url='https://s3.us-east-2.amazonaws.com/bucket/key')
# Dualstack endpoints
- yield t.case(
+ t.case(
region='us-west-2', bucket='bucket', key='key',
signature_version=None,
s3_config={'use_dualstack_endpoint': True},
expected_url='https://bucket.s3.dualstack.us-west-2.amazonaws.com/key')
- yield t.case(
+ t.case(
region='us-west-2', bucket='bucket', key='key',
signature_version='s3',
s3_config={'use_dualstack_endpoint': True},
expected_url='https://bucket.s3.dualstack.us-west-2.amazonaws.com/key')
- yield t.case(
+ t.case(
region='us-west-2', bucket='bucket', key='key',
signature_version='s3v4',
s3_config={'use_dualstack_endpoint': True},
expected_url='https://bucket.s3.dualstack.us-west-2.amazonaws.com/key')
# Accelerate
- yield t.case(region='us-west-2', bucket='bucket', key='key',
- signature_version=None,
- s3_config={'use_accelerate_endpoint': True},
- expected_url='https://bucket.s3-accelerate.amazonaws.com/key')
+ t.case(region='us-west-2', bucket='bucket', key='key',
+ signature_version=None,
+ s3_config={'use_accelerate_endpoint': True},
+ expected_url='https://bucket.s3-accelerate.amazonaws.com/key')
# A region that we don't know about.
- yield t.case(region='us-west-50', bucket='bucket', key='key',
- signature_version=None,
- expected_url='https://bucket.s3.amazonaws.com/key')
+ t.case(region='us-west-50', bucket='bucket', key='key',
+ signature_version=None,
+ expected_url='https://bucket.s3.amazonaws.com/key')
# Customer provided URL results in us leaving the host untouched.
- yield t.case(region='us-west-2', bucket='bucket', key='key',
- signature_version=None,
- customer_provided_endpoint='https://foo.com/',
- expected_url='https://foo.com/bucket/key')
+ t.case(region='us-west-2', bucket='bucket', key='key',
+ signature_version=None,
+ customer_provided_endpoint='https://foo.com/',
+ expected_url='https://foo.com/bucket/key')
# Access-point
accesspoint_arn = (
'arn:aws:s3:us-west-2:123456789012:accesspoint:myendpoint'
)
- yield t.case(
+ t.case(
region='us-west-2', bucket=accesspoint_arn, key='key',
expected_url=(
'https://myendpoint-123456789012.s3-accesspoint.'
'us-west-2.amazonaws.com/key'
)
)
- yield t.case(
+ t.case(
region='us-east-1', bucket=accesspoint_arn, key='key',
s3_config={'use_arn_region': False},
expected_url=(
@@ -2085,12 +2084,12 @@ def test_addressing_for_presigned_urls()
us_east_1_regional_endpoint = {
'us_east_1_regional_endpoint': 'regional'
}
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=us_east_1_regional_endpoint, signature_version='s3',
expected_url=(
'https://bucket.s3.us-east-1.amazonaws.com/key'))
- yield t.case(
+ t.case(
region='us-east-1', bucket='bucket', key='key',
s3_config=us_east_1_regional_endpoint, signature_version='s3v4',
expected_url=(
@@ -2112,4 +2111,4 @@ def _verify_presigned_url_addressing(reg
# those are tested elsewhere. We just care about the hostname/path.
parts = urlsplit(url)
actual = '%s://%s%s' % parts[:3]
- assert_equal(actual, expected_url)
+ assert actual == expected_url
--- a/tests/functional/test_service_names.py
+++ b/tests/functional/test_service_names.py
@@ -12,7 +12,6 @@
# language governing permissions and limitations under the License.
import re
-from nose.tools import assert_true
from botocore.session import get_session
BLACKLIST = [
@@ -41,18 +40,18 @@ MAX_SERVICE_NAME_LENGTH = 50
def _assert_name_length(service_name):
if service_name not in BLACKLIST:
service_name_length = len(service_name)
- assert_true(service_name_length >= MIN_SERVICE_NAME_LENGTH,
- 'Service name must be greater than or equal to 2 '
- 'characters in length.')
- assert_true(service_name_length <= MAX_SERVICE_NAME_LENGTH,
- 'Service name must be less than or equal to 50 '
- 'characters in length.')
+ assert service_name_length >= MIN_SERVICE_NAME_LENGTH, \
+ ('Service name must be greater than or equal to {:d} ' +
+ 'characters in length.').format(MIN_SERVICE_NAME_LENGTH)
+ assert service_name_length <= MAX_SERVICE_NAME_LENGTH, \
+ ('Service name must be less than or equal to {:d} ' +
+ 'characters in length.').format(MAX_SERVICE_NAME_LENGTH)
def _assert_name_pattern(service_name):
if service_name not in BLACKLIST:
- valid = VALID_NAME_REGEX.match(service_name) is not None
- assert_true(valid, VALID_NAME_EXPLANATION)
+ assert VALID_NAME_REGEX.match(service_name) is not None, \
+ VALID_NAME_EXPLANATION
def test_service_names_are_valid():
@@ -60,5 +59,5 @@ def test_service_names_are_valid():
loader = session.get_component('data_loader')
service_names = loader.list_available_services('service-2')
for service_name in service_names:
- yield _assert_name_length, service_name
- yield _assert_name_pattern, service_name
+ _assert_name_length(service_name)
+ _assert_name_pattern(service_name)
--- a/tests/integration/test_ec2.py
+++ b/tests/integration/test_ec2.py
@@ -13,8 +13,6 @@
from tests import unittest
import itertools
-from nose.plugins.attrib import attr
-
import botocore.session
from botocore.exceptions import ClientError
--- a/tests/integration/test_emr.py
+++ b/tests/integration/test_emr.py
@@ -12,8 +12,6 @@
# language governing permissions and limitations under the License.
from tests import unittest
-from nose.tools import assert_true
-
import botocore.session
from botocore.paginate import PageIterator
from botocore.exceptions import OperationNotPageableError
@@ -34,7 +32,7 @@ def test_emr_endpoints_work_with_py26():
def _test_can_list_clusters_in_region(session, region):
client = session.create_client('emr', region_name=region)
response = client.list_clusters()
- assert_true('Clusters' in response)
+ assert 'Clusters' in response
# I consider these integration tests because they're
--- a/tests/integration/test_s3.py
+++ b/tests/integration/test_s3.py
@@ -22,11 +22,10 @@ import tempfile
import shutil
import threading
import logging
-import mock
from tarfile import TarFile
from contextlib import closing
-from nose.plugins.attrib import attr
+import pytest
import urllib3
from botocore.endpoint import Endpoint
@@ -324,7 +323,7 @@ class TestS3Objects(TestS3BaseWithBucket
Bucket=self.bucket_name, Key=key_name)
self.assert_status_code(response, 204)
- @attr('slow')
+ @pytest.mark.slow
def test_can_paginate(self):
for i in range(5):
key_name = 'key%s' % i
@@ -340,7 +339,7 @@ class TestS3Objects(TestS3BaseWithBucket
for el in responses]
self.assertEqual(key_names, ['key0', 'key1', 'key2', 'key3', 'key4'])
- @attr('slow')
+ @pytest.mark.slow
def test_can_paginate_with_page_size(self):
for i in range(5):
key_name = 'key%s' % i
@@ -357,7 +356,7 @@ class TestS3Objects(TestS3BaseWithBucket
for el in data]
self.assertEqual(key_names, ['key0', 'key1', 'key2', 'key3', 'key4'])
- @attr('slow')
+ @pytest.mark.slow
def test_result_key_iters(self):
for i in range(5):
key_name = 'key/%s/%s' % (i, i)
@@ -380,7 +379,7 @@ class TestS3Objects(TestS3BaseWithBucket
self.assertIn('Contents', response)
self.assertIn('CommonPrefixes', response)
- @attr('slow')
+ @pytest.mark.slow
def test_can_get_and_put_object(self):
self.create_object('foobarbaz', body='body contents')
time.sleep(3)
@@ -930,7 +929,7 @@ class TestS3SigV4Client(BaseS3ClientTest
Key='foo.txt', Body=body)
self.assert_status_code(response, 200)
- @attr('slow')
+ @pytest.mark.slow
def test_paginate_list_objects_unicode(self):
key_names = [
u'non-ascii-key-\xe4\xf6\xfc-01.txt',
@@ -953,7 +952,7 @@ class TestS3SigV4Client(BaseS3ClientTest
self.assertEqual(key_names, key_refs)
- @attr('slow')
+ @pytest.mark.slow
def test_paginate_list_objects_safe_chars(self):
key_names = [
u'-._~safe-chars-key-01.txt',
@@ -1247,7 +1246,7 @@ class TestRegionRedirect(BaseS3ClientTes
eu_bucket = self.create_bucket(self.bucket_region)
msg = 'The authorization mechanism you have provided is not supported.'
- with self.assertRaisesRegexp(ClientError, msg):
+ with six.assertRaisesRegex(self, ClientError, msg):
sigv2_client.list_objects(Bucket=eu_bucket)
def test_region_redirects_multiple_requests(self):
--- a/tests/integration/test_smoke.py
+++ b/tests/integration/test_smoke.py
@@ -11,17 +11,14 @@ to use and all the services in SMOKE_TES
"""
import os
-import mock
from pprint import pformat
import warnings
import logging
-from nose.tools import assert_equal, assert_true
from tests import ClientHTTPStubber
from botocore import xform_name
import botocore.session
from botocore.client import ClientError
-from botocore.endpoint import Endpoint
from botocore.exceptions import ConnectionClosedError
@@ -262,10 +259,9 @@ def _make_client_call(client, operation_
method = getattr(client, operation_name)
with warnings.catch_warnings(record=True) as caught_warnings:
response = method(**kwargs)
- assert_equal(len(caught_warnings), 0,
- "Warnings were emitted during smoke test: %s"
- % caught_warnings)
- assert_true('Errors' not in response)
+ assert len(caught_warnings) == 0, \
+ "Warnings were emitted during smoke test: %s" % caught_warnings
+ assert 'Errors' not in response
def test_can_make_request_and_understand_errors_with_client():
@@ -275,7 +271,7 @@ def test_can_make_request_and_understand
for operation_name in ERROR_TESTS[service_name]:
kwargs = ERROR_TESTS[service_name][operation_name]
method_name = xform_name(operation_name)
- yield _make_error_client_call, client, method_name, kwargs
+ _make_error_client_call(client, method_name, kwargs)
def _make_error_client_call(client, operation_name, kwargs):
--- a/tests/integration/test_waiters.py
+++ b/tests/integration/test_waiters.py
@@ -12,14 +12,14 @@
# language governing permissions and limitations under the License.
from tests import unittest, random_chars
-from nose.plugins.attrib import attr
+import pytest
import botocore.session
from botocore.exceptions import WaiterError
# This is the same test as above, except using the client interface.
-@attr('slow')
+@pytest.mark.slow
class TestWaiterForDynamoDB(unittest.TestCase):
def setUp(self):
self.session = botocore.session.get_session()
--- a/tests/unit/auth/test_sigv4.py
+++ b/tests/unit/auth/test_sigv4.py
@@ -18,8 +18,7 @@ AWS provides a test suite for signature
http://docs.aws.amazon.com/general/latest/gr/signature-v4-test-suite.html
This module contains logic to run these tests. The test files were
-placed in ./aws4_testsuite, and we're using nose's test generators to
-dynamically generate testcases based on these files.
+placed in ./aws4_testsuite.
"""
import os
@@ -28,7 +27,7 @@ import io
import datetime
from botocore.compat import six
-import mock
+from tests import mock
import botocore.auth
from botocore.awsrequest import AWSRequest
@@ -106,7 +105,7 @@ def test_generator():
if test_case in TESTS_TO_IGNORE:
log.debug("Skipping test: %s", test_case)
continue
- yield (_test_signature_version_4, test_case)
+ _test_signature_version_4(test_case)
datetime_patcher.stop()
formatdate_patcher.stop()
@@ -147,21 +146,22 @@ def _test_signature_version_4(test_case)
auth = botocore.auth.SigV4Auth(test_case.credentials, 'host', 'us-east-1')
actual_canonical_request = auth.canonical_request(request)
- assert_equal(actual_canonical_request, test_case.canonical_request,
- test_case.raw_request, 'canonical_request')
+ assert_requests_equal(actual_canonical_request,
+ test_case.canonical_request,
+ test_case.raw_request, 'canonical_request')
actual_string_to_sign = auth.string_to_sign(request,
actual_canonical_request)
- assert_equal(actual_string_to_sign, test_case.string_to_sign,
- test_case.raw_request, 'string_to_sign')
+ assert_requests_equal(actual_string_to_sign, test_case.string_to_sign,
+ test_case.raw_request, 'string_to_sign')
auth.add_auth(request)
actual_auth_header = request.headers['Authorization']
- assert_equal(actual_auth_header, test_case.authorization_header,
- test_case.raw_request, 'authheader')
+ assert_requests_equal(actual_auth_header, test_case.authorization_header,
+ test_case.raw_request, 'authheader')
-def assert_equal(actual, expected, raw_request, part):
+def assert_requests_equal(actual, expected, raw_request, part):
if actual != expected:
message = "The %s did not match" % part
message += "\nACTUAL:%r !=\nEXPECT:%r" % (actual, expected)
--- a/tests/unit/retries/test_special.py
+++ b/tests/unit/retries/test_special.py
@@ -1,9 +1,7 @@
from tests import unittest
-import mock
-from nose.tools import assert_equal, assert_is_instance
+from tests import mock
-from botocore.compat import six
from botocore.awsrequest import AWSResponse
from botocore.retries import standard, special
--- a/tests/unit/retries/test_standard.py
+++ b/tests/unit/retries/test_standard.py
@@ -1,7 +1,6 @@
from tests import unittest
-import mock
-from nose.tools import assert_equal, assert_is_instance
+from tests import mock
from botocore.retries import standard
from botocore.retries import quota
@@ -154,22 +153,20 @@ SERVICE_DESCRIPTION_WITH_RETRIES = {
def test_can_detect_retryable_transient_errors():
transient_checker = standard.TransientRetryableChecker()
for case in RETRYABLE_TRANSIENT_ERRORS:
- yield (_verify_retryable, transient_checker, None) + case
+ _verify_retryable(transient_checker, None, *case)
def test_can_detect_retryable_throttled_errors():
throttled_checker = standard.ThrottledRetryableChecker()
for case in RETRYABLE_THROTTLED_RESPONSES:
- yield (_verify_retryable, throttled_checker, None) + case
+ _verify_retryable(throttled_checker, None, *case)
def test_can_detect_modeled_retryable_errors():
modeled_retry_checker = standard.ModeledRetryableChecker()
- test_params = (_verify_retryable, modeled_retry_checker,
- get_operation_model_with_retries())
for case in RETRYABLE_MODELED_ERRORS:
- test_case = test_params + case
- yield test_case
+ _verify_retryable(modeled_retry_checker,
+ get_operation_model_with_retries(), *case)
def test_standard_retry_conditions():
@@ -184,9 +181,8 @@ def test_standard_retry_conditions():
# are retryable for a different checker. We need to filter out all
# the False cases.
all_cases = [c for c in all_cases if c[2]]
- test_params = (_verify_retryable, standard_checker, op_model)
for case in all_cases:
- yield test_params + case
+ _verify_retryable(standard_checker, op_model, *case)
def get_operation_model_with_retries():
@@ -213,7 +209,7 @@ def _verify_retryable(checker, operation
http_response=http_response,
caught_exception=caught_exception,
)
- assert_equal(checker.is_retryable(context), is_retryable)
+ assert checker.is_retryable(context) == is_retryable
def arbitrary_retry_context():
@@ -233,36 +229,36 @@ def test_can_honor_max_attempts():
checker = standard.MaxAttemptsChecker(max_attempts=3)
context = arbitrary_retry_context()
context.attempt_number = 1
- assert_equal(checker.is_retryable(context), True)
+ assert checker.is_retryable(context)
context.attempt_number = 2
- assert_equal(checker.is_retryable(context), True)
+ assert checker.is_retryable(context)
context.attempt_number = 3
- assert_equal(checker.is_retryable(context), False)
+ assert not checker.is_retryable(context)
def test_max_attempts_adds_metadata_key_when_reached():
checker = standard.MaxAttemptsChecker(max_attempts=3)
context = arbitrary_retry_context()
context.attempt_number = 3
- assert_equal(checker.is_retryable(context), False)
- assert_equal(context.get_retry_metadata(), {'MaxAttemptsReached': True})
+ assert not checker.is_retryable(context)
+ assert context.get_retry_metadata() == {'MaxAttemptsReached': True}
def test_can_create_default_retry_handler():
mock_client = mock.Mock()
mock_client.meta.service_model.service_id = model.ServiceId('my-service')
- assert_is_instance(standard.register_retry_handler(mock_client),
- standard.RetryHandler)
+ assert isinstance(standard.register_retry_handler(mock_client),
+ standard.RetryHandler)
call_args_list = mock_client.meta.events.register.call_args_list
# We should have registered the retry quota to after-calls
first_call = call_args_list[0][0]
second_call = call_args_list[1][0]
# Not sure if there's a way to verify the class associated with the
# bound method matches what we expect.
- assert_equal(first_call[0], 'after-call.my-service')
- assert_equal(second_call[0], 'needs-retry.my-service')
+ assert first_call[0] == 'after-call.my-service'
+ assert second_call[0] == 'needs-retry.my-service'
class TestRetryHandler(unittest.TestCase):
--- a/tests/unit/test_compat.py
+++ b/tests/unit/test_compat.py
@@ -11,9 +11,7 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import datetime
-import mock
-
-from nose.tools import assert_equal, assert_raises
+from tests import mock
from botocore.exceptions import MD5UnavailableError
from botocore.compat import (
@@ -98,80 +96,76 @@ class TestGetMD5(unittest.TestCase):
get_md5()
-def test_compat_shell_split_windows():
- windows_cases = {
- r'': [],
- r'spam \\': [r'spam', '\\\\'],
- r'spam ': [r'spam'],
- r' spam': [r'spam'],
- 'spam eggs': [r'spam', r'eggs'],
- 'spam\teggs': [r'spam', r'eggs'],
- 'spam\neggs': ['spam\neggs'],
- '""': [''],
- '" "': [' '],
- '"\t"': ['\t'],
- '\\\\': ['\\\\'],
- '\\\\ ': ['\\\\'],
- '\\\\\t': ['\\\\'],
- r'\"': ['"'],
- # The following four test cases are official test cases given in
- # Microsoft's documentation.
- r'"abc" d e': [r'abc', r'd', r'e'],
- r'a\\b d"e f"g h': [r'a\\b', r'de fg', r'h'],
- r'a\\\"b c d': [r'a\"b', r'c', r'd'],
- r'a\\\\"b c" d e': [r'a\\b c', r'd', r'e']
- }
- runner = ShellSplitTestRunner()
- for input_string, expected_output in windows_cases.items():
- yield runner.assert_equal, input_string, expected_output, "win32"
-
- yield runner.assert_raises, r'"', ValueError, "win32"
-
-
-def test_compat_shell_split_unix():
- unix_cases = {
- r'': [],
- r'spam \\': [r'spam', '\\'],
- r'spam ': [r'spam'],
- r' spam': [r'spam'],
- 'spam eggs': [r'spam', r'eggs'],
- 'spam\teggs': [r'spam', r'eggs'],
- 'spam\neggs': ['spam', 'eggs'],
- '""': [''],
- '" "': [' '],
- '"\t"': ['\t'],
- '\\\\': ['\\'],
- '\\\\ ': ['\\'],
- '\\\\\t': ['\\'],
- r'\"': ['"'],
- # The following four test cases are official test cases given in
- # Microsoft's documentation, but adapted to unix shell splitting.
- r'"abc" d e': [r'abc', r'd', r'e'],
- r'a\\b d"e f"g h': [r'a\b', r'de fg', r'h'],
- r'a\\\"b c d': [r'a\"b', r'c', r'd'],
- r'a\\\\"b c" d e': [r'a\\b c', r'd', r'e']
- }
- runner = ShellSplitTestRunner()
- for input_string, expected_output in unix_cases.items():
- yield runner.assert_equal, input_string, expected_output, "linux2"
- yield runner.assert_equal, input_string, expected_output, "darwin"
-
- yield runner.assert_raises, r'"', ValueError, "linux2"
- yield runner.assert_raises, r'"', ValueError, "darwin"
-
-
-class ShellSplitTestRunner(object):
- def assert_equal(self, s, expected, platform):
- assert_equal(compat_shell_split(s, platform), expected)
+class TestCompatShellSplit(unittest.TestCase):
+ def test_compat_shell_split_windows(self):
+ windows_cases = {
+ r'': [],
+ r'spam \\': [r'spam', '\\\\'],
+ r'spam ': [r'spam'],
+ r' spam': [r'spam'],
+ 'spam eggs': [r'spam', r'eggs'],
+ 'spam\teggs': [r'spam', r'eggs'],
+ 'spam\neggs': ['spam\neggs'],
+ '""': [''],
+ '" "': [' '],
+ '"\t"': ['\t'],
+ '\\\\': ['\\\\'],
+ '\\\\ ': ['\\\\'],
+ '\\\\\t': ['\\\\'],
+ r'\"': ['"'],
+ # The following four test cases are official test cases given in
+ # Microsoft's documentation.
+ r'"abc" d e': [r'abc', r'd', r'e'],
+ r'a\\b d"e f"g h': [r'a\\b', r'de fg', r'h'],
+ r'a\\\"b c d': [r'a\"b', r'c', r'd'],
+ r'a\\\\"b c" d e': [r'a\\b c', r'd', r'e']
+ }
+ for input_string, expected_output in windows_cases.items():
+ self.assertEqual(compat_shell_split(input_string, "win32"),
+ expected_output)
+
+ with self.assertRaises(ValueError):
+ compat_shell_split(r'"', "win32")
+
+ def test_compat_shell_split_unix(self):
+ unix_cases = {
+ r'': [],
+ r'spam \\': [r'spam', '\\'],
+ r'spam ': [r'spam'],
+ r' spam': [r'spam'],
+ 'spam eggs': [r'spam', r'eggs'],
+ 'spam\teggs': [r'spam', r'eggs'],
+ 'spam\neggs': ['spam', 'eggs'],
+ '""': [''],
+ '" "': [' '],
+ '"\t"': ['\t'],
+ '\\\\': ['\\'],
+ '\\\\ ': ['\\'],
+ '\\\\\t': ['\\'],
+ r'\"': ['"'],
+ # The following four test cases are official test cases given in
+ # Microsoft's documentation, but adapted to unix shell splitting.
+ r'"abc" d e': [r'abc', r'd', r'e'],
+ r'a\\b d"e f"g h': [r'a\b', r'de fg', r'h'],
+ r'a\\\"b c d': [r'a\"b', r'c', r'd'],
+ r'a\\\\"b c" d e': [r'a\\b c', r'd', r'e']
+ }
+ for input_string, expected_output in unix_cases.items():
+ self.assertEqual(compat_shell_split(input_string, "linux2"),
+ expected_output)
+ self.assertEqual(compat_shell_split(input_string, "darwin"),
+ expected_output)
- def assert_raises(self, s, exception_cls, platform):
- assert_raises(exception_cls, compat_shell_split, s, platform)
+ with self.assertRaises(ValueError):
+ compat_shell_split(r'"', "linux2")
+ with self.assertRaises(ValueError):
+ compat_shell_split(r'"', "darwin")
class TestTimezoneOperations(unittest.TestCase):
def test_get_tzinfo_options(self):
options = get_tzinfo_options()
- self.assertTrue(len(options) > 0)
+ self.assertGreater(len(options), 0)
for tzinfo in options:
self.assertIsInstance(tzinfo(), datetime.tzinfo)
--- a/tests/unit/test_config_provider.py
+++ b/tests/unit/test_config_provider.py
@@ -11,8 +11,7 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from tests import unittest
-import mock
-from nose.tools import assert_equal
+from tests import mock
import botocore
import botocore.session as session
@@ -308,7 +307,7 @@ class TestConfigValueStore(unittest.Test
provider = ConfigValueStore()
provider.set_config_variable('fake_variable', 'foo')
value = provider.get_config_variable('fake_variable')
- self.assertEquals(value, 'foo')
+ self.assertEqual(value, 'foo')
def test_can_set_config_provider(self):
foo_value_provider = mock.Mock(spec=BaseProvider)
@@ -448,7 +447,7 @@ def assert_chain_does_provide(providers,
providers=providers,
)
value = provider.provide()
- assert_equal(value, expected_value)
+ assert value == expected_value
def test_chain_provider():
@@ -468,9 +467,9 @@ def test_chain_provider():
('foo', ['foo', 'bar', 'baz']),
]
for case in cases:
- yield assert_chain_does_provide, \
- _make_providers_that_return(case[1]), \
- case[0]
+ assert_chain_does_provide(
+ _make_providers_that_return(case[1]),
+ case[0])
class TestChainProvider(unittest.TestCase):
--- a/tests/unit/test_eventstream.py
+++ b/tests/unit/test_eventstream.py
@@ -12,8 +12,10 @@
# language governing permissions and limitations under the License.
"""Unit tests for the binary event stream decoder. """
-from mock import Mock
-from nose.tools import assert_equal, raises
+try:
+ from mock import Mock
+except ImportError:
+ from unittest.mock import Mock
from botocore.parsers import EventStreamXMLParser
from botocore.eventstream import (
@@ -240,18 +242,12 @@ NEGATIVE_CASES = [
def assert_message_equal(message_a, message_b):
"""Asserts all fields for two messages are equal. """
- assert_equal(
- message_a.prelude.total_length,
- message_b.prelude.total_length
- )
- assert_equal(
- message_a.prelude.headers_length,
- message_b.prelude.headers_length
- )
- assert_equal(message_a.prelude.crc, message_b.prelude.crc)
- assert_equal(message_a.headers, message_b.headers)
- assert_equal(message_a.payload, message_b.payload)
- assert_equal(message_a.crc, message_b.crc)
+ assert message_a.prelude.total_length == message_b.prelude.total_length
+ assert message_a.prelude.headers_length == message_b.prelude.headers_length
+ assert message_a.prelude.crc == message_b.prelude.crc
+ assert message_a.headers == message_b.headers
+ assert message_a.payload == message_b.payload
+ assert message_a.crc == message_b.crc
def test_partial_message():
@@ -262,7 +258,7 @@ def test_partial_message():
mid_point = 15
event_buffer.add_data(data[:mid_point])
messages = list(event_buffer)
- assert_equal(messages, [])
+ assert messages == []
event_buffer.add_data(data[mid_point:len(data)])
for message in event_buffer:
assert_message_equal(message, EMPTY_MESSAGE[1])
@@ -280,7 +276,7 @@ def check_message_decodes(encoded, decod
def test_positive_cases():
"""Test that all positive cases decode how we expect. """
for (encoded, decoded) in POSITIVE_CASES:
- yield check_message_decodes, encoded, decoded
+ check_message_decodes(encoded, decoded)
def test_all_positive_cases():
@@ -301,8 +297,13 @@ def test_all_positive_cases():
def test_negative_cases():
"""Test that all negative cases raise the expected exception. """
for (encoded, exception) in NEGATIVE_CASES:
- test_function = raises(exception)(check_message_decodes)
- yield test_function, encoded, None
+ try:
+ check_message_decodes(encoded, None)
+ except exception:
+ pass
+ else:
+ raise AssertionError(
+ 'Expected exception {!s} has not been raised.'.format(exception))
def test_header_parser():
@@ -329,87 +330,87 @@ def test_header_parser():
parser = EventStreamHeaderParser()
headers = parser.parse(headers_data)
- assert_equal(headers, expected_headers)
+ assert headers == expected_headers
def test_message_prelude_properties():
"""Test that calculated properties from the payload are correct. """
# Total length: 40, Headers Length: 15, random crc
prelude = MessagePrelude(40, 15, 0x00000000)
- assert_equal(prelude.payload_length, 9)
- assert_equal(prelude.headers_end, 27)
- assert_equal(prelude.payload_end, 36)
+ assert prelude.payload_length == 9
+ assert prelude.headers_end == 27
+ assert prelude.payload_end == 36
def test_message_to_response_dict():
response_dict = PAYLOAD_ONE_STR_HEADER[1].to_response_dict()
- assert_equal(response_dict['status_code'], 200)
+ assert response_dict['status_code'] == 200
expected_headers = {'content-type': 'application/json'}
- assert_equal(response_dict['headers'], expected_headers)
- assert_equal(response_dict['body'], b"{'foo':'bar'}")
+ assert response_dict['headers'] == expected_headers
+ assert response_dict['body'] == b"{'foo':'bar'}"
def test_message_to_response_dict_error():
response_dict = ERROR_EVENT_MESSAGE[1].to_response_dict()
- assert_equal(response_dict['status_code'], 400)
+ assert response_dict['status_code'] == 400
headers = {
':message-type': 'error',
':error-code': 'code',
':error-message': 'message',
}
- assert_equal(response_dict['headers'], headers)
- assert_equal(response_dict['body'], b'')
+ assert response_dict['headers'] == headers
+ assert response_dict['body'] == b''
def test_unpack_uint8():
(value, bytes_consumed) = DecodeUtils.unpack_uint8(b'\xDE')
- assert_equal(bytes_consumed, 1)
- assert_equal(value, 0xDE)
+ assert bytes_consumed == 1
+ assert value == 0xDE
def test_unpack_uint32():
(value, bytes_consumed) = DecodeUtils.unpack_uint32(b'\xDE\xAD\xBE\xEF')
- assert_equal(bytes_consumed, 4)
- assert_equal(value, 0xDEADBEEF)
+ assert bytes_consumed == 4
+ assert value == 0xDEADBEEF
def test_unpack_int8():
(value, bytes_consumed) = DecodeUtils.unpack_int8(b'\xFE')
- assert_equal(bytes_consumed, 1)
- assert_equal(value, -2)
+ assert bytes_consumed == 1
+ assert value == -2
def test_unpack_int16():
(value, bytes_consumed) = DecodeUtils.unpack_int16(b'\xFF\xFE')
- assert_equal(bytes_consumed, 2)
- assert_equal(value, -2)
+ assert bytes_consumed == 2
+ assert value == -2
def test_unpack_int32():
(value, bytes_consumed) = DecodeUtils.unpack_int32(b'\xFF\xFF\xFF\xFE')
- assert_equal(bytes_consumed, 4)
- assert_equal(value, -2)
+ assert bytes_consumed == 4
+ assert value == -2
def test_unpack_int64():
test_bytes = b'\xFF\xFF\xFF\xFF\xFF\xFF\xFF\xFE'
(value, bytes_consumed) = DecodeUtils.unpack_int64(test_bytes)
- assert_equal(bytes_consumed, 8)
- assert_equal(value, -2)
+ assert bytes_consumed == 8
+ assert value == -2
def test_unpack_array_short():
test_bytes = b'\x00\x10application/json'
(value, bytes_consumed) = DecodeUtils.unpack_byte_array(test_bytes)
- assert_equal(bytes_consumed, 18)
- assert_equal(value, b'application/json')
+ assert bytes_consumed == 18
+ assert value == b'application/json'
def test_unpack_byte_array_int():
(value, array_bytes_consumed) = DecodeUtils.unpack_byte_array(
b'\x00\x00\x00\x10application/json', length_byte_size=4)
- assert_equal(array_bytes_consumed, 20)
- assert_equal(value, b'application/json')
+ assert array_bytes_consumed == 20
+ assert value == b'application/json'
def test_unpack_utf8_string():
@@ -417,18 +418,19 @@ def test_unpack_utf8_string():
utf8_string = b'\xe6\x97\xa5\xe6\x9c\xac\xe8\xaa\x9e'
encoded = length + utf8_string
(value, bytes_consumed) = DecodeUtils.unpack_utf8_string(encoded)
- assert_equal(bytes_consumed, 11)
- assert_equal(value, utf8_string.decode('utf-8'))
+ assert bytes_consumed == 11
+ assert value == utf8_string.decode('utf-8')
def test_unpack_prelude():
data = b'\x00\x00\x00\x01\x00\x00\x00\x02\x00\x00\x00\x03'
prelude = DecodeUtils.unpack_prelude(data)
- assert_equal(prelude, ((1, 2, 3), 12))
+ assert prelude == ((1, 2, 3), 12)
def create_mock_raw_stream(*data):
raw_stream = Mock()
+
def generator():
for chunk in data:
yield chunk
@@ -445,7 +447,7 @@ def test_event_stream_wrapper_iteration(
output_shape = Mock()
event_stream = EventStream(raw_stream, output_shape, parser, '')
events = list(event_stream)
- assert_equal(len(events), 1)
+ assert len(events) == 1
response_dict = {
'headers': {'event-id': 0x0000a00c},
@@ -455,14 +457,19 @@ def test_event_stream_wrapper_iteration(
parser.parse.assert_called_with(response_dict, output_shape)
-@raises(EventStreamError)
def test_eventstream_wrapper_iteration_error():
- raw_stream = create_mock_raw_stream(ERROR_EVENT_MESSAGE[0])
- parser = Mock(spec=EventStreamXMLParser)
- parser.parse.return_value = {}
- output_shape = Mock()
- event_stream = EventStream(raw_stream, output_shape, parser, '')
- list(event_stream)
+ try:
+ raw_stream = create_mock_raw_stream(ERROR_EVENT_MESSAGE[0])
+ parser = Mock(spec=EventStreamXMLParser)
+ parser.parse.return_value = {}
+ output_shape = Mock()
+ event_stream = EventStream(raw_stream, output_shape, parser, '')
+ list(event_stream)
+ except EventStreamError:
+ pass
+ else:
+ raise AssertionError(
+ 'Expected exception EventStreamError has not been raised.')
def test_event_stream_wrapper_close():
@@ -492,22 +499,32 @@ def test_event_stream_initial_response()
assert event.payload == payload
-@raises(NoInitialResponseError)
def test_event_stream_initial_response_wrong_type():
- raw_stream = create_mock_raw_stream(
- b"\x00\x00\x00+\x00\x00\x00\x0e4\x8b\xec{\x08event-id\x04\x00",
- b"\x00\xa0\x0c{'foo':'bar'}\xd3\x89\x02\x85",
- )
- parser = Mock(spec=EventStreamXMLParser)
- output_shape = Mock()
- event_stream = EventStream(raw_stream, output_shape, parser, '')
- event_stream.get_initial_response()
+ try:
+ raw_stream = create_mock_raw_stream(
+ b"\x00\x00\x00+\x00\x00\x00\x0e4\x8b\xec{\x08event-id\x04\x00",
+ b"\x00\xa0\x0c{'foo':'bar'}\xd3\x89\x02\x85",
+ )
+ parser = Mock(spec=EventStreamXMLParser)
+ output_shape = Mock()
+ event_stream = EventStream(raw_stream, output_shape, parser, '')
+ event_stream.get_initial_response()
+ except NoInitialResponseError:
+ pass
+ else:
+ raise AssertionError(
+ 'Expected exception NoInitialResponseError has not been raised.')
-@raises(NoInitialResponseError)
def test_event_stream_initial_response_no_event():
- raw_stream = create_mock_raw_stream(b'')
- parser = Mock(spec=EventStreamXMLParser)
- output_shape = Mock()
- event_stream = EventStream(raw_stream, output_shape, parser, '')
- event_stream.get_initial_response()
+ try:
+ raw_stream = create_mock_raw_stream(b'')
+ parser = Mock(spec=EventStreamXMLParser)
+ output_shape = Mock()
+ event_stream = EventStream(raw_stream, output_shape, parser, '')
+ event_stream.get_initial_response()
+ except NoInitialResponseError:
+ pass
+ else:
+ raise AssertionError(
+ 'Expected exception NoInitialResponseError has not been raised.')
--- a/tests/unit/test_exceptions.py
+++ b/tests/unit/test_exceptions.py
@@ -14,8 +14,6 @@
import pickle
from tests import unittest
-from nose.tools import assert_equal
-
import botocore.awsrequest
import botocore.session
from botocore import exceptions
@@ -24,7 +22,7 @@ from botocore import exceptions
def test_client_error_can_handle_missing_code_or_message():
response = {'Error': {}}
expect = 'An error occurred (Unknown) when calling the blackhole operation: Unknown'
- assert_equal(str(exceptions.ClientError(response, 'blackhole')), expect)
+ assert str(exceptions.ClientError(response, 'blackhole')) == expect
def test_client_error_has_operation_name_set():
@@ -36,7 +34,7 @@ def test_client_error_has_operation_name
def test_client_error_set_correct_operation_name():
response = {'Error': {}}
exception = exceptions.ClientError(response, 'blackhole')
- assert_equal(exception.operation_name, 'blackhole')
+ assert exception.operation_name == 'blackhole'
def test_retry_info_added_when_present():
--- a/tests/unit/test_http_client_exception_mapping.py
+++ b/tests/unit/test_http_client_exception_mapping.py
@@ -1,4 +1,4 @@
-from nose.tools import assert_raises
+import unittest
from botocore import exceptions as botocore_exceptions
from botocore.vendored.requests import exceptions as requests_exceptions
@@ -13,15 +13,9 @@ EXCEPTION_MAPPING = [
]
-def _raise_exception(exception):
- raise exception(endpoint_url=None, proxy_url=None, error=None)
-
-
-def _test_exception_mapping(new_exception, old_exception):
- # assert that the new exception can still be caught by the old vendored one
- assert_raises(old_exception, _raise_exception, new_exception)
-
-
-def test_http_client_exception_mapping():
- for new_exception, old_exception in EXCEPTION_MAPPING:
- yield _test_exception_mapping, new_exception, old_exception
+class TestHttpClientExceptionMapping(unittest.TestCase):
+ def test_http_client_exception_mapping(self):
+ for new_exception, old_exception in EXCEPTION_MAPPING:
+ with self.assertRaises(old_exception):
+ raise new_exception(endpoint_url=None, proxy_url=None,
+ error=None)
--- a/tests/unit/test_http_session.py
+++ b/tests/unit/test_http_session.py
@@ -1,11 +1,12 @@
import socket
-from mock import patch, Mock, ANY
+try:
+ from mock import patch, Mock, ANY
+except ImportError:
+ from unittest.mock import patch, Mock, ANY
from tests import unittest
-from nose.tools import raises
from urllib3.exceptions import NewConnectionError, ProtocolError
-from botocore.vendored import six
from botocore.awsrequest import AWSRequest
from botocore.awsrequest import AWSHTTPConnectionPool, AWSHTTPSConnectionPool
from botocore.httpsession import get_cert_path
@@ -250,15 +251,15 @@ class TestURLLib3Session(unittest.TestCa
session = URLLib3Session()
session.send(self.request.prepare())
- @raises(EndpointConnectionError)
def test_catches_new_connection_error(self):
- error = NewConnectionError(None, None)
- self.make_request_with_error(error)
+ with self.assertRaises(EndpointConnectionError):
+ error = NewConnectionError(None, None)
+ self.make_request_with_error(error)
- @raises(ConnectionClosedError)
def test_catches_bad_status_line(self):
- error = ProtocolError(None)
- self.make_request_with_error(error)
+ with self.assertRaises(ConnectionClosedError):
+ error = ProtocolError(None)
+ self.make_request_with_error(error)
def test_aws_connection_classes_are_used(self):
session = URLLib3Session()
--- a/tests/unit/test_model.py
+++ b/tests/unit/test_model.py
@@ -2,11 +2,11 @@ from tests import unittest
from botocore import model
from botocore.compat import OrderedDict
-from botocore.exceptions import MissingServiceIdError
+from botocore.compat import six
def test_missing_model_attribute_raises_exception():
- # We're using a nose test generator here to cut down
+    # We're looping over the attributes here to cut down
# on the duplication. The property names below
# all have the same test logic.
service_model = model.ServiceModel({'metadata': {'endpointPrefix': 'foo'}})
@@ -28,7 +28,7 @@ def test_missing_model_attribute_raises_
"be raised, but no exception was raised for: %s" % attr_name)
for name in property_names:
- yield _test_attribute_raise_exception, name
+ _test_attribute_raise_exception(name)
class TestServiceId(unittest.TestCase):
@@ -105,9 +105,9 @@ class TestServiceModel(unittest.TestCase
}
service_name = 'myservice'
service_model = model.ServiceModel(service_model, service_name)
- with self.assertRaisesRegexp(model.UndefinedModelAttributeError,
- service_name):
- service_model.service_id
+ with six.assertRaisesRegex(self, model.UndefinedModelAttributeError,
+ service_name):
+            service_model.service_id
def test_operation_does_not_exist(self):
with self.assertRaises(model.OperationNotFoundError):
--- a/tests/unit/test_parsers.py
+++ b/tests/unit/test_parsers.py
@@ -14,11 +14,11 @@ from tests import unittest, RawResponse
import datetime
from dateutil.tz import tzutc
-from nose.tools import assert_equal
from botocore import parsers
from botocore import model
from botocore.compat import json, MutableMapping
+from botocore.compat import six
# HTTP responses will typically return a custom HTTP
@@ -597,8 +597,8 @@ class TestHandlesInvalidXMLResponses(uni
parser = parsers.QueryParser()
output_shape = None
# The XML body should be in the error message.
- with self.assertRaisesRegexp(parsers.ResponseParserError,
- '<DeleteTagsResponse'):
+ with six.assertRaisesRegex(self, parsers.ResponseParserError,
+ '<DeleteTagsResponse'):
parser.parse(
{'body': invalid_xml, 'headers': {}, 'status_code': 200},
output_shape)
@@ -1310,9 +1310,9 @@ def test_can_handle_generic_error_messag
).encode('utf-8')
empty_body = b''
none_body = None
- yield _assert_parses_generic_error, parser_cls(), generic_html_body
- yield _assert_parses_generic_error, parser_cls(), empty_body
- yield _assert_parses_generic_error, parser_cls(), none_body
+        _assert_parses_generic_error(parser_cls(), generic_html_body)
+        _assert_parses_generic_error(parser_cls(), empty_body)
+        _assert_parses_generic_error(parser_cls(), none_body)
def _assert_parses_generic_error(parser, body):
@@ -1320,7 +1320,6 @@ def _assert_parses_generic_error(parser,
# html error page. We should be able to handle this case.
parsed = parser.parse({
'body': body, 'headers': {}, 'status_code': 503}, None)
- assert_equal(
- parsed['Error'],
- {'Code': '503', 'Message': 'Service Unavailable'})
- assert_equal(parsed['ResponseMetadata']['HTTPStatusCode'], 503)
+ assert parsed['Error'] == \
+ {'Code': '503', 'Message': 'Service Unavailable'}
+ assert parsed['ResponseMetadata']['HTTPStatusCode'] == 503
--- a/tests/unit/test_protocols.py
+++ b/tests/unit/test_protocols.py
@@ -16,7 +16,7 @@
This is a test runner for all the JSON tests defined in
``tests/unit/protocols/``, including both the input/output tests.
-You can use the normal ``nosetests tests/unit/test_protocols.py`` to run
+You can use the normal ``pytest tests/unit/test_protocols.py`` to run
this test. In addition, there are several env vars you can use during
development.
@@ -37,17 +37,17 @@ failed test.
To run tests from only a single file, you can set the
BOTOCORE_TEST env var::
- BOTOCORE_TEST=tests/unit/compliance/input/json.json nosetests tests/unit/test_protocols.py
+ BOTOCORE_TEST=tests/unit/compliance/input/json.json pytest tests/unit/test_protocols.py
To run a single test suite you can set the BOTOCORE_TEST_ID env var:
BOTOCORE_TEST=tests/unit/compliance/input/json.json BOTOCORE_TEST_ID=5 \
- nosetests tests/unit/test_protocols.py
+ pytest tests/unit/test_protocols.py
To run a single test case in a suite (useful when debugging a single test), you
can set the BOTOCORE_TEST_ID env var with the ``suite_id:test_id`` syntax.
- BOTOCORE_TEST_ID=5:1 nosetests test/unit/test_protocols.py
+    BOTOCORE_TEST_ID=5:1 pytest tests/unit/test_protocols.py
"""
import os
@@ -69,8 +69,6 @@ from botocore.awsrequest import prepare_
from calendar import timegm
from botocore.model import NoShapeFoundError
-from nose.tools import assert_equal as _assert_equal
-
TEST_DIR = os.path.join(
os.path.dirname(os.path.abspath(__file__)),
'protocols')
@@ -101,9 +99,9 @@ def test_compliance():
if model.get('description') in PROTOCOL_TEST_BLACKLIST:
continue
if 'params' in case:
- yield _test_input, model, case, basename
+ _test_input(model, case, basename)
elif 'response' in case:
- yield _test_output, model, case, basename
+ _test_output(model, case, basename)
def _test_input(json_description, case, basename):
@@ -142,7 +140,7 @@ def _assert_endpoints_equal(actual, expe
return
prepare_request_dict(actual, endpoint)
actual_host = urlsplit(actual['url']).netloc
- assert_equal(actual_host, expected['host'], 'Host')
+ rich_assert_equal(actual_host, expected['host'], 'Host')
class MockRawResponse(object):
@@ -208,7 +206,7 @@ def _test_output(json_description, case,
expected_result.update(case['error'])
else:
expected_result = case['result']
- assert_equal(parsed, expected_result, "Body")
+ rich_assert_equal(parsed, expected_result, "Body")
except Exception as e:
_output_failure_message(model.metadata['protocol'],
case, parsed, expected_result, e)
@@ -318,11 +316,11 @@ def _try_json_dump(obj):
return str(obj)
-def assert_equal(first, second, prefix):
+def rich_assert_equal(first, second, prefix):
# A better assert equals. It allows you to just provide
# prefix instead of the entire message.
try:
- _assert_equal(first, second)
+ assert first == second
except Exception:
try:
better = "%s (actual != expected)\n%s !=\n%s" % (
@@ -353,14 +351,14 @@ def _serialize_request_description(reque
def _assert_requests_equal(actual, expected):
- assert_equal(actual['body'], expected.get('body', '').encode('utf-8'),
+ rich_assert_equal(actual['body'], expected.get('body', '').encode('utf-8'),
'Body value')
actual_headers = dict(actual['headers'])
expected_headers = expected.get('headers', {})
- assert_equal(actual_headers, expected_headers, "Header values")
- assert_equal(actual['url_path'], expected.get('uri', ''), "URI")
+ rich_assert_equal(actual_headers, expected_headers, "Header values")
+ rich_assert_equal(actual['url_path'], expected.get('uri', ''), "URI")
if 'method' in expected:
- assert_equal(actual['method'], expected['method'], "Method")
+ rich_assert_equal(actual['method'], expected['method'], "Method")
def _walk_files():
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,7 +1,7 @@
tox>=2.5.0,<3.0.0
-nose==1.3.7
+pytest>=4.6
+pluggy>=0.7
+py>=1.5.0
+pytest-cov
mock==1.3.0
wheel==0.24.0
-docutils>=0.10,<0.16
-behave==1.2.5
-jsonschema==2.5.1
--- a/tests/unit/response_parsing/README.rst
+++ b/tests/unit/response_parsing/README.rst
@@ -16,12 +16,12 @@ response sent from the server for that p
file contains the expected Python data structure created from the XML
response.
-The main test is contained in ``test_response_parser.py`` and is
-implemented as a nose generator. Each time through the loop an XML
-file is read and passed to a ``botocore.response.XmlResponse``
-object. The corresponding JSON file is then parsed and compared to
-the value created by the parser. If the are equal, the test passes. If
-they are not equal, both the expected result and the actual result are
+The main test is contained in ``test_response_parsing.py``. Each
+time through the loop an XML file is read and passed to
+a ``botocore.response.XmlResponse`` object. The corresponding
+JSON file is then parsed and compared to the value created by the
+parser. If they are equal, the test passes. If they are not
+equal, both the expected result and the actual result are
pretty-printed to stdout and the tests continue.
-----------------
--- a/tests/functional/test_credentials.py
+++ b/tests/functional/test_credentials.py
@@ -15,7 +15,7 @@ import threading
import os
import math
import time
-import mock
+from tests import mock
import tempfile
import shutil
from datetime import datetime, timedelta
@@ -41,7 +41,7 @@ from botocore.session import Session
from botocore.exceptions import InvalidConfigError, InfiniteLoopConfigError
from botocore.stub import Stubber
from botocore.utils import datetime2timestamp
-
+from botocore.compat import six
class TestCredentialRefreshRaces(unittest.TestCase):
def assert_consistent_credentials_seen(self, creds, func):
@@ -826,7 +826,7 @@ class TestProcessProvider(unittest.TestC
# Finally `(?s)` at the beginning makes dots match newlines so
# we can handle a multi-line string.
reg = r"(?s)^((?!b').)*$"
- with self.assertRaisesRegexp(CredentialRetrievalError, reg):
+ with six.assertRaisesRegex(self, CredentialRetrievalError, reg):
session.get_credentials()
--- a/tests/functional/test_history.py
+++ b/tests/functional/test_history.py
@@ -1,6 +1,6 @@
from contextlib import contextmanager
-import mock
+from tests import mock
from tests import BaseSessionTest, ClientHTTPStubber
from botocore.history import BaseHistoryHandler
@@ -87,10 +87,10 @@ class TestRecordStatementsInjections(Bas
self.assertIsNone(body)
streaming = payload['streaming']
- self.assertEquals(streaming, False)
+ self.assertEqual(streaming, False)
url = payload['url']
- self.assertEquals(url, 'https://s3.us-west-2.amazonaws.com/')
+ self.assertEqual(url, 'https://s3.us-west-2.amazonaws.com/')
self.assertEqual(source, 'BOTOCORE')
--- a/tests/functional/test_retry.py
+++ b/tests/functional/test_retry.py
@@ -16,6 +16,7 @@ from tests import BaseSessionTest, mock,
from botocore.exceptions import ClientError
from botocore.config import Config
+from botocore.compat import six
class BaseRetryTest(BaseSessionTest):
@@ -38,7 +39,7 @@ class BaseRetryTest(BaseSessionTest):
with ClientHTTPStubber(client) as http_stubber:
for _ in range(num_responses):
http_stubber.add_response(status=status, body=body)
- with self.assertRaisesRegexp(
+ with six.assertRaisesRegex(self,
ClientError, 'reached max retries: %s' % num_retries):
yield
self.assertEqual(len(http_stubber.requests), num_responses)
--- a/tests/functional/test_stub.py
+++ b/tests/functional/test_stub.py
@@ -16,6 +16,7 @@ from tests import unittest
import botocore
import botocore.session
import botocore.stub as stub
+from botocore.compat import six
from botocore.stub import Stubber
from botocore.exceptions import StubResponseError, ClientError, \
StubAssertionError, UnStubbedResponseError
@@ -54,8 +55,8 @@ class TestStubber(unittest.TestCase):
def test_activated_stubber_errors_with_no_registered_stubs(self):
self.stubber.activate()
# Params one per line for readability.
- with self.assertRaisesRegexp(UnStubbedResponseError,
- "Unexpected API Call"):
+ with six.assertRaisesRegex(self, UnStubbedResponseError,
+ "Unexpected API Call"):
self.client.list_objects(
Bucket='asdfasdfasdfasdf',
Delimiter='asdfasdfasdfasdf',
@@ -119,8 +120,8 @@ class TestStubber(unittest.TestCase):
'list_objects', service_response, expected_params)
self.stubber.activate()
# This should call should raise an for mismatching expected params.
- with self.assertRaisesRegexp(StubResponseError,
- "{'Bucket': 'bar'},\n"):
+ with six.assertRaisesRegex(self, StubResponseError,
+ "{'Bucket': 'bar'},\n"):
self.client.list_objects(Bucket='foo')
def test_expected_params_mixed_with_errors_responses(self):
@@ -143,7 +144,8 @@ class TestStubber(unittest.TestCase):
self.client.list_objects(Bucket='foo')
# The second call should throw an error for unexpected parameters
- with self.assertRaisesRegexp(StubResponseError, 'Expected parameters'):
+ with six.assertRaisesRegex(self, StubResponseError,
+ 'Expected parameters'):
self.client.list_objects(Bucket='foo')
def test_can_continue_to_call_after_expected_params_fail(self):
--- a/tests/integration/test_client.py
+++ b/tests/integration/test_client.py
@@ -84,7 +84,7 @@ class TestCreateClients(unittest.TestCas
self.assertTrue(hasattr(client, 'list_buckets'))
def test_client_raises_exception_invalid_region(self):
- with self.assertRaisesRegexp(ValueError, ('Invalid endpoint')):
+ with six.assertRaisesRegex(self, ValueError, ('Invalid endpoint')):
self.session.create_client(
'cloudformation', region_name='invalid region name')
@@ -96,8 +96,8 @@ class TestClientErrors(unittest.TestCase
def test_region_mentioned_in_invalid_region(self):
client = self.session.create_client(
'cloudformation', region_name='us-east-999')
- with self.assertRaisesRegexp(EndpointConnectionError,
- 'Could not connect to the endpoint URL'):
+ with six.assertRaisesRegex(self, EndpointConnectionError,
+ 'Could not connect to the endpoint URL'):
client.list_stacks()
def test_client_modeled_exception(self):
--- a/tests/integration/test_sts.py
+++ b/tests/integration/test_sts.py
@@ -13,6 +13,8 @@
from tests import unittest
import botocore.session
+
+from botocore.compat import six
from botocore.exceptions import ClientError
class TestSTS(unittest.TestCase):
@@ -38,5 +40,5 @@ class TestSTS(unittest.TestCase):
self.assertEqual(sts.meta.endpoint_url,
'https://sts.us-west-2.amazonaws.com')
# Signing error will be thrown with the incorrect region name included.
- with self.assertRaisesRegexp(ClientError, 'ap-southeast-1') as e:
+ with six.assertRaisesRegex(self, ClientError, 'ap-southeast-1'):
sts.get_session_token()
--- a/tests/unit/docs/test_utils.py
+++ b/tests/unit/docs/test_utils.py
@@ -223,5 +223,5 @@ class TestAppendParamDocumentation(BaseD
class TestEscapeControls(unittest.TestCase):
def test_escapes_controls(self):
escaped = escape_controls('\na\rb\tc\fd\be')
- self.assertEquals(escaped, '\\na\\rb\\tc\\fd\\be')
+ self.assertEqual(escaped, '\\na\\rb\\tc\\fd\\be')
--- a/tests/unit/response_parsing/test_response_parsing.py
+++ b/tests/unit/response_parsing/test_response_parsing.py
@@ -119,8 +119,8 @@ def test_xml_parsing():
expected = _get_expected_parsed_result(xmlfile)
operation_model = _get_operation_model(service_model, xmlfile)
raw_response_body = _get_raw_response_body(xmlfile)
- yield _test_parsed_response, xmlfile, raw_response_body, \
- operation_model, expected
+ _test_parsed_response(xmlfile, raw_response_body,
+ operation_model, expected)
def _get_raw_response_body(xmlfile):
@@ -179,8 +179,8 @@ def test_json_errors_parsing():
operation_model = service_model.operation_model(op_name)
with open(raw_response_file, 'rb') as f:
raw_response_body = f.read()
- yield _test_parsed_response, raw_response_file, \
- raw_response_body, operation_model, expected
+ _test_parsed_response(raw_response_file,
+ raw_response_body, operation_model, expected)
def _uhg_test_json_parsing():
--- a/tests/unit/test_awsrequest.py
+++ b/tests/unit/test_awsrequest.py
@@ -18,13 +18,15 @@ import tempfile
import shutil
import io
import socket
-import sys
-from mock import Mock, patch
+try:
+ from mock import Mock, patch
+except ImportError:
+ from unittest.mock import Mock, patch
from urllib3.connectionpool import HTTPConnectionPool, HTTPSConnectionPool
from botocore.exceptions import UnseekableStreamError
-from botocore.awsrequest import AWSRequest, AWSPreparedRequest, AWSResponse
+from botocore.awsrequest import AWSRequest, AWSResponse
from botocore.awsrequest import AWSHTTPConnection, AWSHTTPSConnection, HeadersDict
from botocore.awsrequest import prepare_request_dict, create_request_object
from botocore.compat import file_type, six
@@ -271,11 +273,11 @@ class TestAWSResponse(unittest.TestCase)
def test_text_property(self):
self.set_raw_stream([b'\xe3\x82\xb8\xe3\x83\xa7\xe3\x82\xb0'])
self.response.headers['content-type'] = 'text/plain; charset=utf-8'
- self.assertEquals(self.response.text, u'\u30b8\u30e7\u30b0')
+ self.assertEqual(self.response.text, u'\u30b8\u30e7\u30b0')
def test_text_property_defaults_utf8(self):
self.set_raw_stream([b'\xe3\x82\xb8\xe3\x83\xa7\xe3\x82\xb0'])
- self.assertEquals(self.response.text, u'\u30b8\u30e7\u30b0')
+ self.assertEqual(self.response.text, u'\u30b8\u30e7\u30b0')
class TestAWSHTTPConnection(unittest.TestCase):
--- a/tests/unit/test_client.py
+++ b/tests/unit/test_client.py
@@ -12,7 +12,7 @@
# language governing permissions and limitations under the License.
import botocore.config
from tests import unittest
-import mock
+from tests import mock
import botocore
from botocore import utils
@@ -554,8 +554,8 @@ class TestAutoGeneratedClient(unittest.T
creator = self.create_client_creator()
service_client = creator.create_client(
'myservice', 'us-west-2', credentials=self.credentials)
- with self.assertRaisesRegexp(
- TypeError, 'only accepts keyword arguments'):
+ with six.assertRaisesRegex(self, TypeError,
+ 'only accepts keyword arguments'):
service_client.test_operation('foo')
@mock.patch('botocore.args.RequestSigner.sign')
@@ -1550,15 +1550,15 @@ class TestConfig(unittest.TestCase):
self.assertEqual(config.read_timeout, 50)
def test_invalid_kwargs(self):
- with self.assertRaisesRegexp(TypeError, 'Got unexpected keyword'):
+ with six.assertRaisesRegex(self, TypeError, 'Got unexpected keyword'):
botocore.config.Config(foo='foo')
def test_pass_invalid_length_of_args(self):
- with self.assertRaisesRegexp(TypeError, 'Takes at most'):
+ with six.assertRaisesRegex(self, TypeError, 'Takes at most'):
botocore.config.Config('foo', *botocore.config.Config.OPTION_DEFAULTS.values())
def test_create_with_multiple_kwargs(self):
- with self.assertRaisesRegexp(TypeError, 'Got multiple values'):
+ with six.assertRaisesRegex(self, TypeError, 'Got multiple values'):
botocore.config.Config('us-east-1', region_name='us-east-1')
def test_merge_returns_new_config_object(self):
@@ -1610,10 +1610,10 @@ class TestConfig(unittest.TestCase):
self.assertEqual(config.retries['max_attempts'], 15)
def test_validates_retry_config(self):
- with self.assertRaisesRegexp(
- InvalidRetryConfigurationError,
- 'Cannot provide retry configuration for "not-allowed"'):
- botocore.config.Config(retries={'not-allowed': True})
+ with six.assertRaisesRegex(
+ self, InvalidRetryConfigurationError,
+ 'Cannot provide retry configuration for "not-allowed"'):
+ botocore.config.Config(retries={'not-allowed': True})
def test_validates_max_retry_attempts(self):
with self.assertRaises(InvalidMaxRetryAttemptsError):
--- a/tests/unit/test_credentials.py
+++ b/tests/unit/test_credentials.py
@@ -13,7 +13,7 @@
# language governing permissions and limitations under the License.
from datetime import datetime, timedelta
import subprocess
-import mock
+from tests import mock
import os
import tempfile
import shutil
@@ -1083,7 +1083,7 @@ class TestEnvVar(BaseEnvVar):
"Credentials were refreshed, but the refreshed credentials are "
"still expired."
)
- with self.assertRaisesRegexp(RuntimeError, error_message):
+ with six.assertRaisesRegex(self, RuntimeError, error_message):
creds.get_frozen_credentials()
def test_partial_creds_is_an_error(self):
@@ -1149,7 +1149,7 @@ class TestEnvVar(BaseEnvVar):
"Credentials were refreshed, but the refreshed credentials are "
"still expired."
)
- with self.assertRaisesRegexp(RuntimeError, error_message):
+ with six.assertRaisesRegex(self, RuntimeError, error_message):
creds.get_frozen_credentials()
# Now we update the environment with non-expired credentials,
@@ -2745,7 +2745,7 @@ class TestRefreshLogic(unittest.TestCase
mandatory_refresh=7,
refresh_function=fail_refresh
)
- with self.assertRaisesRegexp(Exception, 'refresh failed'):
+ with six.assertRaisesRegex(self, Exception, 'refresh failed'):
creds.get_frozen_credentials()
def test_exception_propogated_on_expired_credentials(self):
@@ -2758,7 +2758,7 @@ class TestRefreshLogic(unittest.TestCase
mandatory_refresh=7,
refresh_function=fail_refresh
)
- with self.assertRaisesRegexp(Exception, 'refresh failed'):
+ with six.assertRaisesRegex(self, Exception, 'refresh failed'):
# Because credentials are actually expired, any
# failure to refresh should be propagated.
creds.get_frozen_credentials()
@@ -2779,7 +2779,7 @@ class TestRefreshLogic(unittest.TestCase
creds_last_for=-2,
)
err_msg = 'refreshed credentials are still expired'
- with self.assertRaisesRegexp(RuntimeError, err_msg):
+ with six.assertRaisesRegex(self, RuntimeError, err_msg):
# Because credentials are actually expired, any
# failure to refresh should be propagated.
creds.get_frozen_credentials()
@@ -3067,7 +3067,7 @@ class TestProcessProvider(BaseEnvVar):
provider = self.create_process_provider()
exception = botocore.exceptions.CredentialRetrievalError
- with self.assertRaisesRegexp(exception, 'Error Message'):
+ with six.assertRaisesRegex(self, exception, 'Error Message'):
provider.load()
def test_unsupported_version_raises_mismatch(self):
@@ -3085,7 +3085,7 @@ class TestProcessProvider(BaseEnvVar):
provider = self.create_process_provider()
exception = botocore.exceptions.CredentialRetrievalError
- with self.assertRaisesRegexp(exception, 'Unsupported version'):
+ with six.assertRaisesRegex(self, exception, 'Unsupported version'):
provider.load()
def test_missing_version_in_payload_returned_raises_exception(self):
@@ -3102,7 +3102,7 @@ class TestProcessProvider(BaseEnvVar):
provider = self.create_process_provider()
exception = botocore.exceptions.CredentialRetrievalError
- with self.assertRaisesRegexp(exception, 'Unsupported version'):
+ with six.assertRaisesRegex(self, exception, 'Unsupported version'):
provider.load()
def test_missing_access_key_raises_exception(self):
@@ -3119,7 +3119,7 @@ class TestProcessProvider(BaseEnvVar):
provider = self.create_process_provider()
exception = botocore.exceptions.CredentialRetrievalError
- with self.assertRaisesRegexp(exception, 'Missing required key'):
+ with six.assertRaisesRegex(self, exception, 'Missing required key'):
provider.load()
def test_missing_secret_key_raises_exception(self):
@@ -3136,7 +3136,7 @@ class TestProcessProvider(BaseEnvVar):
provider = self.create_process_provider()
exception = botocore.exceptions.CredentialRetrievalError
- with self.assertRaisesRegexp(exception, 'Missing required key'):
+ with six.assertRaisesRegex(self, exception, 'Missing required key'):
provider.load()
def test_missing_session_token(self):
--- a/tests/unit/test_errorfactory.py
+++ b/tests/unit/test_errorfactory.py
@@ -12,6 +12,7 @@
# language governing permissions and limitations under the License.
from tests import unittest
+from botocore.compat import six
from botocore.exceptions import ClientError
from botocore.errorfactory import BaseClientExceptions
from botocore.errorfactory import ClientExceptionsFactory
@@ -39,7 +40,7 @@ class TestBaseClientExceptions(unittest.
def test_gettattr_message(self):
exception_cls = type('MyException', (ClientError,), {})
self.code_to_exception['MyExceptionCode'] = exception_cls
- with self.assertRaisesRegexp(
+ with six.assertRaisesRegex(self,
AttributeError, 'Valid exceptions are: MyException'):
self.exceptions.SomeUnmodeledError
--- a/tests/unit/test_handlers.py
+++ b/tests/unit/test_handlers.py
@@ -14,7 +14,7 @@
from tests import unittest, BaseSessionTest
import base64
-import mock
+from tests import mock
import copy
import os
import json
@@ -126,7 +126,7 @@ class TestHandlers(BaseSessionTest):
'foo/keyname%2B?versionId=asdf+')
def test_copy_source_has_validation_failure(self):
- with self.assertRaisesRegexp(ParamValidationError, 'Key'):
+ with six.assertRaisesRegex(self, ParamValidationError, 'Key'):
handlers.handle_copy_source_param(
{'CopySource': {'Bucket': 'foo'}})
--- a/tests/unit/test_loaders.py
+++ b/tests/unit/test_loaders.py
@@ -22,12 +22,13 @@
import os
import contextlib
import copy
-import mock
+from tests import mock
from botocore.exceptions import DataNotFoundError, UnknownServiceError
from botocore.loaders import JSONFileLoader
from botocore.loaders import Loader, create_loader
from botocore.loaders import ExtrasProcessor
+from botocore.compat import six
from tests import BaseEnvVar
@@ -156,8 +157,8 @@ class TestLoader(BaseEnvVar):
# Should have a) the unknown service name and b) list of valid
# service names.
- with self.assertRaisesRegexp(UnknownServiceError,
- 'Unknown service.*BAZ.*baz'):
+ with six.assertRaisesRegex(self, UnknownServiceError,
+ 'Unknown service.*BAZ.*baz'):
loader.load_service_model('BAZ', type_name='service-2')
def test_load_service_model_uses_provided_type_name(self):
@@ -169,8 +170,8 @@ class TestLoader(BaseEnvVar):
# Should have a) the unknown service name and b) list of valid
# service names.
provided_type_name = 'not-service-2'
- with self.assertRaisesRegexp(UnknownServiceError,
- 'Unknown service.*BAZ.*baz'):
+ with six.assertRaisesRegex(self, UnknownServiceError,
+ 'Unknown service.*BAZ.*baz'):
loader.load_service_model(
'BAZ', type_name=provided_type_name)
--- a/tests/unit/test_paginate.py
+++ b/tests/unit/test_paginate.py
@@ -20,7 +20,7 @@ from botocore.paginate import TokenEncod
from botocore.exceptions import PaginationError
from botocore.compat import six
-import mock
+from tests import mock
def encode_token(token):
@@ -823,7 +823,7 @@ class TestKeyIterators(unittest.TestCase
{"Users": ["User3"]},
]
self.method.side_effect = responses
- with self.assertRaisesRegexp(ValueError, 'Bad starting token'):
+ with six.assertRaisesRegex(self, ValueError, 'Bad starting token'):
pagination_config = {'StartingToken': 'does___not___work'}
self.paginator.paginate(
PaginationConfig=pagination_config).build_full_result()
--- a/tests/unit/test_s3_addressing.py
+++ b/tests/unit/test_s3_addressing.py
@@ -16,9 +16,13 @@
import os
from tests import BaseSessionTest, ClientHTTPStubber
-from mock import patch, Mock
+try:
+ from mock import patch, Mock
+except ImportError:
+ from unittest.mock import patch, Mock
from botocore.compat import OrderedDict
+from botocore.compat import six
from botocore.handlers import set_list_objects_encoding_type_url
@@ -198,7 +202,7 @@ class TestS3Addressing(BaseSessionTest):
'https://s3.us-west-2.amazonaws.com/192.168.5.256/mykeyname')
def test_invalid_endpoint_raises_exception(self):
- with self.assertRaisesRegexp(ValueError, 'Invalid endpoint'):
+ with six.assertRaisesRegex(self, ValueError, 'Invalid endpoint'):
self.session.create_client('s3', 'Invalid region')
def test_non_existent_region(self):
--- a/tests/unit/test_utils.py
+++ b/tests/unit/test_utils.py
@@ -15,7 +15,7 @@ from tests import RawResponse
from dateutil.tz import tzutc, tzoffset
import datetime
import copy
-import mock
+from tests import mock
import botocore
from botocore import xform_name
@@ -2003,7 +2003,7 @@ class TestContainerMetadataFetcher(unitt
response_body = {'foo': 'bar'}
self.set_http_responses_to(response_body)
fetcher = self.create_fetcher()
- with self.assertRaisesRegexp(ValueError, 'Unsupported host'):
+ with six.assertRaisesRegex(self, ValueError, 'Unsupported host'):
fetcher.retrieve_full_uri(full_uri)
self.assertFalse(self.http.send.called)
--- a/tests/unit/test_waiters.py
+++ b/tests/unit/test_waiters.py
@@ -13,7 +13,7 @@
import os
from tests import unittest, BaseEnvVar
-import mock
+from tests import mock
import botocore
from botocore.compat import six
@@ -389,7 +389,7 @@ class TestWaitersObjects(unittest.TestCa
)
waiter = Waiter('MyWaiter', config, operation_method)
- with self.assertRaisesRegexp(WaiterError, error_message):
+ with six.assertRaisesRegex(self, WaiterError, error_message):
waiter.wait()
def test_waiter_transitions_to_failure_state(self):
--- a/tests/functional/docs/test_shared_example_config.py
+++ b/tests/functional/docs/test_shared_example_config.py
@@ -27,7 +27,7 @@ def test_lint_shared_example_configs():
examples = example_config.get("examples", {})
for operation, operation_examples in examples.items():
for example in operation_examples:
- yield _lint_single_example, operation, example, service_model
+ _lint_single_example(operation, example, service_model)
def _lint_single_example(operation_name, example_config, service_model):
--- a/tests/functional/test_alias.py
+++ b/tests/functional/test_alias.py
@@ -49,13 +49,13 @@ ALIAS_CASES = [
def test_can_use_alias():
session = botocore.session.get_session()
for case in ALIAS_CASES:
- yield _can_use_parameter_in_client_call, session, case
+ _can_use_parameter_in_client_call(session, case)
def test_can_use_original_name():
session = botocore.session.get_session()
for case in ALIAS_CASES:
- yield _can_use_parameter_in_client_call, session, case, False
+ _can_use_parameter_in_client_call(session, case, False)
def _can_use_parameter_in_client_call(session, case, use_alias=True):
--- a/tests/functional/test_event_alias.py
+++ b/tests/functional/test_event_alias.py
@@ -584,8 +584,8 @@ def test_event_alias():
service_id = SERVICES[client_name]['service_id']
if endpoint_prefix is not None:
-            yield _assert_handler_called, client_name, endpoint_prefix
-        yield _assert_handler_called, client_name, service_id
-        yield _assert_handler_called, client_name, client_name
+            _assert_handler_called(client_name, endpoint_prefix)
+        _assert_handler_called(client_name, service_id)
+        _assert_handler_called(client_name, client_name)
def _assert_handler_called(client_name, event_part):
--- a/tests/functional/test_h2_required.py
+++ b/tests/functional/test_h2_required.py
@@ -29,12 +29,12 @@ def test_all_uses_of_h2_are_known():
service_model = session.get_service_model(service)
h2_config = service_model.metadata.get('protocolSettings', {}).get('h2')
if h2_config == 'required':
- yield _assert_h2_service_is_known, service
+ _assert_h2_service_is_known(service)
elif h2_config == 'eventstream':
for operation in service_model.operation_names:
operation_model = service_model.operation_model(operation)
if operation_model.has_event_stream_output:
- yield _assert_h2_operation_is_known, service, operation
+ _assert_h2_operation_is_known(service, operation)
def _assert_h2_service_is_known(service):
--- a/tests/functional/test_model_completeness.py
+++ b/tests/functional/test_model_completeness.py
@@ -38,5 +38,6 @@ def test_paginators_and_waiters_are_not_
versions = Loader().list_api_versions(service_name, 'service-2')
if len(versions) > 1:
for type_name in ['paginators-1', 'waiters-2']:
- yield (_test_model_is_not_lost, service_name,
- type_name, versions[-2], versions[-1])
+ _test_model_is_not_lost(service_name,
+ type_name,
+ versions[-2], versions[-1])
--- a/tests/functional/test_paginator_config.py
+++ b/tests/functional/test_paginator_config.py
@@ -140,12 +140,7 @@ def test_lint_pagination_configs():
'paginators-1',
service_model.api_version)
for op_name, single_config in page_config['pagination'].items():
- yield (
- _lint_single_paginator,
- op_name,
- single_config,
- service_model
- )
+ _lint_single_paginator(op_name, single_config, service_model)
def _lint_single_paginator(operation_name, page_config,
--- a/tests/functional/test_public_apis.py
+++ b/tests/functional/test_public_apis.py
@@ -12,7 +12,7 @@
# language governing permissions and limitations under the License.
from collections import defaultdict
-import mock
+from tests import mock
from tests import ClientHTTPStubber
from botocore.session import Session
@@ -73,4 +73,4 @@ def test_public_apis_will_not_be_signed(
for operation_name in PUBLIC_API_TESTS[service_name]:
kwargs = PUBLIC_API_TESTS[service_name][operation_name]
method = getattr(client, xform_name(operation_name))
- yield _test_public_apis_will_not_be_signed, client, method, kwargs
+ _test_public_apis_will_not_be_signed(client, method, kwargs)
--- a/tests/functional/test_service_alias.py
+++ b/tests/functional/test_service_alias.py
@@ -17,7 +17,7 @@ from botocore.handlers import SERVICE_NA
def test_can_use_service_alias():
session = botocore.session.get_session()
for (alias, name) in SERVICE_NAME_ALIASES.items():
- yield _instantiates_the_same_client, session, name, alias
+ _instantiates_the_same_client(session, name, alias)
def _instantiates_the_same_client(session, service_name, service_alias):
--- a/tests/functional/test_six_imports.py
+++ b/tests/functional/test_six_imports.py
@@ -15,7 +15,7 @@ def test_no_bare_six_imports():
if not filename.endswith('.py'):
continue
fullname = os.path.join(rootdir, filename)
- yield _assert_no_bare_six_imports, fullname
+ _assert_no_bare_six_imports(fullname)
def _assert_no_bare_six_imports(filename):
--- a/tests/functional/test_waiter_config.py
+++ b/tests/functional/test_waiter_config.py
@@ -98,9 +98,9 @@ def test_lint_waiter_configs():
except UnknownServiceError:
# The service doesn't have waiters
continue
- yield _validate_schema, validator, waiter_model
+ _validate_schema(validator, waiter_model)
for waiter_name in client.waiter_names:
- yield _lint_single_waiter, client, waiter_name, service_model
+ _lint_single_waiter(client, waiter_name, service_model)
def _lint_single_waiter(client, waiter_name, service_model):
--- a/tests/functional/test_apigateway.py
+++ b/tests/functional/test_apigateway.py
@@ -10,7 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
-import mock
+from tests import mock
from tests import BaseSessionTest, ClientHTTPStubber
--- a/tests/functional/test_cloudsearchdomain.py
+++ b/tests/functional/test_cloudsearchdomain.py
@@ -10,7 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
-import mock
+from tests import mock
from tests import BaseSessionTest, ClientHTTPStubber
--- a/tests/functional/test_docdb.py
+++ b/tests/functional/test_docdb.py
@@ -10,7 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
-import mock
+from tests import mock
from contextlib import contextmanager
import botocore.session
--- a/tests/functional/test_ec2.py
+++ b/tests/functional/test_ec2.py
@@ -11,7 +11,7 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import datetime
-import mock
+from tests import mock
from tests import unittest, ClientHTTPStubber, BaseSessionTest
from botocore.compat import parse_qs, urlparse
--- a/tests/functional/test_lex.py
+++ b/tests/functional/test_lex.py
@@ -10,7 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
-import mock
+from tests import mock
from datetime import datetime
from tests import BaseSessionTest, ClientHTTPStubber
--- a/tests/functional/test_machinelearning.py
+++ b/tests/functional/test_machinelearning.py
@@ -10,7 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
-import mock
+from tests import mock
from tests import BaseSessionTest, ClientHTTPStubber
--- a/tests/functional/test_neptune.py
+++ b/tests/functional/test_neptune.py
@@ -10,7 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
-import mock
+from tests import mock
from contextlib import contextmanager
import botocore.session
--- a/tests/functional/test_rds.py
+++ b/tests/functional/test_rds.py
@@ -10,7 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
-import mock
+from tests import mock
from contextlib import contextmanager
import botocore.session
--- a/tests/functional/test_session.py
+++ b/tests/functional/test_session.py
@@ -12,7 +12,7 @@
# language governing permissions and limitations under the License.
from tests import unittest, temporary_file
-import mock
+from tests import mock
import botocore.session
from botocore.exceptions import ProfileNotFound
--- a/tests/functional/test_sts.py
+++ b/tests/functional/test_sts.py
@@ -13,7 +13,7 @@
from datetime import datetime
import re
-import mock
+from tests import mock
from tests import BaseSessionTest
from tests import temporary_file
--- a/tests/integration/test_credentials.py
+++ b/tests/integration/test_credentials.py
@@ -11,7 +11,7 @@
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
import os
-import mock
+from tests import mock
import tempfile
import shutil
import json
--- a/tests/integration/test_loaders.py
+++ b/tests/integration/test_loaders.py
@@ -13,7 +13,7 @@
import os
from tests import unittest
-import mock
+from tests import mock
import botocore.session
--- a/tests/unit/auth/test_signers.py
+++ b/tests/unit/auth/test_signers.py
@@ -18,7 +18,7 @@ import time
import base64
import json
-import mock
+from tests import mock
import botocore.auth
import botocore.credentials
--- a/tests/unit/docs/__init__.py
+++ b/tests/unit/docs/__init__.py
@@ -16,7 +16,7 @@ import tempfile
import shutil
from botocore.docs.bcdoc.restdoc import DocumentStructure
-import mock
+from tests import mock
from tests import unittest
from botocore.compat import OrderedDict
--- a/tests/unit/docs/bcdoc/test_docstringparser.py
+++ b/tests/unit/docs/bcdoc/test_docstringparser.py
@@ -18,7 +18,7 @@
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
-import mock
+from tests import mock
from tests import unittest
import botocore.docs.bcdoc.docstringparser as parser
--- a/tests/unit/docs/test_docs.py
+++ b/tests/unit/docs/test_docs.py
@@ -14,7 +14,7 @@ import os
import shutil
import tempfile
-import mock
+from tests import mock
from tests.unit.docs import BaseDocsTest
from botocore.session import get_session
--- a/tests/unit/docs/test_example.py
+++ b/tests/unit/docs/test_example.py
@@ -10,7 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
-import mock
+from tests import mock
from tests.unit.docs import BaseDocsTest
from botocore.hooks import HierarchicalEmitter
--- a/tests/unit/docs/test_params.py
+++ b/tests/unit/docs/test_params.py
@@ -10,7 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
-import mock
+from tests import mock
from tests.unit.docs import BaseDocsTest
from botocore.hooks import HierarchicalEmitter
--- a/tests/unit/docs/test_service.py
+++ b/tests/unit/docs/test_service.py
@@ -12,7 +12,7 @@
# language governing permissions and limitations under the License.
import os
-import mock
+from tests import mock
from tests.unit.docs import BaseDocsTest
from botocore.session import get_session
--- a/tests/unit/retries/test_adaptive.py
+++ b/tests/unit/retries/test_adaptive.py
@@ -1,6 +1,6 @@
from tests import unittest
-import mock
+from tests import mock
from botocore.retries import adaptive
from botocore.retries import standard
--- a/tests/unit/test_args.py
+++ b/tests/unit/test_args.py
@@ -15,7 +15,7 @@ import socket
import botocore.config
from tests import unittest
-import mock
+from tests import mock
from botocore import args
from botocore import exceptions
--- a/tests/unit/test_configloader.py
+++ b/tests/unit/test_configloader.py
@@ -14,7 +14,7 @@
# language governing permissions and limitations under the License.
from tests import unittest, BaseEnvVar
import os
-import mock
+from tests import mock
import tempfile
import shutil
--- a/tests/unit/test_history.py
+++ b/tests/unit/test_history.py
@@ -1,6 +1,6 @@
from tests import unittest
-import mock
+from tests import mock
from botocore.history import HistoryRecorder
from botocore.history import BaseHistoryHandler
--- a/tests/unit/test_idempotency.py
+++ b/tests/unit/test_idempotency.py
@@ -13,7 +13,7 @@
from tests import unittest
import re
-import mock
+from tests import mock
from botocore.handlers import generate_idempotent_uuid
--- a/tests/unit/test_retryhandler.py
+++ b/tests/unit/test_retryhandler.py
@@ -15,7 +15,7 @@
from tests import unittest
-import mock
+from tests import mock
from botocore import retryhandler
from botocore.exceptions import (
--- a/tests/unit/test_session.py
+++ b/tests/unit/test_session.py
@@ -19,7 +19,7 @@ import logging
import tempfile
import shutil
-import mock
+from tests import mock
import botocore.session
import botocore.exceptions
--- a/tests/unit/test_session_legacy.py
+++ b/tests/unit/test_session_legacy.py
@@ -19,7 +19,7 @@ import logging
import tempfile
import shutil
-import mock
+from tests import mock
import botocore.session
import botocore.exceptions
--- a/tests/unit/test_signers.py
+++ b/tests/unit/test_signers.py
@@ -10,7 +10,7 @@
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
-import mock
+from tests import mock
import datetime
import json
--- a/tests/unit/test_stub.py
+++ b/tests/unit/test_stub.py
@@ -12,7 +12,7 @@
# language governing permissions and limitations under the License.
from tests import unittest
-import mock
+from tests import mock
from botocore.stub import Stubber
from botocore.exceptions import ParamValidationError, StubResponseError, UnStubbedResponseError
--- a/tests/unit/test_discovery.py
+++ b/tests/unit/test_discovery.py
@@ -1,5 +1,8 @@
import time
-from mock import Mock, call
+try:
+ from mock import Mock, call
+except ImportError:
+ from unittest.mock import Mock, call
from tests import unittest
from botocore.awsrequest import AWSRequest
--- a/tests/unit/test_endpoint.py
+++ b/tests/unit/test_endpoint.py
@@ -13,7 +13,10 @@
import socket
from tests import unittest
-from mock import Mock, patch, sentinel
+try:
+ from mock import Mock, patch, sentinel
+except ImportError:
+ from unittest.mock import Mock, patch, sentinel
from botocore.compat import six
from botocore.awsrequest import AWSRequest