Skip to content
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 100 additions & 3 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,12 @@ def setUpClass(cls):
def tearDownClass(cls):
cls.session.close()

@classmethod
def get_lab_app_object(cls, **query ):
url = "https://msidlab.com/api/app"
resp = cls.session.get(url, params=query)
return resp.json()[0]

@classmethod
def get_lab_user_secret(cls, lab_name="msidlab4"):
lab_name = lab_name.lower()
Expand All @@ -336,13 +342,17 @@ def get_lab_user_secret(cls, lab_name="msidlab4"):
def get_lab_user(cls, **query): # https://docs.msidlab.com/labapi/userapi.html
resp = cls.session.get("https://msidlab.com/api/user", params=query)
result = resp.json()[0]
authority_base = "https://login.microsoftonline.com/"
graph_endpoint = "https://graph.microsoft.com/.default"
if "azureenvironment" in query and query["azureenvironment"] == "azureusgovernment":
authority_base = "https://login.microsoftonline.us/"
graph_endpoint = "https://graph.microsoft.us/.default"
return { # Mapping lab API response to our simplified configuration format
"authority": "https://login.microsoftonline.com/{}.onmicrosoft.com".format(
result["labName"]),
"authority": authority_base + result["tenantID"],
"client_id": result["appId"],
"username": result["upn"],
"lab_name": result["labName"],
"scope": ["https://graph.microsoft.com/.default"],
"scope": [graph_endpoint],
}

def test_aad_managed_user(self): # Pure cloud
Expand Down Expand Up @@ -505,6 +515,93 @@ def test_b2c_acquire_token_by_ropc(self):
scope=["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"],
)

def test_arlington_acquire_token_by_ropc(self):
config = self.get_lab_user(azureenvironment="azureusgovernment")
self._test_username_password(
password=self.get_lab_user_secret(config["lab_name"]), **config)

def test_arlington_acquire_token_by_client_secret(self):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Historically, there was no explicit test case to cover confidential client in world-wide (WW) cloud. (We implicitly tested it by creating a confidential client as test infrastructure, though.)
That's why I LOVE seeing you add an explicit test case for confidential client this time! Let's see if we can follow the suggestion above, to try to backport this test case for WW cloud, in a reusable style.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thats good to have. I think that should go in a separate PR though once this is merged. Lets keep the scope of this PR to Arlington test cases ? Let me know if that works for you. If yes, I can go and create an issue tracking this.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If not doing this now, then please create a github issue to refactor test cases for multiple cloud enablement. You can point to this comment. Expect that I will be asking for the same automation to run in all the clouds we support and that code should be well structured to handle that (similar discussion happening in other repo's at this time)

config = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="no")
app = msal.ConfidentialClientApplication(config.get("client_id"),
client_credential=self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret"),
authority=config.get("authority"),
)
result = app.acquire_token_for_client(config.get("scope"))
self.assertNotEqual(None, result.get("access_token"), str(result))

def test_arlington_acquire_token_obo(self):
obo_config = self.get_lab_user(
usertype="cloud", azureenvironment="azureusgovernment", publicClient="no")
obo_app_object = self.get_lab_app_object(
usertype="cloud", azureenvironment="azureusgovernment", publicClient="no")
downstream_scopes = ["https://graph.microsoft.com/.default"]
config = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="yes")

# 1. An app obtains a token representing a user, for our mid-tier service
pca = msal.PublicClientApplication(
client_id=config.get("client_id"), authority=config.get("authority"))
pca_result = pca.acquire_token_by_username_password(
config["username"],
self.get_lab_user_secret(config["lab_name"]),
scopes=[ # The OBO app's scope. Yours might be different.
"{app_uri}/files.read".format(app_uri=obo_app_object.get("identifierUris"))],
)
self.assertIsNotNone(
pca_result.get("access_token"),
"PCA failed to get AT because %s" % json.dumps(pca_result, indent=2))

# 2. Our mid-tier service uses OBO to obtain a token for downstream service
cca = msal.ConfidentialClientApplication(
obo_config.get("client_id"),
client_credential=self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret"),
authority=obo_config.get("authority"),
# token_cache= ..., # Default token cache is all-tokens-store-in-memory.
# That's fine if OBO app uses short-lived msal instance per session.
# Otherwise, the OBO app need to implement a one-cache-per-user setup.
)
cca_result = cca.acquire_token_on_behalf_of(
pca_result['access_token'], downstream_scopes)
self.assertNotEqual(None, cca_result.get("access_token"), str(cca_result))

# 3. Now the OBO app can simply store downstream token(s) in same session.
# Alternatively, if you want to persist the downstream AT, and possibly
# the RT (if any) for prolonged access even after your own AT expires,
# now it is the time to persist current cache state for current user.
# Assuming you already did that (which is not shown in this test case),
# the following part shows one of the ways to obtain an AT from cache.
username = cca_result.get("id_token_claims", {}).get("preferred_username")
self.assertEqual(config["username"], username)
if username: # A precaution so that we won't use other user's token
account = cca.get_accounts(username=username)[0]
result = cca.acquire_token_silent(downstream_scopes, account)
self.assertEqual(cca_result["access_token"], result["access_token"])

def test_arlington_acquire_token_device_code(self):
config = self.get_lab_user(usertype="cloud", azureenvironment="azureusgovernment", publicClient="yes")
scopes = ["user.read"]
self.app = msal.PublicClientApplication(
config['client_id'], authority=config["authority"])
flow = self.app.initiate_device_flow(scopes=scopes)
assert "user_code" in flow, "DF does not seem to be provisioned: %s".format(
json.dumps(flow, indent=4))
logger.info(flow["message"])

duration = 60
logger.info("We will wait up to %d seconds for you to sign in" % duration)
flow["expires_at"] = min( # Shorten the time for quick test
flow["expires_at"], time.time() + duration)
result = self.app.acquire_token_by_device_flow(flow)
self.assertLoosely( # It will skip this test if there is no user interaction
result,
assertion=lambda: self.assertIn('access_token', result),
skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS)
if "access_token" not in result:
self.skip("End user did not complete Device Flow in time")
self.assertCacheWorksForUser(result, scopes, username=None)
result["access_token"] = result["refresh_token"] = "************"
logger.info(
"%s obtained tokens: %s", self.id(), json.dumps(result, indent=4))

if __name__ == "__main__":
unittest.main()