Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
B2C implementation
  • Loading branch information
rayluo committed Oct 4, 2019
commit 7b2afc068409e99e399e525beac390ec38fa7081
41 changes: 25 additions & 16 deletions msal/authority.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
import re
try:
from urllib.parse import urlparse
except ImportError: # Fall back to Python 2
from urlparse import urlparse
import logging

import requests
Expand Down Expand Up @@ -39,17 +42,13 @@ def __init__(self, authority_url, validate_authority=True,
self.verify = verify
self.proxies = proxies
self.timeout = timeout
canonicalized, self.instance, tenant = canonicalize(authority_url)
tenant_discovery_endpoint = (
'https://{}/{}{}/.well-known/openid-configuration'.format(
self.instance,
tenant,
"" if tenant == "adfs" else "/v2.0" # the AAD v2 endpoint
))
if (tenant != "adfs" and validate_authority
authority, self.instance, tenant = canonicalize(authority_url)
is_b2c = self.instance.endswith(".b2clogin.com")
if (tenant != "adfs" and (not is_b2c) and validate_authority
and self.instance not in WELL_KNOWN_AUTHORITY_HOSTS):
payload = instance_discovery(
canonicalized + "/oauth2/v2.0/authorize",
"https://{}{}/oauth2/v2.0/authorize".format(
self.instance, authority.path),
verify=verify, proxies=proxies, timeout=timeout)
if payload.get("error") == "invalid_instance":
raise ValueError(
Expand All @@ -60,6 +59,13 @@ def __init__(self, authority_url, validate_authority=True,
"validate_authority=False"
% authority_url)
tenant_discovery_endpoint = payload['tenant_discovery_endpoint']
else:
tenant_discovery_endpoint = (
'https://{}{}{}/.well-known/openid-configuration'.format(
self.instance,
authority.path, # In B2C scenario, it is "/tenant/policy"
"" if tenant == "adfs" else "/v2.0" # the AAD v2 endpoint
))
openid_config = tenant_discovery(
tenant_discovery_endpoint,
verify=verify, proxies=proxies, timeout=timeout)
Expand All @@ -85,15 +91,18 @@ def user_realm_discovery(self, username, response=None):
self.__class__._domains_without_user_realm_discovery.add(self.instance)
return {} # This can guide the caller to fall back normal ROPC flow

def canonicalize(url):
# Returns (canonicalized_url, netloc, tenant). Raises ValueError on errors.
match_object = re.match(r'https://([^/]+)/([^/?#]+)', url.lower())
if not match_object:

def canonicalize(authority_url):
authority = urlparse(authority_url)
parts = authority.path.split("/")
if authority.scheme != "https" or len(parts) < 2 or not parts[1]:
raise ValueError(
"Your given address (%s) should consist of "
"an https url with a minimum of one segment in a path: e.g. "
"https://login.microsoftonline.com/<tenant_name>" % url)
return match_object.group(0), match_object.group(1), match_object.group(2)
"https://login.microsoftonline.com/<tenant> "
"or https://<tenant_name>.b2clogin.com/<tenant_name>.onmicrosoft.com/policy"
% authority_url)
return authority, authority.netloc, parts[1]

def instance_discovery(url, **kwargs):
return requests.get( # Note: This URL seemingly returns V1 endpoint only
Expand Down
18 changes: 9 additions & 9 deletions tests/test_authority.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,19 +40,19 @@ def test_invalid_host_skipping_validation_meets_connection_error_down_the_road(s
class TestAuthorityInternalHelperCanonicalize(unittest.TestCase):

def test_canonicalize_tenant_followed_by_extra_paths(self):
self.assertEqual(
canonicalize("https://example.com/tenant/subpath?foo=bar#fragment"),
("https://example.com/tenant", "example.com", "tenant"))
_, i, t = canonicalize("https://example.com/tenant/subpath?foo=bar#fragment")
self.assertEqual("example.com", i)
self.assertEqual("tenant", t)

def test_canonicalize_tenant_followed_by_extra_query(self):
self.assertEqual(
canonicalize("https://example.com/tenant?foo=bar#fragment"),
("https://example.com/tenant", "example.com", "tenant"))
_, i, t = canonicalize("https://example.com/tenant?foo=bar#fragment")
self.assertEqual("example.com", i)
self.assertEqual("tenant", t)

def test_canonicalize_tenant_followed_by_extra_fragment(self):
self.assertEqual(
canonicalize("https://example.com/tenant#fragment"),
("https://example.com/tenant", "example.com", "tenant"))
_, i, t = canonicalize("https://example.com/tenant#fragment")
self.assertEqual("example.com", i)
self.assertEqual("tenant", t)

def test_canonicalize_rejects_non_https(self):
with self.assertRaises(ValueError):
Expand Down
98 changes: 81 additions & 17 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,21 @@
logging.basicConfig(level=logging.INFO)


def _get_app_and_auth_code(
client_id,
client_secret=None,
authority="https://login.microsoftonline.com/common",
port=44331,
scopes=["https://graph.windows.net/.default"],
):
from msal.oauth2cli.authcode import obtain_auth_code
app = msal.ClientApplication(client_id, client_secret, authority=authority)
redirect_uri = "http://localhost:%d" % port
ac = obtain_auth_code(port, auth_uri=app.get_authorization_request_url(
scopes, redirect_uri=redirect_uri))
assert ac is not None
return (app, ac, redirect_uri)

@unittest.skipIf(os.getenv("TRAVIS_TAG"), "Skip e2e tests during tagged release")
class E2eTestCase(unittest.TestCase):

Expand Down Expand Up @@ -49,9 +64,15 @@ def assertCacheWorksForUser(self, result_from_wire, scope, username=None):
result_from_cache = self.app.acquire_token_silent(scope, account=account)
self.assertIsNotNone(result_from_cache,
"We should get a result from acquire_token_silent(...) call")
self.assertNotEqual(
result_from_wire['access_token'], result_from_cache['access_token'],
"We should get a fresh AT (via RT)")
self.assertIsNotNone(
# We used to assert it this way:
# result_from_wire['access_token'] != result_from_cache['access_token']
# but ROPC in B2C tends to return the same AT we obtained seconds ago.
# Now looking back, "refresh_token grant would return a brand new AT"
# was just an empirical observation but never a committment in specs,
# so we adjust our way to assert here.
(result_from_cache or {}).get("access_token"),
"We should get an AT from acquire_token_silent(...) call")

def assertCacheWorksForApp(self, result_from_wire, scope):
# Going to test acquire_token_silent(...) to locate an AT from cache
Expand All @@ -70,7 +91,10 @@ def _test_username_password(self,
username, password, scopes=scope)
self.assertLoosely(result)
# self.assertEqual(None, result.get("error"), str(result))
self.assertCacheWorksForUser(result, scope, username=username)
self.assertCacheWorksForUser(
result, scope,
username=username if ".b2clogin.com" not in authority else None,
)


THIS_FOLDER = os.path.dirname(__file__)
Expand All @@ -95,23 +119,17 @@ def test_username_password(self):
self._test_username_password(**self.config)

def _get_app_and_auth_code(self):
from msal.oauth2cli.authcode import obtain_auth_code
app = msal.ClientApplication(
return _get_app_and_auth_code(
self.config["client_id"],
client_credential=self.config.get("client_secret"),
authority=self.config.get("authority"))
port = self.config.get("listen_port", 44331)
redirect_uri = "http://localhost:%s" % port
auth_request_uri = app.get_authorization_request_url(
self.config["scope"], redirect_uri=redirect_uri)
ac = obtain_auth_code(port, auth_uri=auth_request_uri)
self.assertNotEqual(ac, None)
return (app, ac, redirect_uri)
client_secret=self.config.get("client_secret"),
authority=self.config.get("authority"),
port=self.config.get("listen_port", 44331),
scopes=self.config["scope"],
)

def test_auth_code(self):
self.skipUnlessWithConfig(["client_id", "scope"])
(self.app, ac, redirect_uri) = self._get_app_and_auth_code()

result = self.app.acquire_token_by_authorization_code(
ac, self.config["scope"], redirect_uri=redirect_uri)
logger.debug("%s.cache = %s",
Expand Down Expand Up @@ -314,7 +332,7 @@ def get_lab_user_secret(cls, lab_name="msidlab4"):
lab_name = lab_name.lower()
if lab_name not in cls._secrets:
logger.info("Querying lab user password for %s", lab_name)
# Note: Short link won't work "https://aka.ms/GetLabUserSecret?Secret=%s"
# Short link only works in browser "https://aka.ms/GetLabUserSecret?Secret=%s"
# So we use the official link written in here
# https://microsoft.sharepoint-df.com/teams/MSIDLABSExtended/SitePages/Programmatically-accessing-LAB-API%27s.aspx
url = ("https://request.msidlab.com/api/GetLabUserSecret?code=KpY5uCcoKo0aW8VOL/CUO3wnu9UF2XbSnLFGk56BDnmQiwD80MQ7HA==&Secret=%s"
Expand Down Expand Up @@ -417,3 +435,49 @@ def test_acquire_token_obo(self):
result = cca.acquire_token_silent(downstream_scopes, account)
self.assertEqual(cca_result["access_token"], result["access_token"])

def _build_b2c_authority(self, policy):
base = "https://msidlabb2c.b2clogin.com/msidlabb2c.onmicrosoft.com"
return base + "/" + policy # We do not support base + "?p=" + policy

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's probably fine as supporting B2C in MSAL.Python it's a new feature.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To be clear, that inline comment is inside an internal test case, so it is more like a remind-to-ourselves, but did not mean it as a limitation or deficiency. In fact, currently all the public materials we can find, uses the "base/policy" format, rather than "base?p=policy" format. Such as:

That being said, I did heard from a previous internal presentation that the "base?p=policy" format would also work. We (the entire MSAL fleet) might actually want to switch to that format at a later time, because the current way - putting policy inside authority - has some intrinsic issues, but that is a different topic.


@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
def test_b2c_acquire_token_by_auth_code(self):
"""
When prompted, you can manually login using this account:

username="[email protected]"
# This won't work https://msidlab.com/api/user?usertype=b2c
password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabb2c
"""
scopes = ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"]
(self.app, ac, redirect_uri) = _get_app_and_auth_code(
"b876a048-55a5-4fc5-9403-f5d90cb1c852",
client_secret=self.get_lab_user_secret("MSIDLABB2C-MSAapp-AppSecret"),
authority=self._build_b2c_authority("B2C_1_SignInPolicy"),
port=3843, # Lab defines 4 of them: [3843, 4584, 4843, 60000]
scopes=scopes,
)
result = self.app.acquire_token_by_authorization_code(
ac, scopes, redirect_uri=redirect_uri)
logger.debug(
"%s: cache = %s, id_token_claims = %s",
self.id(),
json.dumps(self.app.token_cache._cache, indent=4),
json.dumps(result.get("id_token_claims"), indent=4),
)
self.assertIn(
"access_token", result,
"{error}: {error_description}".format(
# Note: No interpolation here, cause error won't always present
error=result.get("error"),
error_description=result.get("error_description")))
self.assertCacheWorksForUser(result, scopes, username=None)

def test_b2c_acquire_token_by_ropc(self):
self._test_username_password(
authority=self._build_b2c_authority("B2C_1_ROPC_Auth"),
client_id="e3b9ad76-9763-4827-b088-80c7a7888f79",
username="[email protected]",
password=self.get_lab_user_secret("msidlabb2c"),
scope=["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"],
)