diff --git a/aws_lambda_powertools/utilities/data_classes/__init__.py b/aws_lambda_powertools/utilities/data_classes/__init__.py
index 20a4131052..8ed77f9f3a 100644
--- a/aws_lambda_powertools/utilities/data_classes/__init__.py
+++ b/aws_lambda_powertools/utilities/data_classes/__init__.py
@@ -12,6 +12,7 @@
 from .dynamo_db_stream_event import DynamoDBStreamEvent
 from .event_bridge_event import EventBridgeEvent
 from .event_source import event_source
+from .kafka_event import KafkaEvent
 from .kinesis_stream_event import KinesisStreamEvent
 from .lambda_function_url_event import LambdaFunctionUrlEvent
 from .s3_event import S3Event
@@ -30,6 +31,7 @@
     "ConnectContactFlowEvent",
     "DynamoDBStreamEvent",
     "EventBridgeEvent",
+    "KafkaEvent",
     "KinesisStreamEvent",
     "LambdaFunctionUrlEvent",
     "S3Event",
diff --git a/aws_lambda_powertools/utilities/data_classes/kafka_event.py b/aws_lambda_powertools/utilities/data_classes/kafka_event.py
new file mode 100644
index 0000000000..5e9201dae9
--- /dev/null
+++ b/aws_lambda_powertools/utilities/data_classes/kafka_event.py
@@ -0,0 +1,124 @@
+import base64
+import json
+from typing import Any, Dict, Iterator, List, Optional
+
+from aws_lambda_powertools.utilities.data_classes.common import DictWrapper
+
+
+class KafkaEventRecord(DictWrapper):
+    @property
+    def topic(self) -> str:
+        """The Kafka topic."""
+        return self["topic"]
+
+    @property
+    def partition(self) -> int:
+        """The Kafka record partition."""
+        return self["partition"]
+
+    @property
+    def offset(self) -> int:
+        """The Kafka record offset."""
+        return self["offset"]
+
+    @property
+    def timestamp(self) -> int:
+        """The Kafka record timestamp."""
+        return self["timestamp"]
+
+    @property
+    def timestamp_type(self) -> str:
+        """The Kafka record timestamp type."""
+        return self["timestampType"]
+
+    @property
+    def key(self) -> str:
+        """The raw (base64 encoded) Kafka record key."""
+        return self["key"]
+
+    @property
+    def decoded_key(self) -> bytes:
+        """Decodes the base64 encoded key as bytes."""
+        return base64.b64decode(self.key)
+
+    @property
+    def value(self) -> str:
+        """The raw (base64 encoded) Kafka record value."""
+        return self["value"]
+
+    @property
+    def decoded_value(self) -> bytes:
+        """Decodes the base64 encoded value as bytes."""
+        return base64.b64decode(self.value)
+
+    @property
+    def json_value(self) -> Any:
+        """Decodes the text encoded data as JSON."""
+        if self._json_data is None:
+            self._json_data = json.loads(self.decoded_value.decode("utf-8"))
+        return self._json_data
+
+    @property
+    def headers(self) -> List[Dict[str, List[int]]]:
+        """The raw Kafka record headers."""
+        return self["headers"]
+
+    @property
+    def decoded_headers(self) -> Dict[str, bytes]:
+        """Decodes the headers as a single dictionary."""
+        return {k: bytes(v) for chunk in self.headers for k, v in chunk.items()}
+
+    def get_header_value(
+        self, name: str, default_value: Optional[Any] = None, case_sensitive: bool = True
+    ) -> Optional[bytes]:
+        """Get a decoded header value by name."""
+        if case_sensitive:
+            return self.decoded_headers.get(name, default_value)
+        name_lower = name.lower()
+
+        return next(
+            # Iterate over the dict and do a case-insensitive key comparison
+            (value for key, value in self.decoded_headers.items() if key.lower() == name_lower),
+            # Default value is returned if no match is found
+            default_value,
+        )
+
+
+class KafkaEvent(DictWrapper):
+    """Self-managed Apache Kafka event trigger
+    Documentation:
+    --------------
+    - https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html
+    """
+
+    @property
+    def event_source(self) -> str:
+        """The AWS service from which the Kafka event record originated."""
+        return self["eventSource"]
+
+    @property
+    def event_source_arn(self) -> Optional[str]:
+        """The AWS service ARN from which the Kafka event record originated."""
+        return self.get("eventSourceArn")
+
+    @property
+    def bootstrap_servers(self) -> str:
+        """The Kafka bootstrap URL."""
+        return self["bootstrapServers"]
+
+    @property
+    def decoded_bootstrap_servers(self) -> List[str]:
+        """The decoded Kafka bootstrap URL."""
+        return self.bootstrap_servers.split(",")
+
+    @property
+    def records(self) -> Iterator[KafkaEventRecord]:
+        """The Kafka records."""
+        for chunk in self["records"].values():
+            for record in chunk:
+                yield KafkaEventRecord(record)
+
+    @property
+    def record(self) -> KafkaEventRecord:
+        """The next Kafka record."""
+        return next(self.records)
diff --git a/docs/utilities/data_classes.md b/docs/utilities/data_classes.md
index 86cbebd3c9..67d821fe04 100644
--- a/docs/utilities/data_classes.md
+++ b/docs/utilities/data_classes.md
@@ -75,6 +75,7 @@ Event Source | Data_class
 [Connect Contact Flow](#connect-contact-flow) | `ConnectContactFlowEvent`
 [DynamoDB streams](#dynamodb-streams) | `DynamoDBStreamEvent`, `DynamoDBRecordEventName`
 [EventBridge](#eventbridge) | `EventBridgeEvent`
+[Kafka](#kafka) | `KafkaEvent`
 [Kinesis Data Stream](#kinesis-streams) | `KinesisStreamEvent`
 [Lambda Function URL](#lambda-function-url) | `LambdaFunctionUrlEvent`
 [Rabbit MQ](#rabbit-mq) | `RabbitMQEvent`
@@ -852,6 +853,22 @@ attributes values (`AttributeValue`), as well as enums for stream view type (`StreamViewType`)
     ```

+### Kafka
+
+This example is based on the AWS docs for [Amazon MSK](https://docs.aws.amazon.com/lambda/latest/dg/with-msk.html){target="_blank"} and [self-managed Apache Kafka](https://docs.aws.amazon.com/lambda/latest/dg/with-kafka.html){target="_blank"}.
+
+=== "app.py"
+
+    ```python
+    from aws_lambda_powertools.utilities.data_classes import event_source, KafkaEvent
+
+    @event_source(data_class=KafkaEvent)
+    def lambda_handler(event: KafkaEvent, context):
+        for record in event.records:
+            do_something_with(record.decoded_key, record.json_value)
+
+    ```
+
 ### Kinesis streams

 Kinesis events by default contain base64 encoded data.
 You can use the helper function to access the data either as json
diff --git a/tests/events/kafkaEventMsk.json b/tests/events/kafkaEventMsk.json
new file mode 100644
index 0000000000..5a35b89680
--- /dev/null
+++ b/tests/events/kafkaEventMsk.json
@@ -0,0 +1,35 @@
+{
+   "eventSource":"aws:kafka",
+   "eventSourceArn":"arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",
+   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+   "records":{
+      "mytopic-0":[
+         {
+            "topic":"mytopic",
+            "partition":0,
+            "offset":15,
+            "timestamp":1545084650987,
+            "timestampType":"CREATE_TIME",
+            "key":"cmVjb3JkS2V5",
+            "value":"eyJrZXkiOiJ2YWx1ZSJ9",
+            "headers":[
+               {
+                  "headerKey":[
+                     104,
+                     101,
+                     97,
+                     100,
+                     101,
+                     114,
+                     86,
+                     97,
+                     108,
+                     117,
+                     101
+                  ]
+               }
+            ]
+         }
+      ]
+   }
+}
diff --git a/tests/events/kafkaEventSelfManaged.json b/tests/events/kafkaEventSelfManaged.json
new file mode 100644
index 0000000000..17372b7c24
--- /dev/null
+++ b/tests/events/kafkaEventSelfManaged.json
@@ -0,0 +1,34 @@
+{
+   "eventSource":"aws:aws:SelfManagedKafka",
+   "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+   "records":{
+      "mytopic-0":[
+         {
+            "topic":"mytopic",
+            "partition":0,
+            "offset":15,
+            "timestamp":1545084650987,
+            "timestampType":"CREATE_TIME",
+            "key":"cmVjb3JkS2V5",
+            "value":"eyJrZXkiOiJ2YWx1ZSJ9",
+            "headers":[
+               {
+                  "headerKey":[
+                     104,
+                     101,
+                     97,
+                     100,
+                     101,
+                     114,
+                     86,
+                     97,
+                     108,
+                     117,
+                     101
+                  ]
+               }
+            ]
+         }
+      ]
+   }
+}
diff --git a/tests/functional/test_data_classes.py b/tests/functional/test_data_classes.py
index 5c7423add6..00dd5100f6 100644
--- a/tests/functional/test_data_classes.py
+++ b/tests/functional/test_data_classes.py
@@ -17,6 +17,7 @@
     CloudWatchLogsEvent,
     CodePipelineJobEvent,
     EventBridgeEvent,
+    KafkaEvent,
    KinesisStreamEvent,
     S3Event,
     SESEvent,
@@ -1138,6 +1139,68 @@ def test_base_proxy_event_json_body_with_base64_encoded_data():
     assert event.json_body == data


+def test_kafka_msk_event():
+    event = KafkaEvent(load_event("kafkaEventMsk.json"))
+    assert event.event_source == "aws:kafka"
+    assert (
+        event.event_source_arn
+        == "arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4"
+    )
+
+    bootstrap_servers_raw = "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092"  # noqa E501
+
+    bootstrap_servers_list = [
+        "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+        "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+    ]
+
+    assert event.bootstrap_servers == bootstrap_servers_raw
+    assert event.decoded_bootstrap_servers == bootstrap_servers_list
+
+    records = list(event.records)
+    assert len(records) == 1
+    record = records[0]
+    assert record.topic == "mytopic"
+    assert record.partition == 0
+    assert record.offset == 15
+    assert record.timestamp == 1545084650987
+    assert record.timestamp_type == "CREATE_TIME"
+    assert record.decoded_key == b"recordKey"
+    assert record.value == "eyJrZXkiOiJ2YWx1ZSJ9"
+    assert record.json_value == {"key": "value"}
+    assert record.decoded_headers == {"headerKey": b"headerValue"}
+    assert record.get_header_value("HeaderKey", case_sensitive=False) == b"headerValue"
+
+
+def test_kafka_self_managed_event():
+    event = KafkaEvent(load_event("kafkaEventSelfManaged.json"))
+    assert event.event_source == "aws:aws:SelfManagedKafka"
+
+    bootstrap_servers_raw = "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092"  # noqa E501
+
+    bootstrap_servers_list = [
+        "b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+        "b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092",
+    ]
+
+    assert event.bootstrap_servers == bootstrap_servers_raw
+    assert event.decoded_bootstrap_servers == bootstrap_servers_list
+
+    records = list(event.records)
+    assert len(records) == 1
+    record = records[0]
+    assert record.topic == "mytopic"
+    assert record.partition == 0
+    assert record.offset == 15
+    assert record.timestamp == 1545084650987
+    assert record.timestamp_type == "CREATE_TIME"
+    assert record.decoded_key == b"recordKey"
+    assert record.value == "eyJrZXkiOiJ2YWx1ZSJ9"
+    assert record.json_value == {"key": "value"}
+    assert record.decoded_headers == {"headerKey": b"headerValue"}
+    assert record.get_header_value("HeaderKey", case_sensitive=False) == b"headerValue"
+
+
 def test_kinesis_stream_event():
     event = KinesisStreamEvent(load_event("kinesisStreamEvent.json"))
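Below is a minimal usage sketch, separate from the patch above, for exercising the new `KafkaEvent` class outside the test suite. It assumes it is run from the repository root so that the `kafkaEventMsk.json` fixture added in this change can be opened directly, and it only prints decoded fields; the property names all come from the data class introduced above.

```python
import json

from aws_lambda_powertools.utilities.data_classes import KafkaEvent

# Parse the MSK fixture added in this change (path assumed relative to the repo root)
with open("tests/events/kafkaEventMsk.json") as f:
    event = KafkaEvent(json.load(f))

# The comma-delimited bootstrap string splits into a list of "host:port" entries
print(event.decoded_bootstrap_servers)

for record in event.records:
    print(record.topic, record.partition, record.offset)  # mytopic 0 15
    # Keys and values arrive base64 encoded; the decoded_* properties return bytes
    print(record.decoded_key)  # b"recordKey"
    print(record.json_value)   # {"key": "value"}
    # Headers arrive as lists of byte values; lookups can optionally ignore case
    print(record.get_header_value("HEADERKEY", case_sensitive=False))  # b"headerValue"
```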