Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(event-sources): sane defaults for authorizer v1 and v2 #4298

2 changes: 1 addition & 1 deletion aws_lambda_powertools/shared/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@

# JSON constants
PRETTY_INDENT: int = 4
COMPACT_INDENT = None
COMPACT_INDENT: None = None

# Idempotency constants
IDEMPOTENCY_DISABLED_ENV: str = "POWERTOOLS_IDEMPOTENCY_DISABLED"
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,18 @@

class APIGatewayEventAuthorizer(DictWrapper):
@property
def claims(self) -> Optional[Dict[str, Any]]:
return self.get("claims")
def claims(self) -> Dict[str, Any]:
return self.get("claims") or {} # key might exist but can be `null`

@property
def scopes(self) -> Optional[List[str]]:
return self.get("scopes")
def scopes(self) -> List[str]:
return self.get("scopes") or [] # key might exist but can be `null`

@property
def principal_id(self) -> Optional[str]:
def principal_id(self) -> str:
"""The principal user identification associated with the token sent by the client and returned from an
API Gateway Lambda authorizer (formerly known as a custom authorizer)"""
return self.get("principalId")
return self.get("principalId") or "" # key might exist but can be `null`

@property
def integration_latency(self) -> Optional[int]:
Expand Down Expand Up @@ -91,7 +91,8 @@ def route_key(self) -> Optional[str]:

@property
def authorizer(self) -> APIGatewayEventAuthorizer:
return APIGatewayEventAuthorizer(self._data["requestContext"]["authorizer"])
authz_data = self._data.get("requestContext", {}).get("authorizer", {})
return APIGatewayEventAuthorizer(authz_data)


class APIGatewayProxyEvent(BaseProxyEvent):
Expand All @@ -112,11 +113,11 @@ def resource(self) -> str:

@property
def multi_value_headers(self) -> Dict[str, List[str]]:
return self.get("multiValueHeaders") or {}
return self.get("multiValueHeaders") or {} # key might exist but can be `null`

@property
def multi_value_query_string_parameters(self) -> Dict[str, List[str]]:
return self.get("multiValueQueryStringParameters") or {}
return self.get("multiValueQueryStringParameters") or {} # key might exist but can be `null`

@property
def resolved_query_string_parameters(self) -> Dict[str, List[str]]:
Expand Down Expand Up @@ -154,72 +155,72 @@ def header_serializer(self) -> BaseHeadersSerializer:

class RequestContextV2AuthorizerIam(DictWrapper):
@property
def access_key(self) -> Optional[str]:
def access_key(self) -> str:
"""The IAM user access key associated with the request."""
return self.get("accessKey")
return self.get("accessKey") or "" # key might exist but can be `null`

@property
def account_id(self) -> Optional[str]:
def account_id(self) -> str:
"""The AWS account ID associated with the request."""
return self.get("accountId")
return self.get("accountId") or "" # key might exist but can be `null`

@property
def caller_id(self) -> Optional[str]:
def caller_id(self) -> str:
"""The principal identifier of the caller making the request."""
return self.get("callerId")
return self.get("callerId") or "" # key might exist but can be `null`

def _cognito_identity(self) -> Dict:
return self.get("cognitoIdentity", {}) or {} # not available in FunctionURL
return self.get("cognitoIdentity") or {} # not available in FunctionURL; key might exist but can be `null`

@property
def cognito_amr(self) -> Optional[List[str]]:
def cognito_amr(self) -> List[str]:
"""This represents how the user was authenticated.
AMR stands for Authentication Methods References as per the openid spec"""
return self._cognito_identity().get("amr")
return self._cognito_identity().get("amr", [])

@property
def cognito_identity_id(self) -> Optional[str]:
def cognito_identity_id(self) -> str:
"""The Amazon Cognito identity ID of the caller making the request.
Available only if the request was signed with Amazon Cognito credentials."""
return self._cognito_identity().get("identityId")
return self._cognito_identity().get("identityId", "")

@property
def cognito_identity_pool_id(self) -> Optional[str]:
def cognito_identity_pool_id(self) -> str:
"""The Amazon Cognito identity pool ID of the caller making the request.
Available only if the request was signed with Amazon Cognito credentials."""
return self._cognito_identity().get("identityPoolId")
return self._cognito_identity().get("identityPoolId") or "" # key might exist but can be `null`

@property
def principal_org_id(self) -> Optional[str]:
def principal_org_id(self) -> str:
"""The AWS organization ID."""
return self.get("principalOrgId")
return self.get("principalOrgId") or "" # key might exist but can be `null`

@property
def user_arn(self) -> Optional[str]:
def user_arn(self) -> str:
"""The Amazon Resource Name (ARN) of the effective user identified after authentication."""
return self.get("userArn")
return self.get("userArn") or "" # key might exist but can be `null`

@property
def user_id(self) -> Optional[str]:
def user_id(self) -> str:
"""The IAM user ID of the effective user identified after authentication."""
return self.get("userId")
return self.get("userId") or "" # key might exist but can be `null`


class RequestContextV2Authorizer(DictWrapper):
@property
def jwt_claim(self) -> Optional[Dict[str, Any]]:
jwt = self.get("jwt") or {} # not available in FunctionURL
return jwt.get("claims")
def jwt_claim(self) -> Dict[str, Any]:
jwt = self.get("jwt") or {} # not available in FunctionURL; key might exist but can be `null`
return jwt.get("claims") or {} # key might exist but can be `null`

@property
def jwt_scopes(self) -> Optional[List[str]]:
jwt = self.get("jwt") or {} # not available in FunctionURL
return jwt.get("scopes")
def jwt_scopes(self) -> List[str]:
jwt = self.get("jwt") or {} # not available in FunctionURL; key might exist but can be `null`
return jwt.get("scopes", [])

@property
def get_lambda(self) -> Optional[Dict[str, Any]]:
def get_lambda(self) -> Dict[str, Any]:
"""Lambda authorization context details"""
return self.get("lambda")
return self.get("lambda") or {} # key might exist but can be `null`

def get_context(self) -> Dict[str, Any]:
"""Retrieve the authorization context details injected by a Lambda Authorizer.
Expand All @@ -238,20 +239,20 @@ def get_context(self) -> Dict[str, Any]:
Dict[str, Any]
A dictionary containing Lambda authorization context details.
"""
return self.get("lambda", {}) or {}
return self.get_lambda

@property
def iam(self) -> Optional[RequestContextV2AuthorizerIam]:
def iam(self) -> RequestContextV2AuthorizerIam:
"""IAM authorization details used for making the request."""
iam = self.get("iam")
return None if iam is None else RequestContextV2AuthorizerIam(iam)
iam = self.get("iam") or {} # key might exist but can be `null`
return RequestContextV2AuthorizerIam(iam)


class RequestContextV2(BaseRequestContextV2):
@property
def authorizer(self) -> Optional[RequestContextV2Authorizer]:
authorizer = self["requestContext"].get("authorizer")
return None if authorizer is None else RequestContextV2Authorizer(authorizer)
def authorizer(self) -> RequestContextV2Authorizer:
ctx = self.get("requestContext") or {} # key might exist but can be `null`
return RequestContextV2Authorizer(ctx.get("authorizer", {}))


class APIGatewayProxyEventV2(BaseProxyEvent):
Expand Down
8 changes: 4 additions & 4 deletions tests/unit/data_classes/test_api_gateway_proxy_event.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,8 @@ def test_api_gateway_proxy_event():
assert request_context.api_id == request_context_raw["apiId"]

authorizer = request_context.authorizer
assert authorizer.claims is None
assert authorizer.scopes is None
assert authorizer.claims == {}
assert authorizer.scopes == []

assert request_context.domain_name == request_context_raw["domainName"]
assert request_context.domain_prefix == request_context_raw["domainPrefix"]
Expand Down Expand Up @@ -144,8 +144,8 @@ def test_api_gateway_proxy_event_with_principal_id():

request_context = parsed_event.request_context
authorizer = request_context.authorizer
assert authorizer.claims is None
assert authorizer.scopes is None
assert authorizer.claims == {}
assert authorizer.scopes == []
assert authorizer.principal_id == raw_event["requestContext"]["authorizer"]["principalId"]
assert authorizer.integration_latency == raw_event["requestContext"]["authorizer"]["integrationLatency"]
assert authorizer.get("integrationStatus", "failed") == "failed"
Expand Down
17 changes: 9 additions & 8 deletions tests/unit/data_classes/test_lambda_function_url.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from aws_lambda_powertools.utilities.data_classes import LambdaFunctionUrlEvent
from aws_lambda_powertools.utilities.data_classes.api_gateway_proxy_event import RequestContextV2Authorizer
from tests.functional.utils import load_event


Expand Down Expand Up @@ -47,7 +48,7 @@ def test_lambda_function_url_event():
assert http.source_ip == http_raw["sourceIp"]
assert http.user_agent == http_raw["userAgent"]

assert request_context.authorizer is None
assert isinstance(request_context.authorizer, RequestContextV2Authorizer)


def test_lambda_function_url_event_iam():
Expand Down Expand Up @@ -102,19 +103,19 @@ def test_lambda_function_url_event_iam():

authorizer = request_context.authorizer
assert authorizer is not None
assert authorizer.jwt_claim is None
assert authorizer.jwt_scopes is None
assert authorizer.get_lambda is None
assert authorizer.jwt_claim == {}
assert authorizer.jwt_scopes == []
assert authorizer.get_lambda == {}

iam = authorizer.iam
iam_raw = raw_event["requestContext"]["authorizer"]["iam"]
assert iam is not None
assert iam.access_key == iam_raw["accessKey"]
assert iam.account_id == iam_raw["accountId"]
assert iam.caller_id == iam_raw["callerId"]
assert iam.cognito_amr is None
assert iam.cognito_identity_id is None
assert iam.cognito_identity_pool_id is None
assert iam.principal_org_id is None
assert iam.cognito_amr == []
assert iam.cognito_identity_id == ""
assert iam.cognito_identity_pool_id == ""
assert iam.principal_org_id == ""
assert iam.user_id == iam_raw["userId"]
assert iam.user_arn == iam_raw["userArn"]