diff --git a/.travis.yml b/.travis.yml index d16e16c..d98723f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,6 +1,5 @@ language: python python: - - "2.6" - "2.7" - "3.4" - "3.5" @@ -11,7 +10,9 @@ install: - pip install . - pip install coverage - pip install coveralls + - pip install flake8 script: + - flake8 --ignore E501,E722 - coverage run --source=aws_google_auth/ --omit=aws_google_auth/tests/* setup.py test after_script: - coverage report diff --git a/README.rst b/README.rst index bac4d06..c652467 100644 --- a/README.rst +++ b/README.rst @@ -154,12 +154,11 @@ Storage of profile credentials Through the use of AWS profiles, using the ``-p`` or ``--profile`` flag, the ``aws-google-auth`` utility will store the supplied username, IDP and SP details in your ``./aws/config`` files. -When re-authenticating using the same profile, the values will be remembered to speed up the re-authentication process. -This enables an approach that enables you to enter your username, IPD and SP values once and then after only need to re-enter your password (and MFA if enabled). - Creating an alias as below can be a quick and easy way to re-authenticate with a simple command shortcut. -``alias aws-development='unset AWS_PROFILE; aws-google-auth -p aws-dev; export AWS_PROFILE=aws-dev'`` +``` +alias aws-development='unset AWS_PROFILE; aws-google-auth -I $GOOGLE_IDP_ID -S $GOOGLE_SP_ID -u $USERNAME -p aws-dev ; export AWS_PROFILE=aws-dev' +``` Notes on Authentication diff --git a/aws_google_auth/__init__.py b/aws_google_auth/__init__.py index 31ed977..1324ced 100644 --- a/aws_google_auth/__init__.py +++ b/aws_google_auth/__init__.py @@ -1,359 +1,17 @@ #!/usr/bin/env python from . import _version -from . import prepare +from . import configuration +from . import util +from . import google +from . import amazon import argparse import getpass -import base64 -import boto3 import os import sys -import requests -import json -from bs4 import BeautifulSoup -from lxml import etree -import configparser from tzlocal import get_localzone -# In Python3, the library 'urlparse' was renamed to 'urllib.parse'. For this to -# maintain compatibility with both Python 2 and Python 3, the import must be -# dynamically chosen based on the version detected. -if sys.version_info >= (3, 0): - import urllib.parse as urlparse -else: - import urlparse - -REGION = os.getenv("AWS_DEFAULT_REGION") or "ap-southeast-2" -IDP_ID = os.getenv("GOOGLE_IDP_ID") -SP_ID = os.getenv("GOOGLE_SP_ID") -USERNAME = os.getenv("GOOGLE_USERNAME") -MAX_DURATION = 3600 -DURATION = int(os.getenv("DURATION") or MAX_DURATION) -PROFILE = os.getenv("AWS_PROFILE") -ASK_ROLE = os.getenv("AWS_ASK_ROLE") or False -ROLE_ARN = os.getenv("AWS_ROLE_ARN") -U2F_DISABLED = os.getenv("U2F_DISABLED") or False - - -if not U2F_DISABLED: - try: - from . import u2f - except ImportError: - print("Failed to import U2F libraries, U2F login unavailable. Other " - "methods can still continue.") - - -class GoogleAuth: - def __init__(self, **kwargs): - """The GoogleAuth object holds authentication state - for a given session. You need to supply: - - username: FQDN Google username, eg first.last@example.com - password: obvious - idp_id: Google's assigned IdP identifier for your G-suite account - sp_id: Google's assigned SP identifier for your AWS SAML app - - Optionally, you can supply: - duration_seconds: number of seconds for the session to be active (max 3600) - """ - - self.version = _version.__version__ - - self.username = kwargs.pop('username') - self.password = kwargs.pop('password') - self.idp_id = kwargs.pop('idp_id') - self.sp_id = kwargs.pop('sp_id') - if kwargs.get('duration_seconds'): - try: - self.duration_seconds = int(kwargs.pop('duration_seconds')) - except ValueError as e: - raise ValueError('GoogleAuth: duration_seconds must be an integer') - - if self.duration_seconds > 3600: - print("WARNING: Clamping duration_seconds to 3600") - self.duration_seconds = 3600 - - self.login_url = "https://accounts.google.com/o/saml2/initsso?idpid=%s&spid=%s&forceauthn=false" % (self.idp_id, self.sp_id) - - def do_login(self): - self.session = requests.Session() - self.session.headers['User-Agent'] = "AWS Sign-in/%s (Cevo aws-google-auth)" % self.version - sess = self.session.get(self.login_url) - sess.raise_for_status() - - # Collect information from the page source - first_page = BeautifulSoup(sess.text, 'html.parser') - gxf = first_page.find('input', {'name': 'gxf'}).get('value') - self.cont = first_page.find('input', {'name': 'continue'}).get('value') - page = first_page.find('input', {'name': 'Page'}).get('value') - sign_in = first_page.find('input', {'name': 'signIn'}).get('value') - account_login_url = first_page.find('form', {'id': 'gaia_loginform'}).get('action') - - payload = { - 'bgresponse': 'js_disabled', - 'checkConnection': '', - 'checkedDomains': 'youtube', - 'continue': self.cont, - 'Email': self.username, - 'gxf': gxf, - 'identifier-captcha-input': '', - 'identifiertoken': '', - 'identifiertoken_audio': '', - 'ltmpl': 'popup', - 'oauth': 1, - 'Page': page, - 'Passwd': '', - 'PersistentCookie': 'yes', - 'ProfileInformation': '', - 'pstMsg': 0, - 'sarp': 1, - 'scc': 1, - 'SessionState': '', - 'signIn': sign_in, - '_utf8': '?', - } - - # GALX is sometimes not there - try: - galx = first_page.find('input', {'name': 'GALX'}).get('value') - payload['GALX'] = galx - except: - pass - - # POST to account login info page, to collect profile and session info - sess = self.session.post(account_login_url, data=payload) - sess.raise_for_status() - self.session.headers['Referer'] = sess.url - - # Collect ProfileInformation, SessionState, signIn, and Password Challenge URL - challenge_page = BeautifulSoup(sess.text, 'html.parser') - - profile_information = challenge_page.find('input', {'name': 'ProfileInformation'}).get('value') - session_state = challenge_page.find('input', {'name': 'SessionState'}).get('value') - sign_in = challenge_page.find('input', {'name': 'signIn'}).get('value') - passwd_challenge_url = challenge_page.find('form', {'id': 'gaia_loginform'}).get('action') - - # Update the payload - payload['SessionState'] = session_state - payload['ProfileInformation'] = profile_information - payload['signIn'] = sign_in - payload['Passwd'] = self.password - - # POST to Authenticate Password - sess = self.session.post(passwd_challenge_url, data=payload) - sess.raise_for_status() - response_page = BeautifulSoup(sess.text, 'html.parser') - error = response_page.find(class_='error-msg') - cap = response_page.find('input', {'name': 'logincaptcha'}) - - # Were there any errors logging in? Could be invalid username or password - # There could also sometimes be a Captcha, which means Google thinks you, - # or someone using the same outbound IP address as you, is a bot. - if error is not None: - raise ValueError('Invalid username or password') - - if cap is not None: - raise ValueError('Captcha Required. Manually Login to remove this.') - - self.session.headers['Referer'] = sess.url - - # Was there an MFA challenge? - if "challenge/totp/" in sess.url: - sess = self.handle_totp(sess) - elif "challenge/ipp/" in sess.url: - sess = self.handle_sms(sess) - elif "challenge/az/" in sess.url: - sess = self.handle_prompt(sess) - elif "challenge/sk/" in sess.url: - sess = self.handle_sk(sess) - - # ... there are different URLs for backup codes (printed) - # and security keys (eg yubikey) as well - # save for later - self.session_state = sess - - def parse_saml(self): - if self.session_state is None: - raise StandardError('You must use do_login() before calling parse_saml()') - - parsed = BeautifulSoup(self.session_state.text, 'html.parser') - try: - saml_element = parsed.find('input', {'name': 'SAMLResponse'}).get('value') - except: - raise StandardError('Could not find SAML response, check your credentials') - - return saml_element - - def handle_sk(self, sess): - response_page = BeautifulSoup(sess.text, 'html.parser') - challenge_url = sess.url.split("?")[0] - - challenges_txt = response_page.find('input', {'name': "id-challenge"}).get('value') - challenges = json.loads(challenges_txt) - - facet_url = urlparse.urlparse(challenge_url) - facet = facet_url.scheme + "://" + facet_url.netloc - app_id = challenges["appId"] - u2f_challenges = [] - for c in challenges["challenges"]: - c["appId"] = app_id - u2f_challenges.append(c) - - auth_response = json.dumps(u2f.u2f_auth(u2f_challenges, facet)) - - payload = { - 'challengeId': response_page.find('input', {'name': 'challengeId'}).get('value'), - 'challengeType': response_page.find('input', {'name': 'challengeType'}).get('value'), - 'continue': response_page.find('input', {'name': 'continue'}).get('value'), - 'scc': response_page.find('input', {'name': 'scc'}).get('value'), - 'sarp': response_page.find('input', {'name': 'sarp'}).get('value'), - 'checkedDomains': response_page.find('input', {'name': 'checkedDomains'}).get('value'), - 'pstMsg': response_page.find('input', {'name': 'pstMsg'}).get('value'), - 'TL': response_page.find('input', {'name': 'TL'}).get('value'), - 'gxf': response_page.find('input', {'name': 'gxf'}).get('value'), - 'id-challenge': challenges_txt, - 'id-assertion': auth_response, - 'TrustDevice': 'on', - } - - sess = self.session.post(challenge_url, data=payload) - sess.raise_for_status() - - return sess - - def handle_sms(self, sess): - response_page = BeautifulSoup(sess.text, 'html.parser') - challenge_url = sess.url.split("?")[0] - - try: - sms_token = raw_input("Enter SMS token: G-") or None - except NameError: - sms_token = input("Enter SMS token: G-") or None - - payload = { - 'challengeId': response_page.find('input', {'name': 'challengeId'}).get('value'), - 'challengeType': response_page.find('input', {'name': 'challengeType'}).get('value'), - 'continue': response_page.find('input', {'name': 'continue'}).get('value'), - 'scc': response_page.find('input', {'name': 'scc'}).get('value'), - 'sarp': response_page.find('input', {'name': 'sarp'}).get('value'), - 'checkedDomains': response_page.find('input', {'name': 'checkedDomains'}).get('value'), - 'pstMsg': response_page.find('input', {'name': 'pstMsg'}).get('value'), - 'TL': response_page.find('input', {'name': 'TL'}).get('value'), - 'gxf': response_page.find('input', {'name': 'gxf'}).get('value'), - 'Pin': sms_token, - 'TrustDevice': 'on', - } - - # Submit IPP (SMS code) - sess = self.session.post(challenge_url, data=payload) - sess.raise_for_status() - - return sess - - def handle_prompt(self, sess): - response_page = BeautifulSoup(sess.text, 'html.parser') - challenge_url = sess.url.split("?")[0] - - data_key = response_page.find('div', {'data-api-key': True}).get('data-api-key') - data_tx_id = response_page.find('div', {'data-tx-id': True}).get('data-tx-id') - - # Need to post this to the verification/pause endpoint - await_url = "https://content.googleapis.com/cryptauth/v1/authzen/awaittx?alt=json&key=%s" % data_key - await_body = {'txId': data_tx_id} - - print("Open the Google App, and tap 'Yes' on the prompt to sign in ...") - - self.session.headers['Referer'] = sess.url - response = self.session.post(await_url, json=await_body) - parsed = json.loads(response.text) - - payload = { - 'challengeId': response_page.find('input', {'name': 'challengeId'}).get('value'), - 'challengeType': response_page.find('input', {'name': 'challengeType'}).get('value'), - 'continue': response_page.find('input', {'name': 'continue'}).get('value'), - 'scc': response_page.find('input', {'name': 'scc'}).get('value'), - 'sarp': response_page.find('input', {'name': 'sarp'}).get('value'), - 'checkedDomains': response_page.find('input', {'name': 'checkedDomains'}).get('value'), - 'checkConnection': 'youtube:1295:1', - 'pstMsg': response_page.find('input', {'name': 'pstMsg'}).get('value'), - 'TL': response_page.find('input', {'name': 'TL'}).get('value'), - 'gxf': response_page.find('input', {'name': 'gxf'}).get('value'), - 'token': parsed['txToken'], - 'action': response_page.find('input', {'name': 'action'}).get('value'), - 'TrustDevice': 'on', - } - - sess = self.session.post(challenge_url, data=payload) - sess.raise_for_status() - - return sess - - def handle_totp(self, sess): - response_page = BeautifulSoup(sess.text, 'html.parser') - tl = response_page.find('input', {'name': 'TL'}).get('value') - gxf = response_page.find('input', {'name': 'gxf'}).get('value') - challenge_url = sess.url.split("?")[0] - challenge_id = challenge_url.split("totp/")[1] - - try: - mfa_token = raw_input("MFA token: ") or None - except NameError: - mfa_token = input("MFA token: ") or None - - if not mfa_token: - raise ValueError("MFA token required for % but none supplied" % self.username) - - payload = { - 'challengeId': challenge_id, - 'challengeType': 6, - 'continue': self.cont, - 'scc': 1, - 'sarp': 1, - 'checkedDomains': 'youtube', - 'pstMsg': 0, - 'TL': tl, - 'gxf': gxf, - 'Pin': mfa_token, - 'TrustDevice': 'on', - } - - # Submit TOTP - sess = self.session.post(challenge_url, data=payload) - sess.raise_for_status() - - return sess - - -def pick_one(roles): - while True: - for i, role in enumerate(roles): - print("[{:>3d}] {}".format(i+1, role)) - - prompt = 'Type the number (1 - {:d}) of the role to assume: '.format(len(roles)) - try: - choice = raw_input(prompt) - except NameError: - choice = input(prompt) - - try: - num = int(choice) - return list(roles.items())[num - 1] - except: - print("Invalid choice, try again") - - -def parse_roles(doc): - roles = {} - for x in doc.xpath('//*[@Name = "https://aws.amazon.com/SAML/Attributes/Role"]//text()'): - if "arn:aws:iam:" not in x: - continue - - res = x.split(',') - roles[res[0]] = res[1] - - return roles - def parse_args(args): parser = argparse.ArgumentParser( @@ -361,22 +19,15 @@ def parse_args(args): description="Acquire temporary AWS credentials via Google SSO", ) - parser.add_argument('-u', '--username', default=USERNAME, - help='Google Apps username ($GOOGLE_USERNAME)') - parser.add_argument('-I', '--idp-id', default=IDP_ID, - help='Google SSO IDP identifier ($GOOGLE_IDP_ID)') - parser.add_argument('-S', '--sp-id', default=SP_ID, - help='Google SSO SP identifier ($GOOGLE_SP_ID)') - parser.add_argument('-R', '--region', default=REGION, - help='AWS region endpoint ($AWS_DEFAULT_REGION)') - parser.add_argument('-d', '--duration', type=int, default=DURATION, - help='Credential duration ($DURATION)') - parser.add_argument('-p', '--profile', default=PROFILE, - help='AWS profile (defaults to value of $AWS_PROFILE, then falls back to \'sts\')') + parser.add_argument('-u', '--username', help='Google Apps username ($GOOGLE_USERNAME)') + parser.add_argument('-I', '--idp-id', help='Google SSO IDP identifier ($GOOGLE_IDP_ID)') + parser.add_argument('-S', '--sp-id', help='Google SSO SP identifier ($GOOGLE_SP_ID)') + parser.add_argument('-R', '--region', help='AWS region endpoint ($AWS_DEFAULT_REGION)') + parser.add_argument('-d', '--duration', type=int, help='Credential duration ($DURATION)') + parser.add_argument('-p', '--profile', help='AWS profile (defaults to value of $AWS_PROFILE, then falls back to \'sts\')') role_group = parser.add_mutually_exclusive_group() - role_group.add_argument('-a', '--ask-role', default=ASK_ROLE, - action='store_true', help='Set true to always pick the role') + role_group.add_argument('-a', '--ask-role', action='store_true', help='Set true to always pick the role') role_group.add_argument('-r', '--role-arn', help='The ARN of the role to assume') parser.add_argument('-V', '--version', action='version', version='%(prog)s {version}'.format(version=_version.__version__)) @@ -392,133 +43,51 @@ def main(): def cli(cli_args): - args = parse_args(args=cli_args) - if args.duration > MAX_DURATION: - print("Duration must be less than or equal to %d" % MAX_DURATION) - args.duration = MAX_DURATION - - config = prepare.get_prepared_config( - args.profile, - args.region, - args.username, - args.idp_id, - args.sp_id, - args.duration, - args.ask_role, - args.role_arn - ) - - if config.google_username is None: - try: - config.google_username = raw_input("Google username: ") - except NameError: - config.google_username = input("Google username: ") - else: - print("Google username: " + config.google_username) - - if config.google_idp_id is None: - try: - config.google_idp_id = raw_input("Google idp: ") - except NameError: - config.google_idp_id = input("Google idp: ") - - if config.google_sp_id is None: - try: - config.google_sp_id = raw_input("Google sp: ") - except NameError: - config.google_sp_id = input("Google sp: ") - - passwd = getpass.getpass() - - google = GoogleAuth( - username=config.google_username, - password=passwd, - idp_id=config.google_idp_id, - sp_id=config.google_sp_id - ) - - google.do_login() - encoded_saml = google.parse_saml() - - # Parse out the roles from the SAML so we can offer them as a choice - doc = etree.fromstring(base64.b64decode(encoded_saml)) - roles = parse_roles(doc) - + # If there are arguments that are needed, we can interactively prompt the + # user. Note, the environment variables here are also in configuration.py + # but we need to check the presense here to know if we need to prompot. + # This is intentional. Any non-required params just get passed directly in, + # as we don't care if they were set or not. + username = args.username or os.getenv("GOOGLE_USERNAME") or util.Util.get_input("Google username: ") + idp_id = args.idp_id or os.getenv("GOOGLE_IDP_ID") or util.Util.get_input("Google IDP ID: ") + sp_id = args.sp_id or os.getenv("GOOGLE_SP_ID") or util.Util.get_input("Google SP ID: ") + + # There is no way (intentional) to pass in the password via the command + # line nor environment variables. This prevents password leakage. + passwd = getpass.getpass("Google Password: ") + + # Build the configuration with all the user's options + config = configuration.Configuration( + ask_role=args.ask_role, + duration=args.duration, + idp_id=idp_id, + profile=args.profile, + region=args.region, + role_arn=args.role_arn, + sp_id=sp_id, + username=username, + password=passwd) + + google_client = google.Google(config) + google_client.do_login() + encoded_saml = google_client.parse_saml() + + amazon_client = amazon.Amazon(config, encoded_saml) + roles = amazon_client.roles + + # Determine the provider and the role arn (if the the user provided isn't an option) if config.role_arn in roles and not config.ask_role: config.provider = roles[config.role_arn] else: - config.role_arn, config.provider = pick_one(roles) + config.role_arn, config.provider = util.Util.pick_a_role(roles) print("Assuming " + config.role_arn) + print("Credentials Expiration: " + format(amazon_client.expiration.astimezone(get_localzone()))) - sts = boto3.client('sts', region_name=config.region) - token = sts.assume_role_with_saml( - RoleArn=config.role_arn, - PrincipalArn=config.provider, - SAMLAssertion=encoded_saml, - DurationSeconds=config.duration) - - print("Credentials Expiration: " + format(token['Credentials']['Expiration'].astimezone(get_localzone()))) - - print_exports(token) - _store(config, token) - - -def print_exports(token): - export_template = "export AWS_ACCESS_KEY_ID='{}' AWS_SECRET_ACCESS_KEY='{}' AWS_SESSION_TOKEN='{}' AWS_SESSION_EXPIRATION='{}'" - - formatted = export_template.format( - token['Credentials']['AccessKeyId'], - token['Credentials']['SecretAccessKey'], - token['Credentials']['SessionToken'], - token['Credentials']['Expiration'].strftime('%Y-%m-%dT%H:%M:%S%z') - ) - - print(formatted) - - -def _store(config, aws_session_token): - - def store_config(profile, config_location, storer): - assert (profile is not None), "Can not store config/credentials if the AWS_PROFILE is None." - config_file = configparser.RawConfigParser() - config_file.read(config_location) - - if not config_file.has_section(profile): - config_file.add_section(profile) - - storer(config_file, profile) - - with open(config_location, 'w+') as f: - try: - config_file.write(f) - finally: - f.close() - - def credentials_storer(config_file, profile): - config_file.set(profile, 'aws_access_key_id', aws_session_token['Credentials']['AccessKeyId']) - config_file.set(profile, 'aws_secret_access_key', aws_session_token['Credentials']['SecretAccessKey']) - config_file.set(profile, 'aws_session_token', aws_session_token['Credentials']['SessionToken']) - config_file.set(profile, 'aws_security_token', aws_session_token['Credentials']['SessionToken']) - config_file.set(profile, 'aws_session_expiration', aws_session_token['Credentials']['Expiration'].strftime('%Y-%m-%dT%H:%M:%S%z')) - - def config_storer(config_file, profile): - config_file.set(profile, 'region', config.region) - config_file.set(profile, 'output', config.output_format) - config_file.set(profile, 'google_config.role_arn', config.role_arn) - config_file.set(profile, 'google_config.provider', config.provider) - config_file.set(profile, 'google_config.google_idp_id', config.google_idp_id) - config_file.set(profile, 'google_config.google_sp_id', config.google_sp_id) - config_file.set(profile, 'google_config.google_username', config.google_username) - config_file.set(profile, 'google_config.duration', config.duration) - - store_config(config.profile, config.aws_credentials_location, credentials_storer) - if config.profile == 'default': - store_config(config.profile, config.aws_config_location, config_storer) - else: - store_config('profile {}'.format(config.profile), config.aws_config_location, config_storer) + amazon_client.print_export_line() + config.write(amazon_client) if __name__ == '__main__': diff --git a/aws_google_auth/_version.py b/aws_google_auth/_version.py index 6561790..d62d967 100644 --- a/aws_google_auth/_version.py +++ b/aws_google_auth/_version.py @@ -1 +1 @@ -__version__ = "0.0.15" +__version__ = "0.0.16" diff --git a/aws_google_auth/amazon.py b/aws_google_auth/amazon.py new file mode 100644 index 0000000..57d1b90 --- /dev/null +++ b/aws_google_auth/amazon.py @@ -0,0 +1,68 @@ +#!/usr/bin/env python + +import boto3 +import base64 +from lxml import etree + + +class Amazon: + + def __init__(self, config, encoded_saml): + self.config = config + self.encoded_saml = encoded_saml + self.__token = None + + @property + def sts_client(self): + return boto3.client('sts', region_name=self.config.region) + + @property + def decoded_saml(self): + return base64.b64decode(self.encoded_saml) + + @property + def token(self): + if self.__token is None: + self.__token = self.sts_client.assume_role_with_saml( + RoleArn=self.config.role_arn, + PrincipalArn=self.config.provider, + SAMLAssertion=self.encoded_saml, + DurationSeconds=self.config.duration) + return self.__token + + @property + def access_key_id(self): + return self.token['Credentials']['AccessKeyId'] + + @property + def secret_access_key(self): + return self.token['Credentials']['SecretAccessKey'] + + @property + def session_token(self): + return self.token['Credentials']['SessionToken'] + + @property + def expiration(self): + return self.token['Credentials']['Expiration'] + + def print_export_line(self): + export_template = "export AWS_ACCESS_KEY_ID='{}' AWS_SECRET_ACCESS_KEY='{}' AWS_SESSION_TOKEN='{}' AWS_SESSION_EXPIRATION='{}'" + + formatted = export_template.format( + self.access_key_id, + self.secret_access_key, + self.session_token, + self.expiration.strftime('%Y-%m-%dT%H:%M:%S%z')) + + print(formatted) + + @property + def roles(self): + doc = etree.fromstring(self.decoded_saml) + roles = {} + for x in doc.xpath('//*[@Name = "https://aws.amazon.com/SAML/Attributes/Role"]//text()'): + if "arn:aws:iam:" in x: + res = x.split(',') + roles[res[0]] = res[1] + return roles diff --git a/aws_google_auth/configuration.py b/aws_google_auth/configuration.py new file mode 100644 index 0000000..ebd666d --- /dev/null +++ b/aws_google_auth/configuration.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python + +import os +import botocore.session +import configparser + +from . import util + + +class Configuration: + + def __init__(self, **kwargs): + self.set_ask_role(kwargs.get('ask_role', None) or os.getenv("AWS_ASK_ROLE") or False) + self.set_duration(kwargs.get('duration', None) or os.getenv("DURATION") or self.max_duration) + self.set_u2f_disabled(kwargs.get('u2f_disabled', None) or os.getenv("U2F_DISABLED") or False) + self.set_region(kwargs.get('region', None) or os.getenv("AWS_DEFAULT_REGION") or 'ap-southeast-2') + self.set_profile(kwargs.get('profile', None) or os.getenv("AWS_PROFILE") or 'sts') + self.set_role_arn(kwargs.get('role_arn', None) or os.getenv("AWS_ROLE_ARN") or None) + self.set_idp_id(kwargs.get('idp_id', None) or os.getenv("GOOGLE_IDP_ID") or None) + self.set_sp_id(kwargs.get('sp_id', None) or os.getenv("GOOGLE_SP_ID") or None) + self.set_username(kwargs.get('username', None) or os.getenv("GOOGLE_USERNAME") or None) + self.password = kwargs.get('password', None) or None + self.__boto_session = botocore.session.Session() + + @property + def max_duration(self): + return 3600 + + def get_ask_role(self): + return self.__ask_role + + def set_ask_role(self, value): + assert (value.__class__ is bool), "Expected ask_role to be a boolean. Got {}.".format(value.__class__) + self.__ask_role = value + + ask_role = property(get_ask_role, set_ask_role) + + def get_duration(self): + return self.__duration + + def set_duration(self, duration_seconds): + assert (duration_seconds.__class__ is int), "Expected duration to be an integer. Got {}.".format(duration_seconds.__class__) + assert (duration_seconds > 0), "Expected duration to be greater than 0. Got {}.".format(duration_seconds) + assert (duration_seconds <= self.max_duration), "Expected duration to be less than or equal to max_duration ({}). Got {}.".format(self.max_duration, duration_seconds) + self.__duration = duration_seconds + + duration = property(get_duration, set_duration) + + def get_profile(self): + return self.__profile + + def set_profile(self, profile): + assert (profile.__class__ is str), "Expected profile to be a string. Got {}.".format(profile.__class__) + self.__profile = profile + + profile = property(get_profile, set_profile) + + def get_region(self): + return self.__region + + def set_region(self, region): + assert (region.__class__ is str), "Expected region to be a string. Got {}.".format(region.__class__) + self.__region = region + + region = property(get_region, set_region) + + def get_idp_id(self): + return self.__idp_id + + def set_idp_id(self, idp): + assert (idp is not None), "Expected idp_id to be set to non-None value." + self.__idp_id = idp + + idp_id = property(get_idp_id, set_idp_id) + + def get_sp_id(self): + return self.__sp_id + + def set_sp_id(self, sp_id): + assert (sp_id is not None), "Expected sp_id to be set to non-None value." + self.__sp_id = sp_id + + sp_id = property(get_sp_id, set_sp_id) + + def get_username(self): + return self.__username + + def set_username(self, username): + assert (username.__class__ is str), "Expected username to be a string. Got {}.".format(username.__class__) + self.__username = username + + username = property(get_username, set_username) + + def get_role_arn(self): + return self.__role_arn + + def set_role_arn(self, arn): + if arn is not None: + assert (arn.__class__ is str), "Expected role_arn to be None or a string. Got {}.".format(arn.__class__) + assert ("arn:aws:iam::" in arn), "Expected role_arn to contain 'arn:aws:iam::'. Got '{}'.".format(arn) + self.__role_arn = arn + + role_arn = property(get_role_arn, set_role_arn) + + def get_u2f_disabled(self): + return self.__u2f_disabled + + def set_u2f_disabled(self, u2f_disabled): + assert (u2f_disabled.__class__ is bool), "Expected u2f_disabled to be a boolean. Got {}.".format(u2f_disabled.__class__) + self.__u2f_disabled = u2f_disabled + + u2f_disabled = property(get_u2f_disabled, set_u2f_disabled) + + def get_credentials_file(self): + return os.path.expanduser(self.__boto_session.get_config_variable('credentials_file')) + + credentials_file = property(get_credentials_file) + + def get_config_file(self): + return os.path.expanduser(self.__boto_session.get_config_variable('config_file')) + + config_file = property(get_config_file) + + def ensure_config_files_exist(self): + for file in [self.config_file, self.credentials_file]: + directory = os.path.dirname(file) + if not os.path.exists(directory): + os.mkdir(directory, 0o700) + if not os.path.exists(file): + util.Util.touch(file) + + def write(self, amazon_object): + self.ensure_config_files_exist() + + assert (self.profile is not None), "Can not store config/credentials if the AWS_PROFILE is None." + + # Write to the configuration file + config_parser = configparser.RawConfigParser() + config_parser.read(self.config_file) + if not config_parser.has_section(self.profile): + config_parser.add_section(self.profile) + config_parser.set(self.profile, 'region', self.region) + config_parser.set(self.profile, 'aws_google_auth_ask_role', self.ask_role) + config_parser.set(self.profile, 'aws_google_auth_duration', self.duration) + config_parser.set(self.profile, 'aws_google_auth_u2f_disabled', self.u2f_disabled) + config_parser.set(self.profile, 'aws_google_auth_region', self.region) + config_parser.set(self.profile, 'aws_google_auth_profile', self.profile) + config_parser.set(self.profile, 'aws_google_auth_role_arn', self.role_arn) + config_parser.set(self.profile, 'aws_google_auth_idp_id', self.idp_id) + config_parser.set(self.profile, 'aws_google_auth_sp_id', self.sp_id) + config_parser.set(self.profile, 'aws_google_auth_username', self.username) + with open(self.config_file, 'w+') as f: + config_parser.write(f) + + # Write to the credentials file + credentials_parser = configparser.RawConfigParser() + credentials_parser.read(self.credentials_file) + if not credentials_parser.has_section(self.profile): + credentials_parser.add_section(self.profile) + credentials_parser.set(self.profile, 'aws_access_key_id', amazon_object.access_key_id) + credentials_parser.set(self.profile, 'aws_secret_access_key', amazon_object.secret_access_key) + credentials_parser.set(self.profile, 'aws_session_token', amazon_object.session_token) + credentials_parser.set(self.profile, 'aws_security_token', amazon_object.session_token) + credentials_parser.set(self.profile, 'aws_session_expiration', amazon_object.expiration.strftime('%Y-%m-%dT%H:%M:%S%z')) + with open(self.credentials_file, 'w+') as f: + credentials_parser.write(f) diff --git a/aws_google_auth/google.py b/aws_google_auth/google.py new file mode 100644 index 0000000..7e08b40 --- /dev/null +++ b/aws_google_auth/google.py @@ -0,0 +1,293 @@ +#!/usr/bin/env python + +from . import _version + +import sys +import requests +import json +from bs4 import BeautifulSoup + +# In Python3, the library 'urlparse' was renamed to 'urllib.parse'. For this to +# maintain compatibility with both Python 2 and Python 3, the import must be +# dynamically chosen based on the version detected. +if sys.version_info >= (3, 0): + import urllib.parse as urlparse +else: + import urlparse + +# The U2F USB Library is optional, if it's there, include it. +try: + from . import u2f +except ImportError: + print("Failed to import U2F libraries, U2F login unavailable. Other " + "methods can still continue.") + + +class Google: + def __init__(self, config): + """The Google object holds authentication state + for a given session. You need to supply: + + username: FQDN Google username, eg first.last@example.com + password: obvious + idp_id: Google's assigned IdP identifier for your G-suite account + sp_id: Google's assigned SP identifier for your AWS SAML app + + Optionally, you can supply: + duration_seconds: number of seconds for the session to be active (max 3600) + """ + + self.version = _version.__version__ + self.config = config + + @property + def login_url(self): + return "https://accounts.google.com/o/saml2/initsso?idpid={}&spid={}&forceauthn=false".format(self.config.idp_id, self.config.sp_id) + + def do_login(self): + self.session = requests.Session() + self.session.headers['User-Agent'] = "AWS Sign-in/{} (Cevo aws-google-auth)".format(self.version) + sess = self.session.get(self.login_url) + sess.raise_for_status() + + # Collect information from the page source + first_page = BeautifulSoup(sess.text, 'html.parser') + gxf = first_page.find('input', {'name': 'gxf'}).get('value') + self.cont = first_page.find('input', {'name': 'continue'}).get('value') + page = first_page.find('input', {'name': 'Page'}).get('value') + sign_in = first_page.find('input', {'name': 'signIn'}).get('value') + account_login_url = first_page.find('form', {'id': 'gaia_loginform'}).get('action') + + payload = { + 'bgresponse': 'js_disabled', + 'checkConnection': '', + 'checkedDomains': 'youtube', + 'continue': self.cont, + 'Email': self.config.username, + 'gxf': gxf, + 'identifier-captcha-input': '', + 'identifiertoken': '', + 'identifiertoken_audio': '', + 'ltmpl': 'popup', + 'oauth': 1, + 'Page': page, + 'Passwd': '', + 'PersistentCookie': 'yes', + 'ProfileInformation': '', + 'pstMsg': 0, + 'sarp': 1, + 'scc': 1, + 'SessionState': '', + 'signIn': sign_in, + '_utf8': '?', + } + + # GALX is sometimes not there + try: + galx = first_page.find('input', {'name': 'GALX'}).get('value') + payload['GALX'] = galx + except: + pass + + # POST to account login info page, to collect profile and session info + sess = self.session.post(account_login_url, data=payload) + sess.raise_for_status() + self.session.headers['Referer'] = sess.url + + # Collect ProfileInformation, SessionState, signIn, and Password Challenge URL + challenge_page = BeautifulSoup(sess.text, 'html.parser') + + profile_information = challenge_page.find('input', {'name': 'ProfileInformation'}).get('value') + session_state = challenge_page.find('input', {'name': 'SessionState'}).get('value') + sign_in = challenge_page.find('input', {'name': 'signIn'}).get('value') + passwd_challenge_url = challenge_page.find('form', {'id': 'gaia_loginform'}).get('action') + + # Update the payload + payload['SessionState'] = session_state + payload['ProfileInformation'] = profile_information + payload['signIn'] = sign_in + payload['Passwd'] = self.config.password + + # POST to Authenticate Password + sess = self.session.post(passwd_challenge_url, data=payload) + sess.raise_for_status() + response_page = BeautifulSoup(sess.text, 'html.parser') + error = response_page.find(class_='error-msg') + cap = response_page.find('input', {'name': 'logincaptcha'}) + + # Were there any errors logging in? Could be invalid username or password + # There could also sometimes be a Captcha, which means Google thinks you, + # or someone using the same outbound IP address as you, is a bot. + if error is not None: + raise ValueError('Invalid username or password') + + if cap is not None: + raise ValueError('Captcha Required. Manually Login to remove this.') + + self.session.headers['Referer'] = sess.url + + # Was there an MFA challenge? + if "challenge/totp/" in sess.url: + sess = self.handle_totp(sess) + elif "challenge/ipp/" in sess.url: + sess = self.handle_sms(sess) + elif "challenge/az/" in sess.url: + sess = self.handle_prompt(sess) + elif "challenge/sk/" in sess.url: + sess = self.handle_sk(sess) + + # ... there are different URLs for backup codes (printed) + # and security keys (eg yubikey) as well + # save for later + self.session_state = sess + + def parse_saml(self): + if self.session_state is None: + raise RuntimeError('You must use do_login() before calling parse_saml()') + + parsed = BeautifulSoup(self.session_state.text, 'html.parser') + try: + saml_element = parsed.find('input', {'name': 'SAMLResponse'}).get('value') + except: + raise RuntimeError('Could not find SAML response, check your credentials') + + return saml_element + + def handle_sk(self, sess): + response_page = BeautifulSoup(sess.text, 'html.parser') + challenge_url = sess.url.split("?")[0] + + challenges_txt = response_page.find('input', {'name': "id-challenge"}).get('value') + challenges = json.loads(challenges_txt) + + facet_url = urlparse.urlparse(challenge_url) + facet = facet_url.scheme + "://" + facet_url.netloc + app_id = challenges["appId"] + u2f_challenges = [] + for c in challenges["challenges"]: + c["appId"] = app_id + u2f_challenges.append(c) + + auth_response = json.dumps(u2f.u2f_auth(u2f_challenges, facet)) + + payload = { + 'challengeId': response_page.find('input', {'name': 'challengeId'}).get('value'), + 'challengeType': response_page.find('input', {'name': 'challengeType'}).get('value'), + 'continue': response_page.find('input', {'name': 'continue'}).get('value'), + 'scc': response_page.find('input', {'name': 'scc'}).get('value'), + 'sarp': response_page.find('input', {'name': 'sarp'}).get('value'), + 'checkedDomains': response_page.find('input', {'name': 'checkedDomains'}).get('value'), + 'pstMsg': response_page.find('input', {'name': 'pstMsg'}).get('value'), + 'TL': response_page.find('input', {'name': 'TL'}).get('value'), + 'gxf': response_page.find('input', {'name': 'gxf'}).get('value'), + 'id-challenge': challenges_txt, + 'id-assertion': auth_response, + 'TrustDevice': 'on', + } + + sess = self.session.post(challenge_url, data=payload) + sess.raise_for_status() + + return sess + + def handle_sms(self, sess): + response_page = BeautifulSoup(sess.text, 'html.parser') + challenge_url = sess.url.split("?")[0] + + try: + sms_token = raw_input("Enter SMS token: G-") or None + except NameError: + sms_token = input("Enter SMS token: G-") or None + + payload = { + 'challengeId': response_page.find('input', {'name': 'challengeId'}).get('value'), + 'challengeType': response_page.find('input', {'name': 'challengeType'}).get('value'), + 'continue': response_page.find('input', {'name': 'continue'}).get('value'), + 'scc': response_page.find('input', {'name': 'scc'}).get('value'), + 'sarp': response_page.find('input', {'name': 'sarp'}).get('value'), + 'checkedDomains': response_page.find('input', {'name': 'checkedDomains'}).get('value'), + 'pstMsg': response_page.find('input', {'name': 'pstMsg'}).get('value'), + 'TL': response_page.find('input', {'name': 'TL'}).get('value'), + 'gxf': response_page.find('input', {'name': 'gxf'}).get('value'), + 'Pin': sms_token, + 'TrustDevice': 'on', + } + + # Submit IPP (SMS code) + sess = self.session.post(challenge_url, data=payload) + sess.raise_for_status() + + return sess + + def handle_prompt(self, sess): + response_page = BeautifulSoup(sess.text, 'html.parser') + challenge_url = sess.url.split("?")[0] + + data_key = response_page.find('div', {'data-api-key': True}).get('data-api-key') + data_tx_id = response_page.find('div', {'data-tx-id': True}).get('data-tx-id') + + # Need to post this to the verification/pause endpoint + await_url = "https://content.googleapis.com/cryptauth/v1/authzen/awaittx?alt=json&key=%s" % data_key + await_body = {'txId': data_tx_id} + + print("Open the Google App, and tap 'Yes' on the prompt to sign in ...") + + self.session.headers['Referer'] = sess.url + response = self.session.post(await_url, json=await_body) + parsed = json.loads(response.text) + + payload = { + 'challengeId': response_page.find('input', {'name': 'challengeId'}).get('value'), + 'challengeType': response_page.find('input', {'name': 'challengeType'}).get('value'), + 'continue': response_page.find('input', {'name': 'continue'}).get('value'), + 'scc': response_page.find('input', {'name': 'scc'}).get('value'), + 'sarp': response_page.find('input', {'name': 'sarp'}).get('value'), + 'checkedDomains': response_page.find('input', {'name': 'checkedDomains'}).get('value'), + 'checkConnection': 'youtube:1295:1', + 'pstMsg': response_page.find('input', {'name': 'pstMsg'}).get('value'), + 'TL': response_page.find('input', {'name': 'TL'}).get('value'), + 'gxf': response_page.find('input', {'name': 'gxf'}).get('value'), + 'token': parsed['txToken'], + 'action': response_page.find('input', {'name': 'action'}).get('value'), + 'TrustDevice': 'on', + } + + sess = self.session.post(challenge_url, data=payload) + sess.raise_for_status() + + return sess + + def handle_totp(self, sess): + response_page = BeautifulSoup(sess.text, 'html.parser') + tl = response_page.find('input', {'name': 'TL'}).get('value') + gxf = response_page.find('input', {'name': 'gxf'}).get('value') + challenge_url = sess.url.split("?")[0] + challenge_id = challenge_url.split("totp/")[1] + + try: + mfa_token = raw_input("MFA token: ") or None + except NameError: + mfa_token = input("MFA token: ") or None + + if not mfa_token: + raise ValueError("MFA token required for {} but none supplied.".format(self.config.username)) + + payload = { + 'challengeId': challenge_id, + 'challengeType': 6, + 'continue': self.cont, + 'scc': 1, + 'sarp': 1, + 'checkedDomains': 'youtube', + 'pstMsg': 0, + 'TL': tl, + 'gxf': gxf, + 'Pin': mfa_token, + 'TrustDevice': 'on', + } + + # Submit TOTP + sess = self.session.post(challenge_url, data=payload) + sess.raise_for_status() + + return sess diff --git a/aws_google_auth/prepare.py b/aws_google_auth/prepare.py deleted file mode 100644 index bb86ad8..0000000 --- a/aws_google_auth/prepare.py +++ /dev/null @@ -1,131 +0,0 @@ -import configparser -import os -import botocore.session -from types import MethodType - - -def get_prepared_config( - profile, - region, - google_username, - google_idp_id, - google_sp_id, - duration, - ask_role, - role_arn -): - - def default_if_none(value, default): - return value if value is not None else default - - # If no profile is specified, default to "sts" so we don't clobber the user's default. - google_config.profile = default_if_none(profile, google_config.profile) or "sts" - _create_base_aws_cli_config_files_if_needed(google_config) - - if google_config.profile is not None: - _load_google_config_from_stored_profile(google_config, google_config.profile) - - google_config.region = default_if_none(region, google_config.region) - google_config.google_username = default_if_none(google_username, google_config.google_username) - google_config.google_idp_id = default_if_none(google_idp_id, google_config.google_idp_id) - google_config.google_sp_id = default_if_none(google_sp_id, google_config.google_sp_id) - google_config.duration = default_if_none(duration, google_config.duration) - google_config.ask_role = default_if_none(ask_role, google_config.ask_role) - google_config.role_arn = default_if_none(role_arn, google_config.role_arn) - - return google_config - - -def _create_google_default_config(): - config = type('', (), {})() - - # Use botocore session API to get defaults - session = botocore.session.Session() - - # region: The default AWS region that this script will connect - # to for all API calls - config.region = session.get_config_variable('region') or 'eu-central-1' - - # aws cli profile to store config and access keys into - config.profile = session.profile or None - - # output format: The AWS CLI output format that will be configured in the - # adf profile (affects subsequent CLI calls) - config.output_format = session.get_config_variable('format') or 'json' - - # aws credential location: The file where this script will store the temp - # credentials under the configured profile - config.aws_credentials_location = os.path.expanduser(session.get_config_variable('credentials_file')) - config.aws_config_location = os.path.expanduser(session.get_config_variable('config_file')) - - config.role_arn = None - config.provider = None - - config.google_sp_id = None - config.google_idp_id = None - config.google_username = None - config.duration = 3600 - config.ask_role = False - - return config - - -def _load_google_config_from_stored_profile(google_config, profile): - - def get_or(self, profile, option, default_value): - if self.has_option(profile, option): - return self.get(profile, option) - return default_value - - def load_from_config(config_location, profile, loader): - config = configparser.RawConfigParser() - config.read(config_location) - if config.has_section(profile): - setattr(config, get_or.__name__, MethodType(get_or, config)) - loader(config, profile) - - del config - - def load_config(config, profile): - google_config.region = config.get_or(profile, 'region', google_config.region) - google_config.output_format = config.get_or(profile, 'output', google_config.output_format) - - google_config.role_arn = config.get_or(profile, 'google_config.role_arn', google_config.role_arn) - google_config.provider = config.get_or(profile, 'google_config.provider', google_config.provider) - google_config.google_idp_id = config.get_or(profile, 'google_config.google_idp_id', google_config.google_idp_id) - google_config.google_sp_id = config.get_or(profile, 'google_config.google_sp_id', google_config.google_sp_id) - google_config.google_username = config.get_or(profile, 'google_config.google_username', google_config.google_username) - - if profile == 'default': - load_from_config(google_config.aws_config_location, profile, load_config) - else: - load_from_config(google_config.aws_config_location, 'profile ' + profile, load_config) - - -def _create_base_aws_cli_config_files_if_needed(google_config): - def touch(fname, mode=0o600): - flags = os.O_CREAT | os.O_APPEND - with os.fdopen(os.open(fname, flags, mode)) as f: - try: - os.utime(fname, None) - finally: - f.close() - - aws_config_root = os.path.dirname(google_config.aws_config_location) - - if not os.path.exists(aws_config_root): - os.mkdir(aws_config_root, 0o700) - - if not os.path.exists(google_config.aws_credentials_location): - touch(google_config.aws_credentials_location) - - aws_credentials_root = os.path.dirname(google_config.aws_credentials_location) - - if not os.path.exists(aws_credentials_root): - os.mkdir(aws_credentials_root, 0o700) - - if not os.path.exists(google_config.aws_config_location): - touch(google_config.aws_config_location) - - -google_config = _create_google_default_config() diff --git a/aws_google_auth/tests/test_amazon.py b/aws_google_auth/tests/test_amazon.py new file mode 100644 index 0000000..673b273 --- /dev/null +++ b/aws_google_auth/tests/test_amazon.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python + +import base64 +import unittest + +from aws_google_auth import amazon +from aws_google_auth import configuration +from os import path + + +class TestAmazon(unittest.TestCase): + + def valid_config(self): + return configuration.Configuration( + idp_id="IDPID", + sp_id="SPID", + username="user@example.com", + password="hunter2") + + def valid_saml_response(self): + here = path.abspath(path.dirname(__file__)) + with open(path.join(here, 'valid-response.xml')) as fp: + return base64.b64encode(fp.read().encode('utf-8')) + return None + + def extra_comma_saml_response(self): + here = path.abspath(path.dirname(__file__)) + with open(path.join(here, 'too-many-commas.xml')) as fp: + return base64.b64encode(fp.read().encode('utf-8')) + return None + + def test_sts_client(self): + a = amazon.Amazon(self.valid_config(), "encoded-saml") + self.assertEqual(str(a.sts_client.__class__), "") + + def test_role_extraction(self): + a = amazon.Amazon(self.valid_config(), self.valid_saml_response()) + self.assertIsInstance(a.roles, dict) + list_of_testing_roles = [ + "arn:aws:iam::123456789012:role/admin", + "arn:aws:iam::123456789012:role/read-only", + "arn:aws:iam::123456789012:role/test"] + self.assertEqual(sorted(list(a.roles.keys())), sorted(list_of_testing_roles)) + + def test_role_extraction_too_many_commas(self): + # See https://github.com/cevoaustralia/aws-google-auth/issues/12 + a = amazon.Amazon(self.valid_config(), self.extra_comma_saml_response()) + self.assertIsInstance(a.roles, dict) + list_of_testing_roles = [ + "arn:aws:iam::123456789012:role/admin", + "arn:aws:iam::123456789012:role/read-only", + "arn:aws:iam::123456789012:role/test"] + self.assertEqual(sorted(list(a.roles.keys())), sorted(list_of_testing_roles)) diff --git a/aws_google_auth/tests/test_configuration.py b/aws_google_auth/tests/test_configuration.py new file mode 100644 index 0000000..951e0e4 --- /dev/null +++ b/aws_google_auth/tests/test_configuration.py @@ -0,0 +1,276 @@ +#!/usr/bin/env python + +import unittest +from aws_google_auth import configuration + + +class TestConfigurationMethods(unittest.TestCase): + + def test_duration_invalid_values(self): + # Duration must be an integer + with self.assertRaises(AssertionError) as e: + configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + duration="bad_type") + self.assertIn("Expected duration to be an integer.", str(e.exception)) + + # Duration can not be negative + with self.assertRaises(AssertionError) as e: + configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + duration=-1) + self.assertIn("Expected duration to be greater than 0.", str(e.exception)) + + # Duration can not be greater than MAX_DURATION + with self.assertRaises(AssertionError) as e: + valid = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + duration=100) + configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + duration=(valid.max_duration + 1)) + self.assertIn("Expected duration to be less than or equal to max_duration", str(e.exception)) + + def test_duration_valid_values(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + duration=100) + self.assertEqual(c.duration, 100) + c.duration = c.max_duration + self.assertEqual(c.duration, c.max_duration) + c.duration = (c.max_duration - 1) + self.assertEqual(c.duration, c.max_duration - 1) + + def test_duration_defaults_to_max_duration(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username") + self.assertEqual(c.duration, c.max_duration) + + def test_ask_role_invalid_values(self): + # ask_role must be a boolean + with self.assertRaises(AssertionError) as e: + configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + ask_role="bad_value") + self.assertIn("Expected ask_role to be a boolean.", str(e.exception)) + + def test_ask_role_valid_values(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + ask_role=True) + self.assertTrue(c.ask_role) + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + ask_role=False) + self.assertFalse(c.ask_role) + + def test_ask_role_optional(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username") + self.assertFalse(c.ask_role) + + def test_idp_id_invalid_values(self): + # idp_id must not be None + with self.assertRaises(AssertionError) as e: + configuration.Configuration( + sp_id="sample_sp_id", + username="sample_username") + self.assertIn("Expected idp_id to be set to non-None value.", str(e.exception)) + + def test_idp_id_valid_values(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username") + self.assertEqual(c.idp_id, "sample_idp_id") + c.idp_id = 123456 + self.assertEqual(c.idp_id, 123456) + + def test_sp_id_invalid_values(self): + # sp_id must not be None + with self.assertRaises(AssertionError) as e: + configuration.Configuration( + idp_id="sample_idp_id", + username="sample_username") + self.assertIn("Expected sp_id to be set to non-None value.", str(e.exception)) + + def test_username_valid_values(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username") + self.assertEqual(c.username, "sample_username") + c.username = "123456" + self.assertEqual(c.username, "123456") + + def test_username_invalid_values(self): + # username must be set + with self.assertRaises(AssertionError) as e: + configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id") + self.assertIn("Expected username to be a string.", str(e.exception)) + # username must be be string + with self.assertRaises(AssertionError) as e: + configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username=123456) + self.assertIn("Expected username to be a string.", str(e.exception)) + + def test_sp_id_valid_values(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username") + self.assertEqual(c.sp_id, "sample_sp_id") + c.sp_id = 123456 + self.assertEqual(c.sp_id, 123456) + + def test_profile_defaults_to_sts(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username") + self.assertEqual(c.profile, "sts") + + def test_profile_invalid_values(self): + # profile must be a string + with self.assertRaises(AssertionError) as e: + configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + profile=123456) + self.assertIn("Expected profile to be a string.", str(e.exception)) + + def test_profile_valid_values(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + profile="default") + self.assertEqual(c.profile, "default") + c.profile = "sts" + self.assertEqual(c.profile, "sts") + + def test_profile_defaults(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username") + self.assertEqual(c.profile, 'sts') + + def test_region_invalid_values(self): + # region must be a string + with self.assertRaises(AssertionError) as e: + configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + region=1234) + self.assertIn("Expected region to be a string.", str(e.exception)) + + def test_region_valid_values(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + region="us-east-1") + self.assertEqual(c.region, "us-east-1") + c.region = "us-west-2" + self.assertEqual(c.region, "us-west-2") + + def test_region_defaults_to_ap_southeast_2(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username") + self.assertEqual(c.region, "ap-southeast-2") + + def test_role_arn_invalid_values(self): + # role_arn must be a string + with self.assertRaises(AssertionError) as e: + configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + role_arn=1234) + self.assertIn("Expected role_arn to be None or a string.", str(e.exception)) + + # role_arn be a arn-looking string + with self.assertRaises(AssertionError) as e: + configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + role_arn="bad_string") + self.assertIn("Expected role_arn to contain 'arn:aws:iam::'", str(e.exception)) + + def test_role_arn_is_optional(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username") + self.assertIsNone(c.role_arn) + + def test_role_arn_valid_values(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + role_arn="arn:aws:iam::some_arn_1") + self.assertEqual(c.role_arn, "arn:aws:iam::some_arn_1") + c.role_arn = "arn:aws:iam::some_other_arn_2" + self.assertEqual(c.role_arn, "arn:aws:iam::some_other_arn_2") + + def test_u2f_disabled_invalid_values(self): + # u2f_disabled must be a boolean + with self.assertRaises(AssertionError) as e: + configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + u2f_disabled=1234) + self.assertIn("Expected u2f_disabled to be a boolean.", str(e.exception)) + + def test_u2f_disabled_valid_values(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + u2f_disabled=True) + self.assertTrue(c.u2f_disabled) + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username", + u2f_disabled=False) + self.assertFalse(c.u2f_disabled) + + def test_u2f_disabled_is_optional(self): + c = configuration.Configuration( + idp_id="sample_idp_id", + sp_id="sample_sp_id", + username="sample_username") + self.assertFalse(c.u2f_disabled) diff --git a/aws_google_auth/tests/test_invalid_duration.py b/aws_google_auth/tests/test_invalid_duration.py deleted file mode 100644 index 5c9c96a..0000000 --- a/aws_google_auth/tests/test_invalid_duration.py +++ /dev/null @@ -1,14 +0,0 @@ -from aws_google_auth import GoogleAuth - - -def test_non_int(): - try: - GoogleAuth( - username='foo', - password='bar', - idp_id='banana', - sp_id='potato', - duration_seconds='cheese', - ) - except ValueError as e: - assert(str(e) == 'GoogleAuth: duration_seconds must be an integer') diff --git a/aws_google_auth/tests/test_issue_12_extra_comma.py b/aws_google_auth/tests/test_issue_12_extra_comma.py deleted file mode 100644 index 9f2e06d..0000000 --- a/aws_google_auth/tests/test_issue_12_extra_comma.py +++ /dev/null @@ -1,23 +0,0 @@ -import aws_google_auth -from os import path -from lxml import etree - -here = path.abspath(path.dirname(__file__)) - -with open(path.join(here, 'valid-response.xml')) as fp: - VALID_DOC = etree.fromstring(fp.read().encode('utf-8')) - -with open(path.join(here, 'too-many-commas.xml')) as fp: - TOO_MANY_COMMAS_DOC = etree.fromstring(fp.read().encode('utf-8')) - -VALID_ROLE_ARN = "arn:aws:iam::123456789012:role/admin" - - -def test_parsing_valid_response(): - roles = aws_google_auth.parse_roles(VALID_DOC) - assert(VALID_ROLE_ARN in roles) - - -def test_parsing_extra_commas(): - roles = aws_google_auth.parse_roles(TOO_MANY_COMMAS_DOC) - assert(VALID_ROLE_ARN in roles) diff --git a/aws_google_auth/tests/test_persist_profile.py b/aws_google_auth/tests/test_persist_profile.py deleted file mode 100644 index feb8156..0000000 --- a/aws_google_auth/tests/test_persist_profile.py +++ /dev/null @@ -1,111 +0,0 @@ -import aws_google_auth -from aws_google_auth import prepare - -from os import path -from lxml import etree - -import unittest -import mock - - -class TestPersistConfig(unittest.TestCase): - - def setUp(self): - - prepare.google_config.region = "test_region" - prepare.google_config.role_arn = None - - prepare.google_config.profile = None - - prepare.google_config.output_format = "json" - # prepare.google_config.aws_credentials_location = "creds" - # prepare.google_config.aws_config_location = "config" - - prepare.google_config.role_arn = None - prepare.google_config.provider = None - - prepare.google_config.google_sp_id = None - prepare.google_config.google_idp_id = None - prepare.google_config.google_username = None - prepare.google_config.duration = 3600 - prepare.google_config.ask_role = False - - def test_when_there_is_no_profile_use_supplied_values(self): - # given profile to read the configuration doesn't exist - not_existing_profile = 'not_existing_profile' - prepare.configparser = mock.Mock() - config_without_non_existing_profile = mock.Mock() - prepare.configparser.RawConfigParser = mock.Mock(return_value=config_without_non_existing_profile) - config_without_non_existing_profile.has_section = mock.Mock(return_value=False) - - # and values supplied are setup as follows - default_username = 'default_username' - default_region = 'default_region' - default_idp_id = 'default_idp_id' - default_sp_id = 'default_sp_id' - default_duration = 'default_duration' - ask_role = 'default_ask_role' - role_arn = 'default_role_arn' - - # when configuration is prepared for not existing profile - config = prepare.get_prepared_config( - not_existing_profile, - default_region, - default_username, - default_idp_id, - default_sp_id, - default_duration, - ask_role, - role_arn - ) - - # then the supplied values are merged with the defaults - self.assertTrue(config is not None) - self.assertEquals(config.region, default_region) - self.assertEquals(config.google_username, default_username) - self.assertEquals(config.google_idp_id, default_idp_id) - self.assertEquals(config.google_sp_id, default_sp_id) - self.assertEquals(config.duration, default_duration) - self.assertEquals(config.ask_role, ask_role) - self.assertEquals(config.role_arn, role_arn) - - def test_when_there_is_no_profile_use_default_values(self): - - # given profile to read the configuration doesn't exist - prepare.configparser = mock.Mock() - config_without_non_existing_profile = mock.Mock() - prepare.configparser.RawConfigParser = mock.Mock(return_value=config_without_non_existing_profile) - config_without_non_existing_profile.has_section = mock.Mock(return_value=False) - - # and no values are supplied - profile = None - region = None - username = None - idp_id = None - sp_id = None - duration = None - ask_role = None - role_arn = None - - # when configuration is prepared for not existing profile - config = prepare.get_prepared_config( - profile, - region, - username, - idp_id, - sp_id, - duration, - ask_role, - role_arn - ) - - # then the defaults are returned - self.assertTrue(config is not None) - self.assertEquals(config.region, "test_region") - self.assertEquals(config.google_username, None) - self.assertEquals(config.google_idp_id, None) - self.assertEquals(config.google_sp_id, None) - self.assertEquals(config.duration, 3600) - self.assertEquals(config.ask_role, False) - self.assertEquals(config.profile, "sts") - self.assertEquals(config.role_arn, None) diff --git a/aws_google_auth/u2f.py b/aws_google_auth/u2f.py index 78902ba..a4e0558 100644 --- a/aws_google_auth/u2f.py +++ b/aws_google_auth/u2f.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + import requests import time import json diff --git a/aws_google_auth/util.py b/aws_google_auth/util.py new file mode 100644 index 0000000..30d9a85 --- /dev/null +++ b/aws_google_auth/util.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python + +import os + + +class Util: + + @staticmethod + def get_input(prompt): + try: + return raw_input(prompt) + except NameError: + return input(prompt) + + @staticmethod + def pick_a_role(roles): + while True: + for i, role in enumerate(roles): + print("[{:>3d}] {}".format(i + 1, role)) + + prompt = 'Type the number (1 - {:d}) of the role to assume: '.format(len(roles)) + choice = Util.get_input(prompt) + + try: + return list(roles.items())[int(choice) - 1] + except IndexError: + print("Invalid choice, try again.") + + @staticmethod + def touch(file_name, mode=0o600): + flags = os.O_CREAT | os.O_APPEND + with os.fdopen(os.open(file_name, flags, mode)) as f: + try: + os.utime(file_name, None) + finally: + f.close() diff --git a/requirements.txt b/requirements.txt index 44af5db..8445799 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ - +beautifulsoup4 boto3 +configparser lxml requests -beautifulsoup4 -configparser