Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Combine multiple sub-classes back to one test class
  • Loading branch information
rayluo committed Sep 11, 2019
commit fb56287d0165a17d9fdd2b8abd188136415d552c
149 changes: 67 additions & 82 deletions tests/test_e2e.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,6 @@


class E2eTestCase(unittest.TestCase):
config = {}

def skipIfNotConfigured(self, fields):
for field in fields:
if not self.config.get(field):
self.skipTest('"%s" not found in configuration' % field)

def assertLoosely(self, response, assertion=None,
skippable_errors=("invalid_grant", "interaction_required")):
Expand All @@ -36,39 +30,36 @@ def assertLoosely(self, response, assertion=None,
error_description=response.get("error_description")))
assertion()

def assertCacheWorks(self, result_from_wire):
def assertCacheWorks(self, result_from_wire, username, scope):
result = result_from_wire
# You can filter by predefined username, or let end user to choose one
accounts = self.app.get_accounts(username=self.config.get("username"))
accounts = self.app.get_accounts(username=username)
self.assertNotEqual(0, len(accounts))
account = accounts[0]
# Going to test acquire_token_silent(...) to locate an AT from cache
result_from_cache = self.app.acquire_token_silent(
self.config["scope"], account=account)
result_from_cache = self.app.acquire_token_silent(scope, account=account)
self.assertIsNotNone(result_from_cache)
self.assertEqual(result['access_token'], result_from_cache['access_token'],
"We should get a cached AT")

# Going to test acquire_token_silent(...) to obtain an AT by a RT from cache
self.app.token_cache._cache["AccessToken"] = {} # A hacky way to clear ATs
result_from_cache = self.app.acquire_token_silent(
self.config["scope"], account=account)
result_from_cache = self.app.acquire_token_silent(scope, account=account)
self.assertIsNotNone(result_from_cache,
"We should get a result from acquire_token_silent(...) call")
self.assertNotEqual(result['access_token'], result_from_cache['access_token'],
"We should get a fresh AT (via RT)")

def test_username_password(self):
self.skipIfNotConfigured([
"authority", "client_id", "username", "password", "scope"])
self.app = msal.PublicClientApplication(
self.config["client_id"], authority=self.config["authority"])
def _test_username_password(self,
authority=None, client_id=None, username=None, password=None, scope=None,
**ignored):
assert authority and client_id and username and password and scope
self.app = msal.PublicClientApplication(client_id, authority=authority)
result = self.app.acquire_token_by_username_password(
self.config["username"], self.config["password"],
scopes=self.config.get("scope"))
username, password, scopes=scope)
self.assertLoosely(result)
# self.assertEqual(None, result.get("error"), str(result))
self.assertCacheWorks(result)
self.assertCacheWorks(result, username, scope)


CONFIG = os.path.join(os.path.dirname(__file__), "config.json")
Expand All @@ -78,10 +69,14 @@ def setUp(self):
with open(CONFIG) as f:
self.config = json.load(f)

def test_username_password(self):
self._test_username_password(**self.config)

def get_lab_user(query): # This API requires no authorization
def get_lab_user(mam=False, mfa=False, isFederated=False, federationProvider=None):
# Based on https://microsoft.sharepoint-df.com/teams/MSIDLABSExtended/SitePages/LAB.aspx
user = requests.get("https://api.msidlab.com/api/user", params=query).json()
user = requests.get("https://api.msidlab.com/api/user", params=dict( # Publicly available
mam=mam, mfa=mfa, isFederated=isFederated, federationProvider=federationProvider,
)).json()
return { # Mapping lab API response to our simplified configuration format
"authority": user["Authority"][0] + user["Users"]["tenantId"],
"client_id": user["AppID"],
Expand Down Expand Up @@ -112,25 +107,35 @@ def get_lab_app(
logger.info("ENV variables %s and/or %s are not defined. Fall back to MSI.",
env_client_id, env_client_secret)
# See also https://microsoft.sharepoint-df.com/teams/MSIDLABSExtended/SitePages/Programmatically-accessing-LAB-API's.aspx
raise NotImplementedError("MSI-based mechanism has not been implemented yet")
raise unittest.SkipTest("MSI-based mechanism has not been implemented yet")
return msal.ConfidentialClientApplication(client_id, client_secret,
authority="https://login.microsoftonline.com/"
"72f988bf-86f1-41af-91ab-2d7cd011db47", # Microsoft tenant ID
)

def get_session(lab_app): # BTW, this infrastructure tests the confidential client flow
def get_session(lab_app, scopes): # BTW, this infrastructure tests the confidential client flow
logger.info("Creating session")
lab_token = lab_app.acquire_token_for_client("https://request.msidlab.com/.default")
lab_token = lab_app.acquire_token_for_client(scopes)
session = requests.Session()
session.headers.update({"Authorization": "Bearer %s" % lab_token["access_token"]})
session.hooks["response"].append(lambda r, *args, **kwargs: r.raise_for_status())
return session


class LabBasedTestCase(E2eTestCase):
session = get_session(get_lab_app()) # It will run even all test cases are skipped
_secrets = {}

@classmethod
def setUpClass(cls):
cls.session = get_session(get_lab_app(), [
"https://request.msidlab.com/.default", # Existing user & password API
# "https://user.msidlab.com/.default", # New user API
])

@classmethod
def tearDownClass(cls):
cls.session.close()

@classmethod
def get_lab_user_secret(cls, lab_name="msidlab4"):
lab_name = lab_name.lower()
Expand All @@ -146,7 +151,7 @@ def get_lab_user_secret(cls, lab_name="msidlab4"):
return cls._secrets[lab_name]

@classmethod
def get_lab_user(cls, query): # The query format is in lab team's Aug 9 email
def get_lab_user(cls, query): # Experimental: The query format is in lab team's Aug 9 email
resp = cls.session.get("https://user.msidlab.com/api/user", params=query)
result = resp.json()[0]
return { # Mapping lab API response to our simplified configuration format
Expand All @@ -157,59 +162,39 @@ def get_lab_user(cls, query): # The query format is in lab team's Aug 9 email
"scope": ["https://graph.microsoft.com/.default"],
}

DEFAULT_QUERY = {"mam": False, "mfa": False}

class AadManagedUserTestCase(LabBasedTestCase):
@classmethod
def setUpClass(cls):
cls.config = get_lab_user(dict(DEFAULT_QUERY,
isFederated=False, # Supposed to find a pure managed user,
# but lab still gives us a [email protected]
))
cls.config["password"] = cls.get_lab_user_secret(cls.config["lab"]["labname"])

class Adfs4FedUserTestCase(LabBasedTestCase):
@classmethod
def setUpClass(cls):
cls.config = get_lab_user(dict(
DEFAULT_QUERY, isFederated=True, federationProvider="ADFSv4"))
cls.config["password"] = cls.get_lab_user_secret(cls.config["lab"]["labname"])

class Adfs4ManagedUserTestCase(LabBasedTestCase): # a.k.a. the hybrid
@classmethod
def setUpClass(cls):
cls.config = get_lab_user(dict(
DEFAULT_QUERY, isFederated=False, federationProvider="ADFSv4"))
cls.config["password"] = cls.get_lab_user_secret(cls.config["lab"]["labname"])

class Adfs3FedUserTestCase(LabBasedTestCase):
@classmethod
def setUpClass(cls):
cls.config = get_lab_user(dict(
DEFAULT_QUERY, isFederated=True, federationProvider="ADFSv3"))
#cls.config = cls.get_lab_user({
# "MFA": "none", "UserType": "federated", "FederationProvider": "adfsv3"})
cls.config["password"] = cls.get_lab_user_secret(cls.config["lab"]["labname"])

class Adfs3ManagedUserTestCase(LabBasedTestCase): # a.k.a. the hybrid
@classmethod
def setUpClass(cls):
cls.config = get_lab_user(dict(
DEFAULT_QUERY, isFederated=False, federationProvider="ADFSv3"))
cls.config["password"] = cls.get_lab_user_secret(cls.config["lab"]["labname"])

class Adfs2FedUserTestCase(LabBasedTestCase):
@classmethod
def setUpClass(cls):
cls.config = get_lab_user(dict(
DEFAULT_QUERY, isFederated=True, federationProvider="ADFSv2"))
cls.config["password"] = cls.get_lab_user_secret(cls.config["lab"]["labname"])

@unittest.skip("Lab API returns nothing. We might need to switch to beta api")
class Adfs2019FedUserTestCase(LabBasedTestCase):
@classmethod
def setUpClass(cls):
cls.config = get_lab_user(dict(
DEFAULT_QUERY, isFederated=True, federationProvider="ADFSv2019"))
cls.config["password"] = cls.get_lab_user_secret(cls.config["lab"]["labname"])
def test_aad_managed_user(self): # Pure cloud or hybrid
config = get_lab_user(isFederated=False)
self._test_username_password(
password=self.get_lab_user_secret(config["lab"]["labname"]), **config)

def test_adfs4_fed_user(self):
config = get_lab_user(isFederated=True, federationProvider="ADFSv4")
self._test_username_password(
password=self.get_lab_user_secret(config["lab"]["labname"]), **config)

def test_adfs4_managed_user(self): # Conceptually the hybrid
config = get_lab_user(isFederated=False, federationProvider="ADFSv4")
self._test_username_password(
password=self.get_lab_user_secret(config["lab"]["labname"]), **config)

def test_adfs3_fed_user(self):
config = get_lab_user(isFederated=True, federationProvider="ADFSv3")
self._test_username_password(
password=self.get_lab_user_secret(config["lab"]["labname"]), **config)

def test_adfs3_managed_user(self):
config = get_lab_user(isFederated=False, federationProvider="ADFSv3")
self._test_username_password(
password=self.get_lab_user_secret(config["lab"]["labname"]), **config)

def test_adfs2_fed_user(self):
config = get_lab_user(isFederated=True, federationProvider="ADFSv2")
self._test_username_password(
password=self.get_lab_user_secret(config["lab"]["labname"]), **config)

@unittest.skip("Old Lab API returns nothing. We will switch to new api later")
def test_adfs2019_fed_user(self):
config = get_lab_user(isFederated=True, federationProvider="ADFSv2019")
self._test_username_password(
password=self.get_lab_user_secret(config["lab"]["labname"]), **config)