-
Notifications
You must be signed in to change notification settings - Fork 202
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
B2c implementation #104
B2c implementation #104
Changes from all commits
e613b25
3c40b7a
7b2afc0
7d5468a
d31d472
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,21 @@ | |
logging.basicConfig(level=logging.INFO) | ||
|
||
|
||
def _get_app_and_auth_code( | ||
client_id, | ||
client_secret=None, | ||
authority="https://login.microsoftonline.com/common", | ||
port=44331, | ||
scopes=["https://graph.microsoft.com/.default"], # Microsoft Graph | ||
): | ||
from msal.oauth2cli.authcode import obtain_auth_code | ||
app = msal.ClientApplication(client_id, client_secret, authority=authority) | ||
redirect_uri = "http://localhost:%d" % port | ||
ac = obtain_auth_code(port, auth_uri=app.get_authorization_request_url( | ||
scopes, redirect_uri=redirect_uri)) | ||
assert ac is not None | ||
return (app, ac, redirect_uri) | ||
|
||
@unittest.skipIf(os.getenv("TRAVIS_TAG"), "Skip e2e tests during tagged release") | ||
class E2eTestCase(unittest.TestCase): | ||
|
||
|
@@ -49,9 +64,15 @@ def assertCacheWorksForUser(self, result_from_wire, scope, username=None): | |
result_from_cache = self.app.acquire_token_silent(scope, account=account) | ||
self.assertIsNotNone(result_from_cache, | ||
"We should get a result from acquire_token_silent(...) call") | ||
self.assertNotEqual( | ||
result_from_wire['access_token'], result_from_cache['access_token'], | ||
"We should get a fresh AT (via RT)") | ||
self.assertIsNotNone( | ||
# We used to assert it this way: | ||
# result_from_wire['access_token'] != result_from_cache['access_token'] | ||
# but ROPC in B2C tends to return the same AT we obtained seconds ago. | ||
# Now looking back, "refresh_token grant would return a brand new AT" | ||
# was just an empirical observation but never a committment in specs, | ||
# so we adjust our way to assert here. | ||
(result_from_cache or {}).get("access_token"), | ||
"We should get an AT from acquire_token_silent(...) call") | ||
|
||
def assertCacheWorksForApp(self, result_from_wire, scope): | ||
# Going to test acquire_token_silent(...) to locate an AT from cache | ||
|
@@ -70,7 +91,10 @@ def _test_username_password(self, | |
username, password, scopes=scope) | ||
self.assertLoosely(result) | ||
# self.assertEqual(None, result.get("error"), str(result)) | ||
self.assertCacheWorksForUser(result, scope, username=username) | ||
self.assertCacheWorksForUser( | ||
result, scope, | ||
username=username if ".b2clogin.com" not in authority else None, | ||
) | ||
|
||
|
||
THIS_FOLDER = os.path.dirname(__file__) | ||
|
@@ -95,23 +119,17 @@ def test_username_password(self): | |
self._test_username_password(**self.config) | ||
|
||
def _get_app_and_auth_code(self): | ||
from msal.oauth2cli.authcode import obtain_auth_code | ||
app = msal.ClientApplication( | ||
return _get_app_and_auth_code( | ||
self.config["client_id"], | ||
client_credential=self.config.get("client_secret"), | ||
authority=self.config.get("authority")) | ||
port = self.config.get("listen_port", 44331) | ||
redirect_uri = "http://localhost:%s" % port | ||
auth_request_uri = app.get_authorization_request_url( | ||
self.config["scope"], redirect_uri=redirect_uri) | ||
ac = obtain_auth_code(port, auth_uri=auth_request_uri) | ||
self.assertNotEqual(ac, None) | ||
return (app, ac, redirect_uri) | ||
client_secret=self.config.get("client_secret"), | ||
authority=self.config.get("authority"), | ||
port=self.config.get("listen_port", 44331), | ||
scopes=self.config["scope"], | ||
) | ||
|
||
def test_auth_code(self): | ||
self.skipUnlessWithConfig(["client_id", "scope"]) | ||
(self.app, ac, redirect_uri) = self._get_app_and_auth_code() | ||
|
||
result = self.app.acquire_token_by_authorization_code( | ||
ac, self.config["scope"], redirect_uri=redirect_uri) | ||
logger.debug("%s.cache = %s", | ||
|
@@ -314,7 +332,7 @@ def get_lab_user_secret(cls, lab_name="msidlab4"): | |
lab_name = lab_name.lower() | ||
if lab_name not in cls._secrets: | ||
logger.info("Querying lab user password for %s", lab_name) | ||
# Note: Short link won't work "https://aka.ms/GetLabUserSecret?Secret=%s" | ||
# Short link only works in browser "https://aka.ms/GetLabUserSecret?Secret=%s" | ||
# So we use the official link written in here | ||
# https://microsoft.sharepoint-df.com/teams/MSIDLABSExtended/SitePages/Programmatically-accessing-LAB-API%27s.aspx | ||
url = ("https://request.msidlab.com/api/GetLabUserSecret?code=KpY5uCcoKo0aW8VOL/CUO3wnu9UF2XbSnLFGk56BDnmQiwD80MQ7HA==&Secret=%s" | ||
|
@@ -417,3 +435,49 @@ def test_acquire_token_obo(self): | |
result = cca.acquire_token_silent(downstream_scopes, account) | ||
self.assertEqual(cca_result["access_token"], result["access_token"]) | ||
|
||
def _build_b2c_authority(self, policy): | ||
base = "https://msidlabb2c.b2clogin.com/msidlabb2c.onmicrosoft.com" | ||
return base + "/" + policy # We do not support base + "?p=" + policy | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. it's probably fine as supporting B2C in MSAL.Python it's a new feature. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. To be clear, that inline comment is inside an internal test case, so it is more like a remind-to-ourselves, but did not mean it as a limitation or deficiency. In fact, currently all the public materials we can find, uses the "base/policy" format, rather than "base?p=policy" format. Such as:
That being said, I did heard from a previous internal presentation that the "base?p=policy" format would also work. We (the entire MSAL fleet) might actually want to switch to that format at a later time, because the current way - putting policy inside authority - has some intrinsic issues, but that is a different topic. |
||
|
||
@unittest.skipIf(os.getenv("TRAVIS"), "Browser automation is not yet implemented") | ||
def test_b2c_acquire_token_by_auth_code(self): | ||
""" | ||
When prompted, you can manually login using this account: | ||
|
||
username="b2clocal@msidlabb2c.onmicrosoft.com" | ||
# This won't work https://msidlab.com/api/user?usertype=b2c | ||
password="***" # From https://aka.ms/GetLabUserSecret?Secret=msidlabb2c | ||
""" | ||
scopes = ["https://msidlabb2c.onmicrosoft.com/msaapp/user_impersonation"] | ||
(self.app, ac, redirect_uri) = _get_app_and_auth_code( | ||
"b876a048-55a5-4fc5-9403-f5d90cb1c852", | ||
client_secret=self.get_lab_user_secret("MSIDLABB2C-MSAapp-AppSecret"), | ||
authority=self._build_b2c_authority("B2C_1_SignInPolicy"), | ||
port=3843, # Lab defines 4 of them: [3843, 4584, 4843, 60000] | ||
scopes=scopes, | ||
) | ||
result = self.app.acquire_token_by_authorization_code( | ||
ac, scopes, redirect_uri=redirect_uri) | ||
logger.debug( | ||
"%s: cache = %s, id_token_claims = %s", | ||
self.id(), | ||
json.dumps(self.app.token_cache._cache, indent=4), | ||
json.dumps(result.get("id_token_claims"), indent=4), | ||
) | ||
self.assertIn( | ||
"access_token", result, | ||
"{error}: {error_description}".format( | ||
# Note: No interpolation here, cause error won't always present | ||
error=result.get("error"), | ||
error_description=result.get("error_description"))) | ||
self.assertCacheWorksForUser(result, scopes, username=None) | ||
|
||
def test_b2c_acquire_token_by_ropc(self): | ||
self._test_username_password( | ||
authority=self._build_b2c_authority("B2C_1_ROPC_Auth"), | ||
client_id="e3b9ad76-9763-4827-b088-80c7a7888f79", | ||
username="b2clocal@msidlabb2c.onmicrosoft.com", | ||
password=self.get_lab_user_secret("msidlabb2c"), | ||
scope=["https://msidlabb2c.onmicrosoft.com/msidlabb2capi/read"], | ||
) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: maybe use "on the allow list" instead...idk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually I thought about several options when I drafted that sentence. To me, "allow" does not sound right, because customer can of course choose a customized domain
Contoso.com
and we MSFT is in no position to disallow that. The precise cause of this error message is that theContoso.com
is not a so-called "well known" domain to MSFT Identity platform. And then again, it feels a bit weird to comment on other's name as "not well known", so I think "not whitelisted" sounds more neutral in tone. What do you think?