Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Subject Name and Issuer Authentication #71

Merged
merged 7 commits into from
Jul 3, 2019
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions msal/application.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,17 @@ def decorate_scope(
return list(decorated)


def extract_certs(public_cert_content):
# Parses raw public certificate file contents and returns a list of strings
# Usage: headers = {"x5c": extract_certs(open("my_cert.pem").read())}
public_certificates = re.findall(
r'\-+BEGIN CERTIFICATE.+\-+(?P<cert_value>[^-]+)\-+END CERTIFICATE.+\-+',
rayluo marked this conversation as resolved.
Show resolved Hide resolved
public_cert_content, re.I)
if len(public_certificates):
rayluo marked this conversation as resolved.
Show resolved Hide resolved
return [cert.strip() for cert in public_certificates]
return [public_cert_content.strip()]
rayluo marked this conversation as resolved.
Show resolved Hide resolved


class ClientApplication(object):

def __init__(
Expand All @@ -59,7 +70,7 @@ def __init__(
verify=True, proxies=None, timeout=None):
"""Create an instance of application.

:param client_id: Your app has a clinet_id after you register it on AAD.
:param client_id: Your app has a client_id after you register it on AAD.
:param client_credential:
For :class:`PublicClientApplication`, you simply use `None` here.
For :class:`ConfidentialClientApplication`,
Expand All @@ -69,8 +80,12 @@ def __init__(
{
"private_key": "...-----BEGIN PRIVATE KEY-----...",
"thumbprint": "A1B2C3D4E5F6...",
"public_certificate": "...-----BEGIN CERTIFICATE-----..." (Optional. See below.)
}

public_certificate (optional) is public key certificate which is
sent through 'x5c' JWT header only for
subject name and issuer authentication to support cert auto rolls
:param str authority:
A URL that identifies a token authority. It should be of the format
https://login.microsoftonline.com/your_tenant
Expand Down Expand Up @@ -113,9 +128,12 @@ def _build_client(self, client_credential, authority):
if isinstance(client_credential, dict):
assert ("private_key" in client_credential
and "thumbprint" in client_credential)
headers = {}
if 'public_certificate' in client_credential:
headers["x5c"] = extract_certs(client_credential['public_certificate'])
signer = JwtSigner(
client_credential["private_key"], algorithm="RS256",
sha1_thumbprint=client_credential.get("thumbprint"))
sha1_thumbprint=client_credential.get("thumbprint"), headers=headers)
client_assertion = signer.sign_assertion(
audience=authority.token_endpoint, issuer=self.client_id)
client_assertion_type = Client.CLIENT_ASSERTION_TYPE_JWT
Expand Down
40 changes: 40 additions & 0 deletions tests/test_application.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,46 @@ def test_client_certificate(self):
self.assertIn('access_token', result)
self.assertCacheWorks(result, app.acquire_token_silent(scope, account=None))

def test_extract_a_tag_less_public_cert(self):
pem = "my_cert"
self.assertEqual(["my_cert"], extract_certs(pem))

def test_extract_a_tag_enclosed_cert(self):
pem = """
-----BEGIN CERTIFICATE-----
my_cert
-----END CERTIFICATE-----
"""
self.assertEqual(["my_cert"], extract_certs(pem))

def test_extract_multiple_tag_enclosed_certs(self):
pem = """
-----BEGIN CERTIFICATE-----
my_cert1
-----END CERTIFICATE-----

-----BEGIN CERTIFICATE-----
my_cert2
-----END CERTIFICATE-----
"""
self.assertEqual(["my_cert1", "my_cert2"], extract_certs(pem))

@unittest.skipUnless("public_certificate" in CONFIG, "Missing Public cert")
def test_subject_name_issuer_authentication(self):
assert ("private_key_file" in CONFIG
and "thumbprint" in CONFIG and "public_certificate" in CONFIG)
with open(os.path.join(THIS_FOLDER, CONFIG['private_key_file'])) as f:
pem = f.read()
with open(os.path.join(THIS_FOLDER, CONFIG['public_certificate'])) as f:
public_certificate = f.read()
app = ConfidentialClientApplication(
CONFIG['client_id'], authority=CONFIG["authority"],
client_credential={"private_key": pem, "thumbprint": CONFIG["thumbprint"],
"public_certificate": public_certificate})
scope = CONFIG.get("scope", [])
result = app.acquire_token_for_client(scope)
self.assertIn('access_token', result)
self.assertCacheWorks(result, app.acquire_token_silent(scope, account=None))

@unittest.skipUnless("client_id" in CONFIG, "client_id missing")
class TestPublicClientApplication(Oauth2TestCase):
Expand Down