-
Notifications
You must be signed in to change notification settings - Fork 207
Arlington automation #165
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Arlington automation #165
Changes from 7 commits
8e0b7dc
179a2ca
e62f34e
b254f74
7039052
3c397c3
34781cf
f29954b
c14ee36
c3cfbca
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -102,6 +102,32 @@ def _test_username_password(self, | |
| username=username if ".b2clogin.com" not in authority else None, | ||
| ) | ||
|
|
||
| def _test_device_code( | ||
| self, client_id=None, authority=None, scope=None, **ignored): | ||
| assert client_id and authority and scope | ||
rayluo marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| self.app = msal.PublicClientApplication( | ||
| client_id, authority=authority) | ||
| flow = self.app.initiate_device_flow(scopes=scope) | ||
| assert "user_code" in flow, "DF does not seem to be provisioned: %s".format( | ||
| json.dumps(flow, indent=4)) | ||
| logger.info(flow["message"]) | ||
|
|
||
| duration = 60 | ||
| logger.info("We will wait up to %d seconds for you to sign in" % duration) | ||
| flow["expires_at"] = min( # Shorten the time for quick test | ||
| flow["expires_at"], time.time() + duration) | ||
| result = self.app.acquire_token_by_device_flow(flow) | ||
| self.assertLoosely( # It will skip this test if there is no user interaction | ||
| result, | ||
| assertion=lambda: self.assertIn('access_token', result), | ||
| skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS) | ||
| if "access_token" not in result: | ||
| self.skip("End user did not complete Device Flow in time") | ||
| self.assertCacheWorksForUser(result, scope, username=None) | ||
| result["access_token"] = result["refresh_token"] = "************" | ||
| logger.info( | ||
| "%s obtained tokens: %s", self.id(), json.dumps(result, indent=4)) | ||
|
|
||
|
|
||
| THIS_FOLDER = os.path.dirname(__file__) | ||
| CONFIG = os.path.join(THIS_FOLDER, "config.json") | ||
|
|
@@ -244,29 +270,7 @@ def setUpClass(cls): | |
| cls.config = json.load(f) | ||
|
|
||
| def test_device_flow(self): | ||
| scopes = self.config["scope"] | ||
| self.app = msal.PublicClientApplication( | ||
| self.config['client_id'], authority=self.config["authority"]) | ||
| flow = self.app.initiate_device_flow(scopes=scopes) | ||
| assert "user_code" in flow, "DF does not seem to be provisioned: %s".format( | ||
| json.dumps(flow, indent=4)) | ||
| logger.info(flow["message"]) | ||
|
|
||
| duration = 60 | ||
| logger.info("We will wait up to %d seconds for you to sign in" % duration) | ||
| flow["expires_at"] = min( # Shorten the time for quick test | ||
| flow["expires_at"], time.time() + duration) | ||
| result = self.app.acquire_token_by_device_flow(flow) | ||
| self.assertLoosely( # It will skip this test if there is no user interaction | ||
| result, | ||
| assertion=lambda: self.assertIn('access_token', result), | ||
| skippable_errors=self.app.client.DEVICE_FLOW_RETRIABLE_ERRORS) | ||
| if "access_token" not in result: | ||
| self.skip("End user did not complete Device Flow in time") | ||
| self.assertCacheWorksForUser(result, scopes, username=None) | ||
| result["access_token"] = result["refresh_token"] = "************" | ||
| logger.info( | ||
| "%s obtained tokens: %s", self.id(), json.dumps(result, indent=4)) | ||
| self._test_device_code(**self.config) | ||
|
|
||
|
|
||
| def get_lab_app( | ||
|
|
@@ -322,6 +326,12 @@ def setUpClass(cls): | |
| def tearDownClass(cls): | ||
| cls.session.close() | ||
|
|
||
| @classmethod | ||
| def get_lab_app_object(cls, **query): # https://msidlab.com/swagger/index.html | ||
| url = "https://msidlab.com/api/app" | ||
| resp = cls.session.get(url, params=query) | ||
| return resp.json()[0] | ||
|
|
||
| @classmethod | ||
| def get_lab_user_secret(cls, lab_name="msidlab4"): | ||
| lab_name = lab_name.lower() | ||
|
|
@@ -336,67 +346,29 @@ def get_lab_user_secret(cls, lab_name="msidlab4"): | |
| def get_lab_user(cls, **query): # https://docs.msidlab.com/labapi/userapi.html | ||
| resp = cls.session.get("https://msidlab.com/api/user", params=query) | ||
| result = resp.json()[0] | ||
| _env = query.get("azureenvironment", "").lower() | ||
| authority_base = { | ||
| "azureusgovernment": "https://login.microsoftonline.us/" | ||
| }.get(_env, "https://login.microsoftonline.com/") | ||
rayluo marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| scope = { | ||
| "azureusgovernment": ["https://graph.microsoft.us/.default"], | ||
| }.get(_env, ["https://graph.microsoft.com/.default"]) | ||
rayluo marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| return { # Mapping lab API response to our simplified configuration format | ||
| "authority": "https://login.microsoftonline.com/{}.onmicrosoft.com".format( | ||
| result["labName"]), | ||
| "authority": authority_base + result["tenantID"], | ||
| "client_id": result["appId"], | ||
| "username": result["upn"], | ||
| "lab_name": result["labName"], | ||
| "scope": ["https://graph.microsoft.com/.default"], | ||
| "scope": scope, | ||
| } | ||
|
|
||
| def test_aad_managed_user(self): # Pure cloud | ||
| config = self.get_lab_user(usertype="cloud") | ||
| self._test_username_password( | ||
| password=self.get_lab_user_secret(config["lab_name"]), **config) | ||
|
|
||
| def test_adfs4_fed_user(self): | ||
| config = self.get_lab_user(usertype="federated", federationProvider="ADFSv4") | ||
| self._test_username_password( | ||
| password=self.get_lab_user_secret(config["lab_name"]), **config) | ||
|
|
||
| def test_adfs3_fed_user(self): | ||
| config = self.get_lab_user(usertype="federated", federationProvider="ADFSv3") | ||
| self._test_username_password( | ||
| password=self.get_lab_user_secret(config["lab_name"]), **config) | ||
|
|
||
| def test_adfs2_fed_user(self): | ||
| config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2") | ||
| self._test_username_password( | ||
| password=self.get_lab_user_secret(config["lab_name"]), **config) | ||
|
|
||
| def test_adfs2019_fed_user(self): | ||
| config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2019") | ||
| self._test_username_password( | ||
| password=self.get_lab_user_secret(config["lab_name"]), **config) | ||
|
|
||
| def test_ropc_adfs2019_onprem(self): | ||
| config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") | ||
| config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"] | ||
| config["client_id"] = "PublicClientId" | ||
| config["scope"] = self.adfs2019_scopes | ||
| self._test_username_password( | ||
| password=self.get_lab_user_secret(config["lab_name"]), **config) | ||
|
|
||
| @unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented") | ||
| def test_adfs2019_onprem_acquire_token_by_auth_code(self): | ||
| """When prompted, you can manually login using this account: | ||
|
|
||
| # https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019 | ||
| username = "..." # The upn from the link above | ||
| password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ | ||
| """ | ||
| scopes = self.adfs2019_scopes | ||
| config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") | ||
| def _test_acquire_token_by_auth_code( | ||
| self, client_id=None, authority=None, port=None, scope=None, | ||
| **ignored): | ||
| assert client_id and authority and port and scope | ||
| (self.app, ac, redirect_uri) = _get_app_and_auth_code( | ||
| # Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259 | ||
| "PublicClientId", | ||
| authority="https://fs.%s.com/adfs" % config["lab_name"], | ||
| port=8080, | ||
| scopes=scopes, | ||
| ) | ||
| client_id, authority, port, scope) | ||
rayluo marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| result = self.app.acquire_token_by_authorization_code( | ||
| ac, scopes, redirect_uri=redirect_uri) | ||
| ac, scope, redirect_uri=redirect_uri) | ||
| logger.debug( | ||
| "%s: cache = %s, id_token_claims = %s", | ||
| self.id(), | ||
|
|
@@ -409,41 +381,32 @@ def test_adfs2019_onprem_acquire_token_by_auth_code(self): | |
| # Note: No interpolation here, cause error won't always present | ||
| error=result.get("error"), | ||
| error_description=result.get("error_description"))) | ||
| self.assertCacheWorksForUser(result, scopes, username=None) | ||
|
|
||
| @unittest.skipUnless( | ||
| os.getenv("LAB_OBO_CLIENT_SECRET"), | ||
| "Need LAB_OBO_CLIENT SECRET from https://msidlabs.vault.azure.net/secrets/TodoListServiceV2-OBO/c58ba97c34ca4464886943a847d1db56") | ||
| def test_acquire_token_obo(self): | ||
| # Some hardcoded, pre-defined settings | ||
| obo_client_id = "f4aa5217-e87c-42b2-82af-5624dd14ee72" | ||
| downstream_scopes = ["https://graph.microsoft.com/.default"] | ||
| config = self.get_lab_user(usertype="cloud") | ||
| self.assertCacheWorksForUser(result, scope, username=None) | ||
|
|
||
| def _test_acquire_token_obo(self, config_pca, config_cca): | ||
| # 1. An app obtains a token representing a user, for our mid-tier service | ||
| pca = msal.PublicClientApplication( | ||
| "c0485386-1e9a-4663-bc96-7ab30656de7f", authority=config.get("authority")) | ||
| config_pca["client_id"], authority=config_pca["authority"]) | ||
| pca_result = pca.acquire_token_by_username_password( | ||
| config["username"], | ||
| self.get_lab_user_secret(config["lab_name"]), | ||
| scopes=[ # The OBO app's scope. Yours might be different. | ||
| "api://%s/read" % obo_client_id], | ||
| config_pca["username"], | ||
| config_pca["password"], | ||
| scopes=config_pca["scope"], | ||
| ) | ||
| self.assertIsNotNone( | ||
| pca_result.get("access_token"), | ||
| "PCA failed to get AT because %s" % json.dumps(pca_result, indent=2)) | ||
|
|
||
| # 2. Our mid-tier service uses OBO to obtain a token for downstream service | ||
| cca = msal.ConfidentialClientApplication( | ||
| obo_client_id, | ||
| client_credential=os.getenv("LAB_OBO_CLIENT_SECRET"), | ||
| authority=config.get("authority"), | ||
| config_cca["client_id"], | ||
| client_credential=config_cca["client_secret"], | ||
| authority=config_cca["authority"], | ||
| # token_cache= ..., # Default token cache is all-tokens-store-in-memory. | ||
| # That's fine if OBO app uses short-lived msal instance per session. | ||
| # Otherwise, the OBO app need to implement a one-cache-per-user setup. | ||
| ) | ||
| cca_result = cca.acquire_token_on_behalf_of( | ||
| pca_result['access_token'], downstream_scopes) | ||
| pca_result['access_token'], config_cca["scope"]) | ||
| self.assertNotEqual(None, cca_result.get("access_token"), str(cca_result)) | ||
|
|
||
| # 3. Now the OBO app can simply store downstream token(s) in same session. | ||
|
|
@@ -453,12 +416,93 @@ def test_acquire_token_obo(self): | |
| # Assuming you already did that (which is not shown in this test case), | ||
| # the following part shows one of the ways to obtain an AT from cache. | ||
| username = cca_result.get("id_token_claims", {}).get("preferred_username") | ||
| self.assertEqual(config["username"], username) | ||
| self.assertEqual(config_cca["username"], username) | ||
| if username: # A precaution so that we won't use other user's token | ||
| account = cca.get_accounts(username=username)[0] | ||
| result = cca.acquire_token_silent(downstream_scopes, account) | ||
| result = cca.acquire_token_silent(config_cca["scope"], account) | ||
| self.assertEqual(cca_result["access_token"], result["access_token"]) | ||
|
|
||
| def _test_acquire_token_by_client_secret( | ||
| self, client_id=None, client_secret=None, authority=None, scope=None, | ||
| **ignored): | ||
| assert client_id and client_secret and authority and scope | ||
| app = msal.ConfidentialClientApplication( | ||
| client_id, client_credential=client_secret, authority=authority) | ||
| result = app.acquire_token_for_client(scope) | ||
| self.assertIsNotNone(result.get("access_token"), "Got %s instead" % result) | ||
|
|
||
|
|
||
| class WorldWideTestCase(LabBasedTestCase): | ||
|
|
||
| def test_aad_managed_user(self): # Pure cloud | ||
| config = self.get_lab_user(usertype="cloud") | ||
| config["password"] = self.get_lab_user_secret(config["lab_name"]) | ||
| self._test_username_password(**config) | ||
|
|
||
| def test_adfs4_fed_user(self): | ||
| config = self.get_lab_user(usertype="federated", federationProvider="ADFSv4") | ||
| config["password"] = self.get_lab_user_secret(config["lab_name"]) | ||
| self._test_username_password(**config) | ||
|
|
||
| def test_adfs3_fed_user(self): | ||
| config = self.get_lab_user(usertype="federated", federationProvider="ADFSv3") | ||
| config["password"] = self.get_lab_user_secret(config["lab_name"]) | ||
| self._test_username_password(**config) | ||
|
|
||
| def test_adfs2_fed_user(self): | ||
| config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2") | ||
| config["password"] = self.get_lab_user_secret(config["lab_name"]) | ||
| self._test_username_password(**config) | ||
|
|
||
| def test_adfs2019_fed_user(self): | ||
| config = self.get_lab_user(usertype="federated", federationProvider="ADFSv2019") | ||
| config["password"] = self.get_lab_user_secret(config["lab_name"]) | ||
| self._test_username_password(**config) | ||
|
|
||
| def test_ropc_adfs2019_onprem(self): | ||
| # Configuration is derived from https://github.com/AzureAD/microsoft-authentication-library-for-dotnet/blob/4.7.0/tests/Microsoft.Identity.Test.Common/TestConstants.cs#L250-L259 | ||
| config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") | ||
| config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"] | ||
| config["client_id"] = "PublicClientId" | ||
| config["scope"] = self.adfs2019_scopes | ||
| config["password"] = self.get_lab_user_secret(config["lab_name"]) | ||
| self._test_username_password(**config) | ||
|
|
||
| @unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented") | ||
| def test_adfs2019_onprem_acquire_token_by_auth_code(self): | ||
| """When prompted, you can manually login using this account: | ||
|
|
||
| # https://msidlab.com/api/user?usertype=onprem&federationprovider=ADFSv2019 | ||
| username = "..." # The upn from the link above | ||
| password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabXYZ | ||
| """ | ||
| config = self.get_lab_user(usertype="onprem", federationProvider="ADFSv2019") | ||
| config["authority"] = "https://fs.%s.com/adfs" % config["lab_name"] | ||
| config["client_id"] = "PublicClientId" | ||
| config["scope"] = self.adfs2019_scopes | ||
| config["port"] = 8080 | ||
| self._test_acquire_token_by_auth_code(**config) | ||
|
|
||
| @unittest.skipUnless( | ||
| os.getenv("LAB_OBO_CLIENT_SECRET"), | ||
| "Need LAB_OBO_CLIENT SECRET from https://msidlabs.vault.azure.net/secrets/TodoListServiceV2-OBO/c58ba97c34ca4464886943a847d1db56") | ||
| def test_acquire_token_obo(self): | ||
| config = self.get_lab_user(usertype="cloud") | ||
|
|
||
| config_cca = {} | ||
| config_cca.update(config) | ||
| config_cca["client_id"] = "f4aa5217-e87c-42b2-82af-5624dd14ee72" | ||
| config_cca["scope"] = ["https://graph.microsoft.com/.default"] | ||
| config_cca["client_secret"] = os.getenv("LAB_OBO_CLIENT_SECRET") | ||
|
|
||
| config_pca = {} | ||
| config_pca.update(config) | ||
| config_pca["client_id"] = "c0485386-1e9a-4663-bc96-7ab30656de7f" | ||
| config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"]) | ||
| config_pca["scope"] = ["api://%s/read" % config_cca["client_id"]] | ||
|
|
||
| self._test_acquire_token_obo(config_pca, config_cca) | ||
|
|
||
| def _build_b2c_authority(self, policy): | ||
| base = "https://msidlabb2c.b2clogin.com/msidlabb2c.onmicrosoft.com" | ||
| return base + "/" + policy # We do not support base + "?p=" + policy | ||
|
|
@@ -472,38 +516,51 @@ def test_b2c_acquire_token_by_auth_code(self): | |
| # This won't work https://msidlab.com/api/user?usertype=b2c | ||
| password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabb2c | ||
| """ | ||
| scopes = ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"] | ||
| (self.app, ac, redirect_uri) = _get_app_and_auth_code( | ||
| "b876a048-55a5-4fc5-9403-f5d90cb1c852", | ||
| client_secret=self.get_lab_user_secret("MSIDLABB2C-MSAapp-AppSecret"), | ||
| authority=self._build_b2c_authority("B2C_1_SignInPolicy"), | ||
| port=3843, # Lab defines 4 of them: [3843, 4584, 4843, 60000] | ||
| scopes=scopes, | ||
| ) | ||
| result = self.app.acquire_token_by_authorization_code( | ||
| ac, scopes, redirect_uri=redirect_uri) | ||
| logger.debug( | ||
| "%s: cache = %s, id_token_claims = %s", | ||
| self.id(), | ||
| json.dumps(self.app.token_cache._cache, indent=4), | ||
| json.dumps(result.get("id_token_claims"), indent=4), | ||
| ) | ||
| self.assertIn( | ||
| "access_token", result, | ||
| "{error}: {error_description}".format( | ||
| # Note: No interpolation here, cause error won't always present | ||
| error=result.get("error"), | ||
| error_description=result.get("error_description"))) | ||
| self.assertCacheWorksForUser(result, scopes, username=None) | ||
| config = {"authority": self._build_b2c_authority("B2C_1_SignInPolicy"), | ||
rayluo marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| "client_id": "b876a048-55a5-4fc5-9403-f5d90cb1c852", | ||
| "scope": ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"], "port": 3843} | ||
| self._test_acquire_token_by_auth_code(**config) | ||
|
|
||
| def test_b2c_acquire_token_by_ropc(self): | ||
| self._test_username_password( | ||
| authority=self._build_b2c_authority("B2C_1_ROPC_Auth"), | ||
| client_id="e3b9ad76-9763-4827-b088-80c7a7888f79", | ||
| username="[email protected]", | ||
| password=self.get_lab_user_secret("msidlabb2c"), | ||
| scope=["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"], | ||
| ) | ||
| config = {"authority": self._build_b2c_authority("B2C_1_ROPC_Auth"), | ||
rayluo marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| "client_id": "e3b9ad76-9763-4827-b088-80c7a7888f79", | ||
| "username": "[email protected]", "password": self.get_lab_user_secret("msidlabb2c"), | ||
| "scope": ["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"]} | ||
| self._test_username_password(**config) | ||
|
|
||
|
|
||
| class ArlingtonCloudTestCase(LabBasedTestCase): | ||
| environment = "azureusgovernment" | ||
|
|
||
| def test_arlington_acquire_token_by_ropc(self): | ||
rayluo marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| config = self.get_lab_user(azureenvironment=self.environment) | ||
| config["password"] = self.get_lab_user_secret(config["lab_name"]) | ||
| self._test_username_password(**config) | ||
|
|
||
| def test_arlington_acquire_token_by_client_secret(self): | ||
|
||
| config = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="no") | ||
| config["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret") | ||
| self._test_acquire_token_by_client_secret(**config) | ||
|
|
||
| def test_arlington_acquire_token_obo(self): | ||
| config_cca = self.get_lab_user( | ||
| usertype="cloud", azureenvironment=self.environment, publicClient="no") | ||
| config_cca["scope"] = ["https://graph.microsoft.us/.default"] | ||
| config_cca["client_secret"] = self.get_lab_user_secret("ARLMSIDLAB1-IDLASBS-App-CC-Secret") | ||
|
|
||
| config_pca = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="yes") | ||
| obo_app_object = self.get_lab_app_object( | ||
| usertype="cloud", azureenvironment=self.environment, publicClient="no") | ||
| config_pca["password"] = self.get_lab_user_secret(config_pca["lab_name"]) | ||
| config_pca["scope"] = ["{app_uri}/files.read".format(app_uri=obo_app_object.get("identifierUris"))] | ||
|
|
||
| self._test_acquire_token_obo(config_pca, config_cca) | ||
|
|
||
| def test_arlington_acquire_token_device_code(self): | ||
| config = self.get_lab_user(usertype="cloud", azureenvironment=self.environment, publicClient="yes") | ||
| config["scope"] = ["user.read"] | ||
| self._test_device_code(**config) | ||
|
|
||
|
|
||
| if __name__ == "__main__": | ||
| unittest.main() | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.