Skip to content

Commit

Permalink
Format code
Browse files Browse the repository at this point in the history
Signed-off-by: Ivan Kanakarakis <ivan.kanak@gmail.com>
  • Loading branch information
c00kiemon5ter committed May 16, 2019
1 parent 8ee08e2 commit 285c80d
Showing 1 changed file with 46 additions and 51 deletions.
97 changes: 46 additions & 51 deletions src/satosa/frontends/saml2.py
Original file line number Diff line number Diff line change
Expand Up @@ -1027,6 +1027,7 @@ class SAMLUnsolicitedFrontend(SAMLFrontend):
unsolicted SAML flows. The unsolicited SAML flows are not part of any
SAML standard.
"""

KEY_ENDPOINT = "endpoint"
KEY_DISCO_URL_WHITE = "discovery_service_url_whitelist"
KEY_DISCO_POLICY_WHITE = "discovery_service_policy_whitelist"
Expand All @@ -1037,15 +1038,15 @@ class SAMLUnsolicitedFrontend(SAMLFrontend):
KEY_QUERY_DISCO_URL = "discoveryURL"
KEY_QUERY_DISCO_POLICY = "discoveryPolicy"
KEY_SAML_DISCOVERY_SERVICE_URL = SAMLBackend.KEY_SAML_DISCOVERY_SERVICE_URL
KEY_SAML_DISCOVERY_SERVICE_POLICY = (
SAMLBackend.KEY_SAML_DISCOVERY_SERVICE_POLICY
)
KEY_SAML_DISCOVERY_SERVICE_POLICY = SAMLBackend.KEY_SAML_DISCOVERY_SERVICE_POLICY
KEY_UNSOLICITED = "unsolicited"

def __init__(self, auth_req_callback_func, internal_attributes, config,
base_url, name):
super().__init__(auth_req_callback_func, internal_attributes, config,
base_url, name)
def __init__(
self, auth_req_callback_func, internal_attributes, config, base_url, name
):
super().__init__(
auth_req_callback_func, internal_attributes, config, base_url, name
)

def register_endpoints(self, backend_names):
"""
Expand All @@ -1060,12 +1061,10 @@ def register_endpoints(self, backend_names):
"""
url_map = super().register_endpoints(backend_names)

path = urlparse(
self.config[self.KEY_UNSOLICITED]
.get(self.KEY_ENDPOINT)).path
path = urlparse(self.config[self.KEY_UNSOLICITED].get(self.KEY_ENDPOINT)).path

for backend in backend_names:
pat = '(^{})/{}$'.format(backend, path)
pat = "(^{})/{}$".format(backend, path)
url_map.append((pat, self.unsolicited_endpoint))

logger.debug("URL maps to be registered are {}".format(url_map))
Expand All @@ -1092,16 +1091,14 @@ def unsolicited_endpoint(self, context):
requested_disco_url = request.get(self.KEY_QUERY_DISCO_URL, None)
requested_disco_policy = request.get(self.KEY_QUERY_DISCO_POLICY, None)

logger.debug("Unsolicited target authenticating IdP is {}".format(
target_idp_entity_id))
logger.debug(
"Unsolicited target authenticating IdP is {}".format(target_idp_entity_id)
)
logger.debug("Unsolicited target SP is {}".format(target_sp_entity_id))
logger.debug("Unsolicited ACS URL is {}".format(target_sp_acs_url))
logger.debug("Unsolicited relay state is {}".format(
target_sp_relay_state_url))
logger.debug("Unsolicted discovery URL is {}".format(
requested_disco_url))
logger.debug("Unsolicted discovery policy is {}".format(
requested_disco_policy))
logger.debug("Unsolicited relay state is {}".format(target_sp_relay_state_url))
logger.debug("Unsolicted discovery URL is {}".format(requested_disco_url))
logger.debug("Unsolicted discovery policy is {}".format(requested_disco_policy))

# We only proceed with known federated SPs.
try:
Expand All @@ -1114,9 +1111,10 @@ def unsolicited_endpoint(self, context):

# The SP ACS URL if input must match one from the trusted metadata.
# We assume the SP only has one SPSSODescriptor element in metadata.
acs_ob_list = (target_sp_metadata.get("spsso_descriptor", [{}])[0]
.get("assertion_consumer_service", [{}]))
acs_locations = [acs_ob['location'] for acs_ob in acs_ob_list]
acs_ob_list = target_sp_metadata.get("spsso_descriptor", [{}])[0].get(
"assertion_consumer_service", [{}]
)
acs_locations = [acs_ob["location"] for acs_ob in acs_ob_list]

if target_sp_acs_url:
if target_sp_acs_url not in acs_locations:
Expand All @@ -1128,10 +1126,11 @@ def unsolicited_endpoint(self, context):
for acs_ob in acs_ob_list:
# We assume the SP has HTTP_POST binding and we simply
# take the first one we find.
if acs_ob['binding'] == BINDING_HTTP_POST:
target_sp_acs_url = acs_ob['location']
logger.debug("Unsolicited found SP ACS URL {}".format(
target_sp_acs_url))
if acs_ob["binding"] == BINDING_HTTP_POST:
target_sp_acs_url = acs_ob["location"]
logger.debug(
"Unsolicited found SP ACS URL {}".format(target_sp_acs_url)
)
break

if not target_sp_acs_url:
Expand All @@ -1144,8 +1143,11 @@ def unsolicited_endpoint(self, context):
if target_sp_relay_state_url:
target = urlparse(target_sp_relay_state_url)
acs = urlparse(target_sp_acs_url)
if not (target.scheme == acs.scheme and
target.netloc == acs.netloc and target.port == acs.port):
if not (
target.scheme == acs.scheme
and target.netloc == acs.netloc
and target.port == acs.port
):
msg = "RelayState {} is not permitted"
msg = msg.format(target_sp_relay_state_url)
satosa_logging(logger, logging.ERROR, msg, context.state)
Expand All @@ -1155,14 +1157,8 @@ def unsolicited_endpoint(self, context):
acs = [[target_sp_acs_url, BINDING_HTTP_POST]]
sp_config_dict = {
"entityid": target_sp_entity_id,
"service": {
"sp": {
"endpoints": {
"assertion_consumer_service": acs
}
}
}
}
"service": {"sp": {"endpoints": {"assertion_consumer_service": acs}}},
}
sp_config = SPConfig().load(sp_config_dict, False)

# Create a temporary SP object and use it to create a authn request
Expand All @@ -1171,16 +1167,18 @@ def unsolicited_endpoint(self, context):
target_sp = Base(sp_config)

destination = None
endpoints = self.idp.config.getattr('endpoints')
sso_service_list = endpoints['single_sign_on_service']
endpoints = self.idp.config.getattr("endpoints")
sso_service_list = endpoints["single_sign_on_service"]
for location, binding in sso_service_list:
if binding == BINDING_HTTP_POST:
destination = location
break

if not destination:
msg = ("Could not determine location for SingleSignOnService "
"with HTTP-POST binding")
msg = (
"Could not determine location for SingleSignOnService "
"with HTTP-POST binding"
)
satosa_logging(logger, logging.ERROR, msg, context.state)
raise SATOSAError(msg)

Expand All @@ -1190,9 +1188,8 @@ def unsolicited_endpoint(self, context):

# Convert the authn request object to an encoded set of bytes.
authn_request_str = "{}".format(authn_request)
logger.debug("Unsolicted authn request is {}".format(
authn_request_str))
authn_request_bytes = authn_request_str.encode('utf-8')
logger.debug("Unsolicted authn request is {}".format(authn_request_str))
authn_request_bytes = authn_request_str.encode("utf-8")
authn_request_encoded = b64encode(authn_request_bytes)

# Add the authn request to the context as if it arrived through
Expand All @@ -1205,29 +1202,27 @@ def unsolicited_endpoint(self, context):

# If provided and is whitelisted set the discovery service to use.
if requested_disco_url:
allowed = (self.config[self.KEY_UNSOLICITED]
.get(self.KEY_DISCO_URL_WHITE))
allowed = self.config[self.KEY_UNSOLICITED].get(self.KEY_DISCO_URL_WHITE)
if requested_disco_url not in allowed:
msg = "Discovery service URL {} not allowed"
msg = msg.format(requested_disco_url)
satosa_logging(logger, logging.ERROR, msg, context.state)
raise SATOSAError(msg)

context.decorate(self.KEY_SAML_DISCOVERY_SERVICE_URL,
requested_disco_url)
context.decorate(self.KEY_SAML_DISCOVERY_SERVICE_URL, requested_disco_url)

# If provided and is whitelisted set the discovery policy to use.
if requested_disco_policy:
allowed = (self.config[self.KEY_UNSOLICITED]
.get(self.KEY_DISCO_POLICY_WHITE))
allowed = self.config[self.KEY_UNSOLICITED].get(self.KEY_DISCO_POLICY_WHITE)
if requested_disco_policy not in allowed:
msg = "Discovery service policy {} not allowed"
msg = msg.format(requested_disco_policy)
satosa_logging(logger, logging.ERROR, msg, context.state)
raise SATOSAError(msg)

context.decorate(self.KEY_SAML_DISCOVERY_SERVICE_POLICY,
requested_disco_policy)
context.decorate(
self.KEY_SAML_DISCOVERY_SERVICE_POLICY, requested_disco_policy
)

# If provided and known in the SAML metadata set the entityID for
# the IdP to use for authentication.
Expand Down

0 comments on commit 285c80d

Please sign in to comment.