Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Refactor: user_realm_discovery() memorizes domains not supporting URD
This would become handy when we meet B2C authority with customized domain.
  • Loading branch information
rayluo committed Oct 4, 2019
commit 3c40b7ae3eb2e4fdf05bae95d99fa66834109df5
24 changes: 15 additions & 9 deletions msal/authority.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ class Authority(object):
Once constructed, it contains members named "*_endpoint" for this instance.
TODO: It will also cache the previously-validated authority instances.
"""
_domains_without_user_realm_discovery = set([])

def __init__(self, authority_url, validate_authority=True,
verify=True, proxies=None, timeout=None,
):
Expand Down Expand Up @@ -67,17 +69,21 @@ def __init__(self, authority_url, validate_authority=True,
_, _, self.tenant = canonicalize(self.token_endpoint) # Usually a GUID
self.is_adfs = self.tenant.lower() == 'adfs'

def user_realm_discovery(self, username):
resp = requests.get(
"https://{netloc}/common/userrealm/{username}?api-version=1.0".format(
netloc=self.instance, username=username),
headers={'Accept':'application/json'},
verify=self.verify, proxies=self.proxies, timeout=self.timeout)
resp.raise_for_status()
return resp.json()
# It will typically contain "ver", "account_type",
def user_realm_discovery(self, username, response=None):
# It will typically return a dict containing "ver", "account_type",
# "federation_protocol", "cloud_audience_urn",
# "federation_metadata_url", "federation_active_auth_url", etc.
if self.instance not in self.__class__._domains_without_user_realm_discovery:
resp = response or requests.get(
"https://{netloc}/common/userrealm/{username}?api-version=1.0".format(
netloc=self.instance, username=username),
headers={'Accept':'application/json'},
verify=self.verify, proxies=self.proxies, timeout=self.timeout)
if resp.status_code != 404:
resp.raise_for_status()
return resp.json()
self.__class__._domains_without_user_realm_discovery.add(self.instance)
return {} # This can guide the caller to fall back normal ROPC flow

def canonicalize(url):
# Returns (canonicalized_url, netloc, tenant). Raises ValueError on errors.
Expand Down
20 changes: 20 additions & 0 deletions tests/test_authority.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,23 @@ def test_canonicalize_rejects_tenantless_host_with_trailing_slash(self):
with self.assertRaises(ValueError):
canonicalize("https://no.tenant.example.com/")


@unittest.skipIf(os.getenv("TRAVIS_TAG"), "Skip network io during tagged release")
class TestAuthorityInternalHelperUserRealmDiscovery(unittest.TestCase):
def test_memorize(self):
# We use a real authority so the constructor can finish tenant discovery
authority = "https://login.microsoftonline.com/common"
self.assertNotIn(authority, Authority._domains_without_user_realm_discovery)
a = Authority(authority, validate_authority=False)

# We now pretend this authority supports no User Realm Discovery
class MockResponse(object):
status_code = 404
a.user_realm_discovery("[email protected]", response=MockResponse())
self.assertIn(
"login.microsoftonline.com",
Authority._domains_without_user_realm_discovery,
"user_realm_discovery() should memorize domains not supporting URD")
a.user_realm_discovery("[email protected]",
response="This would cause exception if memorization did not work")