python-pycognito/tests.py

501 lines
18 KiB
Python

import datetime
import unittest
import os.path
from unittest.mock import patch
import uuid
import freezegun
import moto
import moto.cognitoidp
import boto3
from botocore.exceptions import ParamValidationError
from botocore.stub import Stubber
from envs import env
import requests
import requests_mock
from pycognito import Cognito, UserObj, GroupObj, TokenVerificationException
from pycognito.aws_srp import AWSSRP
from pycognito.utils import RequestsSrpAuth
def _mock_authenticate_user(_, client=None, client_metadata=None):
return {
"AuthenticationResult": {
"TokenType": "admin",
"IdToken": "dummy_token",
"AccessToken": "dummy_token",
"RefreshToken": "dummy_token",
}
}
def _mock_get_params(_):
return {"USERNAME": "bob", "SRP_A": "srp"}
def _mock_verify_tokens(self, token, id_name, token_use):
if "wrong" in token:
raise TokenVerificationException
setattr(self, id_name, token)
class UserObjTestCase(unittest.TestCase):
def setUp(self):
if env("USE_CLIENT_SECRET", "False") == "True":
self.app_id = env("COGNITO_APP_WITH_SECRET_ID")
else:
self.app_id = env("COGNITO_APP_ID")
self.cognito_user_pool_id = env("COGNITO_USER_POOL_ID", "us-east-1_123456789")
self.username = env("COGNITO_TEST_USERNAME")
self.user = Cognito(
user_pool_id=self.cognito_user_pool_id,
client_id=self.app_id,
username=self.username,
)
self.user_metadata = {
"user_status": "CONFIRMED",
"username": "bjones",
}
self.user_info = [
{"Name": "name", "Value": "Brian Jones"},
{"Name": "given_name", "Value": "Brian"},
{"Name": "birthdate", "Value": "12/7/1980"},
]
def test_init(self):
user = UserObj("bjones", self.user_info, self.user, self.user_metadata)
self.assertEqual(user.username, self.user_metadata.get("username"))
self.assertEqual(user.name, self.user_info[0].get("Value"))
self.assertEqual(user.user_status, self.user_metadata.get("user_status"))
class GroupObjTestCase(unittest.TestCase):
def setUp(self):
if env("USE_CLIENT_SECRET", "False") == "True":
self.app_id = env("COGNITO_APP_WITH_SECRET_ID")
else:
self.app_id = env("COGNITO_APP_ID")
self.cognito_user_pool_id = env("COGNITO_USER_POOL_ID", "us-east-1_123456789")
self.group_data = {"GroupName": "test_group", "Precedence": 1}
self.cognito_obj = Cognito(
user_pool_id=self.cognito_user_pool_id, client_id=self.app_id
)
def test_init(self):
group = GroupObj(group_data=self.group_data, cognito_obj=self.cognito_obj)
self.assertEqual(group.group_name, "test_group")
self.assertEqual(group.precedence, 1)
class CognitoAuthTestCase(unittest.TestCase):
def setUp(self):
if env("USE_CLIENT_SECRET") == "True":
self.app_id = env("COGNITO_APP_WITH_SECRET_ID", "app")
self.client_secret = env("COGNITO_CLIENT_SECRET")
else:
self.app_id = env("COGNITO_APP_ID", "app")
self.client_secret = None
self.cognito_user_pool_id = env("COGNITO_USER_POOL_ID", "us-east-1_123456789")
self.username = env("COGNITO_TEST_USERNAME", "bob")
self.password = env("COGNITO_TEST_PASSWORD", "bobpassword")
self.user = Cognito(
self.cognito_user_pool_id,
self.app_id,
username=self.username,
client_secret=self.client_secret,
)
@patch("pycognito.aws_srp.AWSSRP.authenticate_user", _mock_authenticate_user)
@patch("pycognito.Cognito.verify_token", _mock_verify_tokens)
def test_authenticate(self):
self.user.authenticate(self.password)
self.assertNotEqual(self.user.access_token, None)
self.assertNotEqual(self.user.id_token, None)
self.assertNotEqual(self.user.refresh_token, None)
@patch("pycognito.aws_srp.AWSSRP.authenticate_user", _mock_authenticate_user)
@patch("pycognito.Cognito.verify_token", _mock_verify_tokens)
def test_verify_token(self):
self.user.authenticate(self.password)
bad_access_token = "{}wrong".format(self.user.access_token)
with self.assertRaises(TokenVerificationException):
self.user.verify_token(bad_access_token, "access_token", "access")
@patch("pycognito.Cognito", autospec=True)
def test_register(self, cognito_user):
user = cognito_user(
self.cognito_user_pool_id, self.app_id, username=self.username
)
base_attr = dict(
given_name="Brian",
family_name="Jones",
name="Brian Jones",
email="bjones39@capless.io",
phone_number="+19194894555",
gender="Male",
preferred_username="billyocean",
)
user.set_base_attributes(**base_attr)
user.register("sampleuser", "sample4#Password")
@patch("pycognito.aws_srp.AWSSRP.authenticate_user", _mock_authenticate_user)
@patch("pycognito.Cognito.verify_token", _mock_verify_tokens)
@patch("pycognito.Cognito._add_secret_hash", return_value=None)
def test_renew_tokens(self, _):
stub = Stubber(self.user.client)
# By the stubber nature, we need to add the sequence
# of calls for the AWS SRP auth to test the whole process
stub.add_response(
method="initiate_auth",
service_response={
"AuthenticationResult": {
"TokenType": "admin",
"IdToken": "dummy_token",
"AccessToken": "dummy_token",
"RefreshToken": "dummy_token",
},
"ResponseMetadata": {"HTTPStatusCode": 200},
},
expected_params={
"ClientId": self.app_id,
"AuthFlow": "REFRESH_TOKEN_AUTH",
"AuthParameters": {"REFRESH_TOKEN": "dummy_token"},
},
)
with stub:
self.user.authenticate(self.password)
self.user.renew_access_token()
stub.assert_no_pending_responses()
@patch("pycognito.Cognito", autospec=True)
def test_update_profile(self, cognito_user):
user = cognito_user(
self.cognito_user_pool_id, self.app_id, username=self.username
)
user.authenticate(self.password)
user.update_profile({"given_name": "Jenkins"})
def test_admin_get_user(self):
stub = Stubber(self.user.client)
stub.add_response(
method="admin_get_user",
service_response={
"Enabled": True,
"UserStatus": "CONFIRMED",
"Username": self.username,
"UserAttributes": [],
},
expected_params={
"UserPoolId": self.cognito_user_pool_id,
"Username": self.username,
},
)
with stub:
u = self.user.admin_get_user()
self.assertEqual(u.username, self.username)
stub.assert_no_pending_responses()
def test_check_token(self):
# This is a sample JWT with an expiration time set to January, 1st, 3000
self.user.access_token = (
"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG"
"9lIiwiaWF0IjoxNTE2MjM5MDIyLCJleHAiOjMyNTAzNjgwMDAwfQ.C-1gPxrhUsiWeCvMvaZuuQYarkDNAc"
"pEGJPIqu_SrKQ"
)
try:
valid = self.user.check_token()
except OverflowError:
self.skipTest("This test requires 64-bit time_t")
else:
self.assertFalse(valid)
@patch("pycognito.Cognito", autospec=True)
def test_validate_verification(self, cognito_user):
u = cognito_user(self.cognito_user_pool_id, self.app_id, username=self.username)
u.validate_verification("4321")
@patch("pycognito.Cognito", autospec=True)
def test_confirm_forgot_password(self, cognito_user):
u = cognito_user(self.cognito_user_pool_id, self.app_id, username=self.username)
u.confirm_forgot_password("4553", "samplepassword")
with self.assertRaises(TypeError):
u.confirm_forgot_password(self.password)
@patch("pycognito.aws_srp.AWSSRP.authenticate_user", _mock_authenticate_user)
@patch("pycognito.Cognito.verify_token", _mock_verify_tokens)
@patch("pycognito.Cognito.check_token", return_value=True)
def test_change_password(self, _):
# u = cognito_user(self.cognito_user_pool_id, self.app_id,
# username=self.username)
self.user.authenticate(self.password)
stub = Stubber(self.user.client)
stub.add_response(
method="change_password",
service_response={"ResponseMetadata": {"HTTPStatusCode": 200}},
expected_params={
"PreviousPassword": self.password,
"ProposedPassword": "crazypassword$45DOG",
"AccessToken": self.user.access_token,
},
)
with stub:
self.user.change_password(self.password, "crazypassword$45DOG")
stub.assert_no_pending_responses()
with self.assertRaises(ParamValidationError):
self.user.change_password(self.password, None)
def test_set_attributes(self):
user = Cognito(self.cognito_user_pool_id, self.app_id)
user._set_attributes(
{"ResponseMetadata": {"HTTPStatusCode": 200}}, {"somerandom": "attribute"}
)
self.assertEqual(user.somerandom, "attribute")
@patch("pycognito.Cognito.verify_token", _mock_verify_tokens)
def test_admin_authenticate(self):
stub = Stubber(self.user.client)
# By the stubber nature, we need to add the sequence
# of calls for the AWS SRP auth to test the whole process
stub.add_response(
method="admin_initiate_auth",
service_response={
"AuthenticationResult": {
"TokenType": "admin",
"IdToken": "dummy_token",
"AccessToken": "dummy_token",
"RefreshToken": "dummy_token",
}
},
expected_params={
"UserPoolId": self.cognito_user_pool_id,
"ClientId": self.app_id,
"AuthFlow": "ADMIN_NO_SRP_AUTH",
"AuthParameters": {
"USERNAME": self.username,
"PASSWORD": self.password,
},
},
)
with stub:
self.user.admin_authenticate(self.password)
self.assertNotEqual(self.user.access_token, None)
self.assertNotEqual(self.user.id_token, None)
self.assertNotEqual(self.user.refresh_token, None)
stub.assert_no_pending_responses()
class AWSSRPTestCase(unittest.TestCase):
def setUp(self):
if env("USE_CLIENT_SECRET") == "True":
self.client_secret = env("COGNITO_CLIENT_SECRET")
self.app_id = env("COGNITO_APP_WITH_SECRET_ID", "app")
else:
self.app_id = env("COGNITO_APP_ID", "app")
self.client_secret = None
self.cognito_user_pool_id = env("COGNITO_USER_POOL_ID", "us-east-1_123456789")
self.username = env("COGNITO_TEST_USERNAME")
self.password = env("COGNITO_TEST_PASSWORD")
self.aws = AWSSRP(
username=self.username,
password=self.password,
pool_region="us-east-1",
pool_id=self.cognito_user_pool_id,
client_id=self.app_id,
client_secret=self.client_secret,
)
def tearDown(self):
del self.aws
@patch("pycognito.aws_srp.AWSSRP.get_auth_params", _mock_get_params)
@patch("pycognito.aws_srp.AWSSRP.process_challenge", return_value={})
def test_authenticate_user(self, _):
stub = Stubber(self.aws.client)
# By the stubber nature, we need to add the sequence
# of calls for the AWS SRP auth to test the whole process
stub.add_response(
method="initiate_auth",
service_response={
"ChallengeName": "PASSWORD_VERIFIER",
"ChallengeParameters": {},
},
expected_params={
"AuthFlow": "USER_SRP_AUTH",
"AuthParameters": _mock_get_params(None),
"ClientId": self.app_id,
},
)
stub.add_response(
method="respond_to_auth_challenge",
service_response={
"AuthenticationResult": {
"IdToken": "dummy_token",
"AccessToken": "dummy_token",
"RefreshToken": "dummy_token",
}
},
expected_params={
"ClientId": self.app_id,
"ChallengeName": "PASSWORD_VERIFIER",
"ChallengeResponses": {},
},
)
with stub:
tokens = self.aws.authenticate_user()
self.assertTrue("IdToken" in tokens["AuthenticationResult"])
self.assertTrue("AccessToken" in tokens["AuthenticationResult"])
self.assertTrue("RefreshToken" in tokens["AuthenticationResult"])
stub.assert_no_pending_responses()
def test_cognito_formatted_timestamp(self):
self.assertEqual(
self.aws.get_cognito_formatted_timestamp(
datetime.datetime(2022, 1, 1, 0, 0, 0)
),
"Sat Jan 1 00:00:00 UTC 2022",
)
self.assertEqual(
self.aws.get_cognito_formatted_timestamp(
datetime.datetime(2022, 1, 2, 12, 0, 0)
),
"Sun Jan 2 12:00:00 UTC 2022",
)
self.assertEqual(
self.aws.get_cognito_formatted_timestamp(
datetime.datetime(2022, 1, 3, 9, 0, 0)
),
"Mon Jan 3 09:00:00 UTC 2022",
)
self.assertEqual(
self.aws.get_cognito_formatted_timestamp(
datetime.datetime(2022, 12, 31, 23, 59, 59)
),
"Sat Dec 31 23:59:59 UTC 2022",
)
@moto.mock_aws
class UtilsTestCase(unittest.TestCase):
username = "bob@test.com"
password = "Test1234!"
def setUp(self) -> None:
cognitoidp_client = boto3.client("cognito-idp", region_name="us-east-1")
user_pool = cognitoidp_client.create_user_pool(
PoolName="pycognito-test-pool",
AliasAttributes=[
"email",
],
UsernameAttributes=[
"email",
],
)
self.user_pool_id = user_pool["UserPool"]["Id"]
user_pool_client = cognitoidp_client.create_user_pool_client(
UserPoolId=self.user_pool_id,
ClientName="test-client",
RefreshTokenValidity=1,
AccessTokenValidity=1,
IdTokenValidity=1,
TokenValidityUnits={
"AccessToken": "hour",
"IdToken": "hour",
"RefreshToken": "days",
},
)
self.client_id = user_pool_client["UserPoolClient"]["ClientId"]
cognitoidp_client.admin_create_user(
UserPoolId=self.user_pool_id,
Username=self.username,
TemporaryPassword=self.password,
MessageAction="SUPPRESS",
)
cognitoidp_client.admin_set_user_password(
UserPoolId=self.user_pool_id,
Username=self.username,
Password=self.password,
Permanent=True,
)
@requests_mock.Mocker()
def test_srp_requests_http_auth(self, m):
# Get Moto's static public jwks
jwks_public_key_filename = os.path.join(
os.path.dirname(moto.cognitoidp.__file__), "resources/jwks-public.json"
)
jwks_public_key_f = open(jwks_public_key_filename, "rb")
# Create some test data
test_data = str(uuid.uuid4())
# Mock a test endpoint. We pretend this endpoint would require an Authorization header
m.get("http://test.com", text=test_data)
# Pycognito will automatically verify the token it receives. Mock the proper endpoint and return the static
# key from above
m.get(
f"https://cognito-idp.us-east-1.amazonaws.com/{self.user_pool_id}/.well-known/jwks.json",
body=jwks_public_key_f,
)
now = datetime.datetime.utcnow()
# Standup the actual Requests plugin
srp_auth = RequestsSrpAuth(
username=self.username,
password=self.password,
user_pool_id=self.user_pool_id,
user_pool_region="us-east-1",
client_id=self.client_id,
)
# Make the actual request
req = requests.get("http://test.com", auth=srp_auth)
req.raise_for_status()
# Ensure the data returns matches the mocked endpoint
self.assertEqual(test_data, req.text)
# Get the access token used
access_token_orig = srp_auth.cognito_client.access_token
# Make a second request with a time 2 hours in the future
with freezegun.freeze_time(now + datetime.timedelta(hours=2)):
req = requests.get("http://test.com", auth=srp_auth)
req.raise_for_status()
access_token_new = srp_auth.cognito_client.access_token
# Check that the access token was refreshed to a new one
self.assertNotEqual(access_token_orig, access_token_new)
if __name__ == "__main__":
unittest.main()