Skip to content

Commit

Permalink
Add GCE Credentials implementation supplying an ID token.
Browse files Browse the repository at this point in the history
 - Add Signer and IDTokenCredentials implementation
  • Loading branch information
kryzthov committed May 16, 2018
1 parent 2f5cb2d commit 21a9f5b
Show file tree
Hide file tree
Showing 2 changed files with 174 additions and 2 deletions.
6 changes: 5 additions & 1 deletion google/auth/compute_engine/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,12 @@
"""Google Compute Engine authentication."""

from google.auth.compute_engine.credentials import Credentials
from google.auth.compute_engine.credentials import IDTokenCredentials
from google.auth.compute_engine.credentials import Signer


__all__ = [
'Credentials'
'Credentials',
'IDTokenCredentials',
'Signer',
]
170 changes: 169 additions & 1 deletion google/auth/compute_engine/credentials.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2016 Google Inc.
#Copyright 2016 Google Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -19,11 +19,20 @@
"""

import base64
import datetime

import six

from google.auth import _helpers
from google.auth import credentials
from google.auth import crypt
from google.auth import exceptions
from google.auth import jwt
from google.auth.compute_engine import _metadata
from google.auth.transport.requests import AuthorizedSession
from google.auth.transport.requests import Request
from google.oauth2 import _client


class Credentials(credentials.ReadOnlyScoped, credentials.Credentials):
Expand Down Expand Up @@ -108,3 +117,162 @@ def service_account_email(self):
def requires_scopes(self):
"""False: Compute Engine credentials can not be scoped."""
return False


class Signer(crypt.Signer):
"""Signer that uses the default service account of a a GCE instance.
See https://cloud.google.com/iam/reference/rest/v1/projects.serviceAccounts/signBlob
"""

_REQUEST_PATH = (
"https://iam.googleapis.com/v1/"
"projects/{project}/serviceAccounts/{service_account}:signBlob")

def __init__(self):
self._creds = Credentials()
request = Request()
project_id = _metadata.get_project_id(request)
sa_info = _metadata.get_service_account_info(request)
self._service_account_email = sa_info['email']
self._request_path = self._REQUEST_PATH.format(
project=project_id,
service_account=self._service_account_email)
self._session = AuthorizedSession(self._creds)

@property
def key_id(self):
"""Optional[str]: The key ID used to identify this private key.
There is no known key ID associated to the default service account.
Any sign() call may be signed with a different key.
"""
return None

def sign(self, message):
"""Signs a message.
Args:
message (Union[str, bytes]): The message to be signed.
Returns:
bytes: The signature of the message.
"""
if not isinstance(message, bytes):
message = message.encode()
body = {
'bytesToSign': base64.b64encode(message).decode(),
}
rep = self._session.post(self._request_path, json=body)
rep.raise_for_status()
# Note: the response includes a key ID in rep.json()['keyId']
return base64.b64decode(rep.json()["signature"].encode())

@property
def service_account_email(self):
"""The email of the default service account on this GCE instance."""
return self._service_account_email


_DEFAULT_TOKEN_LIFETIME_SECS = 3600 # 1 hour in seconds
_DEFAULT_TOKEN_URI = 'https://www.googleapis.com/oauth2/v4/token'


class IDTokenCredentials(credentials.Credentials, credentials.Signing):
"""Open ID Connect ID Token-based service account credentials.
These credentials relies on the default service account of a GCE instance.
"""
def __init__(self, target_audience, token_uri=_DEFAULT_TOKEN_URI,
additional_claims=None):
"""
Args:
target_audience (str): The intended audience for these credentials,
used when requesting the ID Token. The ID Token's ``aud`` claim
will be set to this string.
token_uri (str): The OAuth 2.0 Token URI.
additional_claims (Mapping[str, str]): Any additional claims for
the JWT assertion used in the authorization grant.
"""
super(IDTokenCredentials, self).__init__()
self._signer = Signer()
self._token_uri = token_uri
self._target_audience = target_audience

if additional_claims is not None:
self._additional_claims = additional_claims
else:
self._additional_claims = {}

def with_target_audience(self, target_audience):
"""Create a copy of these credentials with the specified target
audience.
Args:
target_audience (str): The intended audience for these credentials,
used when requesting the ID Token.
Returns:
google.auth.service_account.IDTokenCredentials: A new credentials
instance.
"""
return self.__class__(
self._signer,
service_account_email=self._service_account_email,
token_uri=self._token_uri,
target_audience=target_audience,
additional_claims=self._additional_claims.copy())

def _make_authorization_grant_assertion(self):
"""Create the OAuth 2.0 assertion.
This assertion is used during the OAuth 2.0 grant to acquire an
ID token.
Returns:
bytes: The authorization grant assertion.
"""
now = _helpers.utcnow()
lifetime = datetime.timedelta(seconds=_DEFAULT_TOKEN_LIFETIME_SECS)
expiry = now + lifetime

payload = {
'iat': _helpers.datetime_to_secs(now),
'exp': _helpers.datetime_to_secs(expiry),
# The issuer must be the service account email.
'iss': self.service_account_email,
# The audience must be the auth token endpoint's URI
'aud': self._token_uri,
# The target audience specifies which service the ID token is
# intended for.
'target_audience': self._target_audience
}

payload.update(self._additional_claims)

token = jwt.encode(self._signer, payload)

return token

@_helpers.copy_docstring(credentials.Credentials)
def refresh(self, request):
assertion = self._make_authorization_grant_assertion()
access_token, expiry, _ = _client.id_token_jwt_grant(
request, self._token_uri, assertion)
self.token = access_token
self.expiry = expiry

@property
@_helpers.copy_docstring(credentials.Signing)
def signer_email(self):
return self._signer.service_account_email

@property
@_helpers.copy_docstring(credentials.Signing)
def signer(self):
return self._signer

@_helpers.copy_docstring(credentials.Signing)
def sign_bytes(self, message):
return self._signer.sign(message)

@property
def service_account_email(self):
"""The service account email."""
return self._signer.service_account_email

0 comments on commit 21a9f5b

Please sign in to comment.