Skip to content
Merged
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
281 changes: 179 additions & 102 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,12 @@ def setUpClass(cls):
def tearDownClass(cls):
cls.session.close()

@classmethod
def get_lab_app_object(cls, **query ): # https://msidlab.com/swagger/index.html
url = "https://msidlab.com/api/app"
resp = cls.session.get(url, params=query)
return resp.json()[0]

@classmethod
def get_lab_user_secret(cls, lab_name="msidlab4"):
lab_name = lab_name.lower()
Expand All @@ -336,67 +342,31 @@ def get_lab_user_secret(cls, lab_name="msidlab4"):
def get_lab_user(cls, **query): # https://docs.msidlab.com/labapi/userapi.html
resp = cls.session.get("https://msidlab.com/api/user", params=query)
result = resp.json()[0]
authority_base = "https://login.microsoftonline.com/"
graph_endpoint = "https://graph.microsoft.com/.default"
if "azureenvironment" in query and query["azureenvironment"] == "azureusgovernment":
authority_base = "https://login.microsoftonline.us/"
graph_endpoint = "https://graph.microsoft.us/.default"
return { # Mapping lab API response to our simplified configuration format
"authority": "https://login.microsoftonline.com/{}.onmicrosoft.com".format(
result["labName"]),
"authority": authority_base + result["tenantID"],
"client_id": result["appId"],
"username": result["upn"],
"lab_name": result["labName"],
"scope": ["https://graph.microsoft.com/.default"],
"scope": [graph_endpoint],
}

def test_aad_managed_user(self): # Pure cloud
config = self.get_lab_user(usertype="cloud")
self._test_username_password(
password=self.get_lab_user_secret(config["lab_name"]), **config)
def _test_username_password_lab(self, config):
self._test_username_password(**config)

def test_adfs4_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv4")
self._test_username_password(
password=self.get_lab_user_secret(config["lab_name"]), **config)

def test_adfs3_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv3")
self._test_username_password(
password=self.get_lab_user_secret(config["lab_name"]), **config)

def test_adfs2_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2")
self._test_username_password(
password=self.get_lab_user_secret(config["lab_name"]), **config)

def test_adfs2019_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2019")
self._test_username_password(
password=self.get_lab_user_secret(config["lab_name"]), **config)

def test_ropc_adfs2019_onprem(self):
config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019")
config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"]
config["client_id"] = "PublicClientId"
config["scope"] = self.adfs2019_scopes
self._test_username_password(
password=self.get_lab_user_secret(config["lab_name"]), **config)

@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
def test_adfs2019_onprem_acquire_token_by_auth_code(self):
"""When prompted, you can manually login using this account:

# https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019
username = "..." # The upn from the link above
password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
"""
scopes = self.adfs2019_scopes
config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019")
def _test_acquire_token_by_auth_code_lab(self, config):
(self.app, ac, redirect_uri) = _get_app_and_auth_code(
# Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259
"PublicClientId",
authority="https://fs.%s.com/adfs" % config["lab_name"],
port=8080,
scopes=scopes,
config["client_id"],
authority=config["authority"],
port=config["port"],
scopes=config["scope"],
)
result = self.app.acquire_token_by_authorization_code(
ac, scopes, redirect_uri=redirect_uri)
ac, config["scopes"], redirect_uri=redirect_uri)
logger.debug(
"%s: cache = %s, id_token_claims = %s",
self.id(),
Expand All @@ -409,41 +379,32 @@ def test_adfs2019_onprem_acquire_token_by_auth_code(self):
# Note: No interpolation here, cause error won't always present
error=result.get("error"),
error_description=result.get("error_description")))
self.assertCacheWorksForUser(result, scopes, username=None)

@unittest.skipUnless(
os.getenv("LAB_OBO_CLIENT_SECRET"),
"Need LAB_OBO_CLIENT SECRET from https://msidlabs.vault.azure.net/secrets/TodoListServiceV2-OBO/c58ba97c34ca4464886943a847d1db56")
def test_acquire_token_obo(self):
# Some hardcoded, pre-defined settings
obo_client_id = "f4aa5217-e87c-42b2-82af-5624dd14ee72"
downstream_scopes = ["https://graph.microsoft.com/.default"]
config = self.get_lab_user(usertype="cloud")
self.assertCacheWorksForUser(result, config["scopes"], username=None)

def _test_acquire_token_obo_lab(self, config_pca, config_cca):
# 1. An app obtains a token representing a user, for our mid-tier service
pca = msal.PublicClientApplication(
"c0485386-1e9a-4663-bc96-7ab30656de7f", authority=config.get("authority"))
config_pca["client_id"], authority=config_pca["authority"])
pca_result = pca.acquire_token_by_username_password(
config["username"],
self.get_lab_user_secret(config["lab_name"]),
scopes=[ # The OBO app's scope. Yours might be different.
"api://%s/read" % obo_client_id],
config_pca["username"],
config_pca["password"],
scopes=config_pca["scope"],
)
self.assertIsNotNone(
pca_result.get("access_token"),
"PCA failed to get AT because %s" % json.dumps(pca_result, indent=2))

# 2. Our mid-tier service uses OBO to obtain a token for downstream service
cca = msal.ConfidentialClientApplication(
obo_client_id,
client_credential=os.getenv("LAB_OBO_CLIENT_SECRET"),
authority=config.get("authority"),
config_cca["client_id"],
client_credential=config_cca["client_secret"],
authority=config_cca["authority"],
# token_cache= ..., # Default token cache is all-tokens-store-in-memory.
# That's fine if OBO app uses short-lived msal instance per session.
# Otherwise, the OBO app need to implement a one-cache-per-user setup.
)
cca_result = cca.acquire_token_on_behalf_of(
pca_result['access_token'], downstream_scopes)
pca_result['access_token'], config_cca["scope"])
self.assertNotEqual(None, cca_result.get("access_token"), str(cca_result))

# 3. Now the OBO app can simply store downstream token(s) in same session.
Expand All @@ -453,12 +414,116 @@ def test_acquire_token_obo(self):
# Assuming you already did that (which is not shown in this test case),
# the following part shows one of the ways to obtain an AT from cache.
username = cca_result.get("id_token_claims", {}).get("preferred_username")
self.assertEqual(config["username"], username)
self.assertEqual(config_cca["username"], username)
if username: # A precaution so that we won't use other user's token
account = cca.get_accounts(username=username)[0]
result = cca.acquire_token_silent(downstream_scopes, account)
result = cca.acquire_token_silent(config_cca["scope"], account)
self.assertEqual(cca_result["access_token"], result["access_token"])

def _test_acquire_token_by_client_secret_lab(self, config):
app = msal.ConfidentialClientApplication(config["client_id"],
client_credential=config["client_secret"],
authority=config["authority"],
)
result = app.acquire_token_for_client(config["scope"])
self.assertNotEqual(None, result.get("access_token"), str(result))

def _test_acquire_token_device_code_lab(self, config):
self.app = msal.PublicClientApplication(
config['client_id'], authority=config["authority"])
flow = self.app.initiate_device_flow(scopes=config["scope"])
assert "user_code" in flow, "DF does not seem to be provisioned: %s".format(
json.dumps(flow, indent=4))
logger.info(flow["message"])

duration = 60
logger.info("We will wait up to %d seconds for you to sign in" % duration)
flow["expires_at"] = min( # Shorten the time for quick test
flow["expires_at"], time.time() + duration)
result = self.app.acquire_token_by_device_flow(flow)
self.assertLoosely( # It will skip this test if there is no user interaction
result,
assertion=lambda: self.assertIn('access_token', result),
skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS)
if "access_token" not in result:
self.skip("End user did not complete Device Flow in time")
self.assertCacheWorksForUser(result, config["scope"], username=None)
result["access_token"] = result["refresh_token"] = "************"
logger.info(
"%s obtained tokens: %s", self.id(), json.dumps(result, indent=4))


class WorldWideTestCase(LabBasedTestCase):

def test_aad_managed_user(self): # Pure cloud
config = self.get_lab_user(usertype="cloud")
config["password"] = self.get_lab_user_secret(config["lab_name"])
self._test_username_password_lab(config)

def test_adfs4_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv4")
config["password"] = self.get_lab_user_secret(config["lab_name"])
self._test_username_password_lab(config)

def test_adfs3_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv3")
config["password"] = self.get_lab_user_secret(config["lab_name"])
self._test_username_password_lab(config)

def test_adfs2_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2")
config["password"] = self.get_lab_user_secret(config["lab_name"])
self._test_username_password_lab(config)

def test_adfs2019_fed_user(self):
config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2019")
config["password"] = self.get_lab_user_secret(config["lab_name"])
self._test_username_password_lab(config)

def test_ropc_adfs2019_onprem(self):
# Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259
config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019")
config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"]
config["client_id"] = "PublicClientId"
config["scope"] = self.adfs2019_scopes
config["password"] = self.get_lab_user_secret(config["lab_name"])
self._test_username_password_lab(config)

@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented")
def test_adfs2019_onprem_acquire_token_by_auth_code(self):
"""When prompted, you can manually login using this account:

# https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019
username = "..." # The upn from the link above
password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ
"""
config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019")
config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"]
config["client_id"] = "PublicClientId"
config["scope"] = self.adfs2019_scopes
config["port"] = 8080
self._test_acquire_token_by_auth_code_lab(config)

@unittest.skipUnless(
os.getenv("LAB_OBO_CLIENT_SECRET"),
"Need LAB_OBO_CLIENT SECRET from https://msidlabs.vault.azure.net/secrets/TodoListServiceV2-OBO/c58ba97c34ca4464886943a847d1db56")
def test_acquire_token_obo(self):
config = self.get_lab_user(usertype="cloud")

config_cca = {}
config_cca.update(config)
config_cca["client_id"] = "f4aa5217-e87c-42b2-82af-5624dd14ee72"
config_cca["scope"] = ["https://graph.microsoft.com/.default"]
config_cca["client_secret"] = os.getenv("LAB_OBO_CLIENT_SECRET")

config_pca = {}
config_pca.update(config)
config_pca["client_id"] = "c0485386-1e9a-4663-bc96-7ab30656de7f"
config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"])
config_pca["scope"] = ["api://%s/read" % config_cca["client_id"]]

self._test_acquire_token_obo_lab(config_pca, config_cca)

def _build_b2c_authority(self, policy):
base = "https://msidlabb2c.b2clogin.com/msidlabb2c.onmicrosoft.com"
return base + "/" + policy # We do not support base + "?p=" + policy
Expand All @@ -472,38 +537,50 @@ def test_b2c_acquire_token_by_auth_code(self):
# This won't work https://msidlab.com/api/user?usertype=b2c
password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabb2c
"""
scopes = ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"]
(self.app, ac, redirect_uri) = _get_app_and_auth_code(
"b876a048-55a5-4fc5-9403-f5d90cb1c852",
client_secret=self.get_lab_user_secret("MSIDLABB2C-MSAapp-AppSecret"),
authority=self._build_b2c_authority("B2C_1_SignInPolicy"),
port=3843, # Lab defines 4 of them: [3843, 4584, 4843, 60000]
scopes=scopes,
)
result = self.app.acquire_token_by_authorization_code(
ac, scopes, redirect_uri=redirect_uri)
logger.debug(
"%s: cache = %s, id_token_claims = %s",
self.id(),
json.dumps(self.app.token_cache._cache, indent=4),
json.dumps(result.get("id_token_claims"), indent=4),
)
self.assertIn(
"access_token", result,
"{error}: {error_description}".format(
# Note: No interpolation here, cause error won't always present
error=result.get("error"),
error_description=result.get("error_description")))
self.assertCacheWorksForUser(result, scopes, username=None)
config = {"authority": self._build_b2c_authority("B2C_1_SignInPolicy"),
"client_id": "b876a048-55a5-4fc5-9403-f5d90cb1c852",
"scope": ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"], "port": 3843}
self._test_acquire_token_by_auth_code_lab(config)

def test_b2c_acquire_token_by_ropc(self):
self._test_username_password(
authority=self._build_b2c_authority("B2C_1_ROPC_Auth"),
client_id="e3b9ad76-9763-4827-b088-80c7a7888f79",
username="[email protected]",
password=self.get_lab_user_secret("msidlabb2c"),
scope=["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"],
)
config = {"authority": self._build_b2c_authority("B2C_1_ROPC_Auth"),
"client_id": "e3b9ad76-9763-4827-b088-80c7a7888f79",
"username": "[email protected]", "password": self.get_lab_user_secret("msidlabb2c"),
"scope": ["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"]}
self._test_username_password_lab(config)


class SovereignCloudTestCase(LabBasedTestCase):

def test_arlington_acquire_token_by_ropc(self):
config = self.get_lab_user(azureenvironment="azureusgovernment")
config["password"] = self.get_lab_user_secret(config["lab_name"])
self._test_username_password_lab(config)

def test_arlington_acquire_token_by_client_secret(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Historically, there was no explicit test case to cover confidential client in world-wide (WW) cloud. (We implicitly tested it by creating a confidential client as test infrastructure, though.)
That's why I LOVE seeing you add an explicit test case for confidential client this time! Let's see if we can follow the suggestion above, to try to backport this test case for WW cloud, in a reusable style.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats good to have. I think that should go in a separate PR though once this is merged. Lets keep the scope of this PR to Arlington test cases ? Let me know if that works for you. If yes, I can go and create an issue tracking this.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not doing this now, then please create a github issue to refactor test cases for multiple cloud enablement. You can point to this comment. Expect that I will be asking for the same automation to run in all the clouds we support and that code should be well structured to handle that (similar discussion happening in other repo's at this time)

config = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="no")
config["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret")
self._test_acquire_token_by_client_secret_lab(config)

def test_arlington_acquire_token_obo(self):
config_cca = self.get_lab_user(
usertype="cloud", azureenvironment="azureusgovernment", publicClient="no")
config_cca["scope"] = ["https://graph.microsoft.us/.default"]
config_cca["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret")

config_pca = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="yes")
obo_app_object = self.get_lab_app_object(
usertype="cloud", azureenvironment="azureusgovernment", publicClient="no")
config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"])
config_pca["scope"] = ["{app_uri}/files.read".format(app_uri=obo_app_object.get("identifierUris"))]

self._test_acquire_token_obo_lab(config_pca, config_cca)

def test_arlington_acquire_token_device_code(self):
config = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="yes")
config["scope"] = ["user.read"]
self._test_acquire_token_device_code_lab(config)


if __name__ == "__main__":
unittest.main()
Expand Down