Skip to content
Prev Previous commit
Next Next commit
Test case for ADFS-direct
  • Loading branch information
rayluo committed Jul 20, 2019
commit 6cd1ab26ea124caecd25648b68c99b8f58db8a0f
87 changes: 79 additions & 8 deletions tests/test_token_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,30 +16,29 @@ class TokenCacheTestCase(unittest.TestCase):
@staticmethod
def build_id_token(
iss="issuer", sub="subject", aud="my_client_id", exp=None, iat=None,
preferred_username="me", **claims):
**claims): # AAD issues "preferred_username", ADFS issues "upn"
return "header.%s.signature" % base64.b64encode(json.dumps(dict({
"iss": iss,
"sub": sub,
"aud": aud,
"exp": exp or (time.time() + 100),
"iat": iat or time.time(),
"preferred_username": preferred_username,
}, **claims)).encode()).decode('utf-8')

@staticmethod
def build_response( # simulate a response from AAD
uid="uid", utid="utid", # They will form client_info
uid=None, utid=None, # If present, they will form client_info
access_token=None, expires_in=3600, token_type="some type",
refresh_token=None,
foci=None,
id_token=None, # or something generated by build_id_token()
error=None,
):
response = {
"client_info": base64.b64encode(json.dumps({
response = {}
if uid and utid: # Mimic the AAD behavior for "client_info=1" request
response["client_info"] = base64.b64encode(json.dumps({
"uid": uid, "utid": utid,
}).encode()).decode('utf-8'),
}
}).encode()).decode('utf-8')
if error:
response["error"] = error
if access_token:
Expand All @@ -59,7 +58,7 @@ def build_response( # simulate a response from AAD
def setUp(self):
self.cache = TokenCache()

def testAdd(self):
def testAddByAad(self):
client_id = "my_client_id"
id_token = self.build_id_token(
oid="object1234", preferred_username="John Doe", aud=client_id)
Expand Down Expand Up @@ -132,6 +131,78 @@ def testAdd(self):
"appmetadata-login.example.com-my_client_id")
)

def testAddByAdfs(self):
client_id = "my_client_id"
id_token = self.build_id_token(aud=client_id, upn="[email protected]")
self.cache.add({
"client_id": client_id,
"scope": ["s2", "s1", "s3"], # Not in particular order
"token_endpoint": "https://fs.msidlab8.com/adfs/oauth2/token",
"response": self.build_response(
uid=None, utid=None, # ADFS will provide no client_info
expires_in=3600, access_token="an access token",
id_token=id_token, refresh_token="a refresh token"),
}, now=1000)
self.assertEqual(
{
'cached_at': "1000",
'client_id': 'my_client_id',
'credential_type': 'AccessToken',
'environment': 'fs.msidlab8.com',
'expires_on': "4600",
'extended_expires_on': "4600",
'home_account_id': "subject.adfs",
'realm': 'adfs',
'secret': 'an access token',
'target': 's2 s1 s3',
},
self.cache._cache["AccessToken"].get(
'subject.adfs-fs.msidlab8.com-accesstoken-my_client_id-adfs-s2 s1 s3')
)
self.assertEqual(
{
'client_id': 'my_client_id',
'credential_type': 'RefreshToken',
'environment': 'fs.msidlab8.com',
'home_account_id': "subject.adfs",
'secret': 'a refresh token',
'target': 's2 s1 s3',
},
self.cache._cache["RefreshToken"].get(
'subject.adfs-fs.msidlab8.com-refreshtoken-my_client_id--s2 s1 s3')
)
self.assertEqual(
{
'home_account_id': "subject.adfs",
'environment': 'fs.msidlab8.com',
'realm': 'adfs',
'local_account_id': "subject",
'username': "[email protected]",
'authority_type': "ADFS",
},
self.cache._cache["Account"].get('subject.adfs-fs.msidlab8.com-adfs')
)
self.assertEqual(
{
'credential_type': 'IdToken',
'secret': id_token,
'home_account_id': "subject.adfs",
'environment': 'fs.msidlab8.com',
'realm': 'adfs',
'client_id': 'my_client_id',
},
self.cache._cache["IdToken"].get(
'subject.adfs-fs.msidlab8.com-idtoken-my_client_id-adfs-')
)
self.assertEqual(
{
"client_id": "my_client_id",
'environment': 'fs.msidlab8.com',
},
self.cache._cache.get("AppMetadata", {}).get(
"appmetadata-fs.msidlab8.com-my_client_id")
)


class SerializableTokenCacheTestCase(TokenCacheTestCase):
# Run all inherited test methods, and have extra check in tearDown()
Expand Down