diff --git a/docs/api/stream_processor.md b/docs/api/stream_processor.md index 8f4010e..e2b4ec9 100644 --- a/docs/api/stream_processor.md +++ b/docs/api/stream_processor.md @@ -14,3 +14,34 @@ We convert them into `ProductChangeNotification` model depending on the DynamoDB Domain logic to notify product changes, e.g., `ADDED`, `REMOVED`, `UPDATED`. ::: product.stream_processor.domain_logic.product_notification + +### Integrations + +These are integrations with external services. As of now, we only use one integration to send events, by default `Amazon EventBridge`. + +> NOTE: We could make a single Event Handler. For now, we're using one event handler closely aligned with the model we want to convert into event for type safety. + +::: product.stream_processor.integrations.events.event_handler + +### Events + +::: product.stream_processor.integrations.events.models.input + +::: product.stream_processor.integrations.events.models.output + +#### Providers + +::: product.stream_processor.integrations.events.providers.eventbridge + +#### Interfaces + +::: product.stream_processor.integrations.events.base + +### Utility functions + +::: product.stream_processor.integrations.events.functions +::: product.stream_processor.integrations.events.constants + +### Exceptions + +::: product.stream_processor.integrations.events.exceptions diff --git a/mkdocs.yml b/mkdocs.yml index 1cf4722..88dc3b9 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -5,11 +5,12 @@ repo_url: https://github.com/ran-isenberg/serverless-python-demo edit_uri: edit/main/docs nav: - - Homepage: index.md - - Decision log: decision_log.md - - API reference: - - Stream Processor: api/stream_processor.md - - Product models: api/product_models.md + - Homepage: + - index.md + - Decision log: decision_log.md + - API reference: + - Stream Processor: api/stream_processor.md + - Product models: api/product_models.md theme: name: material @@ -34,6 +35,7 @@ theme: - navigation.instant - navigation.indexes - navigation.tracking + - navigation.tabs - content.code.annotate - content.code.copy icon: @@ -81,14 +83,17 @@ plugins: show_source: true heading_level: 4 allow_inspection: true - group_by_category: true - show_category_heading: true - show_bases: true + group_by_category: true # Attributes, Functions, Classes etc. + show_category_heading: true # Attributes, Functions, Classes etc. + show_bases: true # show inheritance show_docstring_examples: true - show_if_no_docstring: true + show_if_no_docstring: true # show Pydantic models and global attrs merge_init_into_class: true # avoid Class params + __init__ params separate_signature: false show_signature_annotations: false + show_docstring_attributes: false + show_root_heading: false + members_order: source extra_css: - stylesheets/extra.css diff --git a/product/models/products/product.py b/product/models/products/product.py index 6cca04a..ccfece8 100644 --- a/product/models/products/product.py +++ b/product/models/products/product.py @@ -7,15 +7,38 @@ from product.models.products.validators import validate_product_id ProductId = Annotated[str, Field(min_length=36, max_length=36), AfterValidator(validate_product_id)] +"""Unique Product ID, represented and validated as a UUID string.""" class Product(BaseModel): + """Data representation for a product. + + Parameters + ---------- + name : str + Product name + id : ProductId + Product ID (UUID string) + price : PositiveInt + Product price represented as a positive integer + """ name: Annotated[str, Field(min_length=1, max_length=30)] id: ProductId price: PositiveInt class ProductChangeNotification(BaseModel): + """Data representation for a notification about a product change. + + Parameters + ---------- + product_id : ProductId + Product ID (UUID string) + status : Literal['ADDED', 'REMOVED', 'UPDATED'] + Product change status + created_at : datetime + Product change notification creation time (UTC) + """ product_id: ProductId status: Literal['ADDED', 'REMOVED', 'UPDATED'] created_at: datetime = Field(default_factory=datetime.utcnow) diff --git a/product/stream_processor/domain_logic/product_notification.py b/product/stream_processor/domain_logic/product_notification.py index 7306003..d8b5348 100644 --- a/product/stream_processor/domain_logic/product_notification.py +++ b/product/stream_processor/domain_logic/product_notification.py @@ -5,6 +5,7 @@ from product.stream_processor.integrations.events.models.output import EventReceipt from product.stream_processor.integrations.events.providers.eventbridge import EventBridge +# NOTE: these will move to environment variables. Event source format could even use a pydantic validation! EVENT_BUS = os.environ.get('EVENT_BUS', '') EVENT_SOURCE = 'myorg.product.product_notification' diff --git a/product/stream_processor/handlers/process_stream.py b/product/stream_processor/handlers/process_stream.py index 74849fd..21069ba 100644 --- a/product/stream_processor/handlers/process_stream.py +++ b/product/stream_processor/handlers/process_stream.py @@ -50,8 +50,7 @@ def process_stream( Raises ------ - - ProductNotificationDeliveryError + ProductChangeNotificationDeliveryError Partial or total failures when sending notification. It allows the stream to stop at the exact same sequence number. This means sending notifications are at least once. diff --git a/product/stream_processor/integrations/events/base.py b/product/stream_processor/integrations/events/base.py index d7f1873..ecfd724 100644 --- a/product/stream_processor/integrations/events/base.py +++ b/product/stream_processor/integrations/events/base.py @@ -1,32 +1,72 @@ from abc import ABC, abstractmethod from typing import Any, Generic, TypeVar +from product.stream_processor.integrations.events.functions import build_events_from_models from product.stream_processor.integrations.events.models.input import Event from product.stream_processor.integrations.events.models.output import EventReceipt T = TypeVar('T') -# negative look ahead (?|char). Don't try to match the start of the string and any underscore that follows e.g., `_` and `__` - -# full regex: ((?!^)(? Product_Notification -# ProductNotificationV2 -> Product_Notification_V2 -# ProductHTTP -> Product_HTTP - class EventProvider(ABC): + """ABC for an Event Provider that send events to a destination.""" @abstractmethod def send(self, payload: list[Event]) -> EventReceipt: + """Sends list of events to an Event Provider. + + Parameters + ---------- + payload : list[Event] + List of events to send. + + Returns + ------- + EventReceipt + Receipts for unsuccessfully and successfully published events. + + Raises + ------ + NotificationDeliveryError + When one or more events could not be delivered. + """ ... class EventHandler(ABC, Generic[T]): def __init__(self, provider: EventProvider, event_source: str) -> None: + """ABC to handle event manipulation from a model, and publishing through a provider. + + Parameters + ---------- + provider : EventProvider + Event Provider to publish events through. + event_source : str + Event source name, e.g., 'myorg.service.feature' + """ self.provider = provider self.event_source = event_source @abstractmethod def emit(self, payload: list[T], metadata: dict[str, Any] | None = None, correlation_id='') -> EventReceipt: - ... + """Emits product change notifications using registered provider, along with additional metadata or specific correlation ID. + + Parameters + ---------- + payload : list[T] + List of product change notifications models to be sent. + metadata : dict[str, Any] | None, optional + Additional metadata to be injected into the event before sending, by default None + correlation_id : str, optional + Correlation ID to inject in event metadata. We generate one if not provided. + + Returns + ------- + EventReceipt + Receipts for unsuccessfully and successfully published events. + """ + event_payload = build_events_from_models( + models=payload, metadata=metadata, correlation_id=correlation_id, + event_source=self.event_source) # type: ignore[type-var] # T will be defined by its implementation; see ProductChangeNotificationHandler + return self.provider.send(payload=event_payload) diff --git a/product/stream_processor/integrations/events/constants.py b/product/stream_processor/integrations/events/constants.py index 6a7e067..b891d09 100644 --- a/product/stream_processor/integrations/events/constants.py +++ b/product/stream_processor/integrations/events/constants.py @@ -1,2 +1,3 @@ +"""Constants related to events integration (event handler and event providers)""" DEFAULT_EVENT_VERSION = 'v1' EVENTBRIDGE_PROVIDER_MAX_EVENTS_ENTRY = 10 diff --git a/product/stream_processor/integrations/events/event_handler.py b/product/stream_processor/integrations/events/event_handler.py index feca1e7..5d504c5 100644 --- a/product/stream_processor/integrations/events/event_handler.py +++ b/product/stream_processor/integrations/events/event_handler.py @@ -2,15 +2,38 @@ from product.models.products.product import ProductChangeNotification from product.stream_processor.integrations.events.base import EventHandler, EventProvider -from product.stream_processor.integrations.events.functions import build_events_from_models from product.stream_processor.integrations.events.models.output import EventReceipt class ProductChangeNotificationHandler(EventHandler): def __init__(self, provider: EventProvider, event_source: str) -> None: + """Event Handler for ProductChangeNotification. + + Parameters + ---------- + provider : EventProvider + An event provider to send events to. + event_source : str + Event source to inject in event metadata, following 'myorg.service_name.feature_name' + """ super().__init__(provider=provider, event_source=event_source) def emit(self, payload: list[ProductChangeNotification], metadata: dict[str, Any] | None = None, correlation_id='') -> EventReceipt: - event_payload = build_events_from_models(models=payload, metadata=metadata, correlation_id=correlation_id, event_source=self.event_source) - return self.provider.send(payload=event_payload) + """Emits product change notifications using registered provider, along with additional metadata or specific correlation ID. + + Parameters + ---------- + payload : list[ProductChangeNotification] + List of product change notifications models to be sent. + metadata : dict[str, Any] | None, optional + Additional metadata to be injected into the event before sending, by default None + correlation_id : str, optional + Correlation ID to inject in event metadata. We generate one if not provided. + + Returns + ------- + EventReceipt + Receipts for unsuccessfully and successfully published events. + """ + return super().emit(payload, metadata, correlation_id) diff --git a/product/stream_processor/integrations/events/exceptions.py b/product/stream_processor/integrations/events/exceptions.py index 1e641d5..1e0f591 100644 --- a/product/stream_processor/integrations/events/exceptions.py +++ b/product/stream_processor/integrations/events/exceptions.py @@ -1,9 +1,25 @@ from product.stream_processor.integrations.events.models.output import EventReceiptFail -class ProductNotificationDeliveryError(Exception): +class NotificationDeliveryError(Exception): def __init__(self, message: str, receipts: list[EventReceiptFail]): + """Exception raised when a notification delivery fails. + + Parameters + ---------- + message : str + error message + receipts : list[EventReceiptFail] + list of receipts failed notification deliveries along with details + """ super().__init__(message) self.message = message self.receipts = receipts + + +class ProductChangeNotificationDeliveryError(NotificationDeliveryError): + """Raised when one or all product change notification deliveries fail.""" + + def __init__(self, message: str, receipts: list[EventReceiptFail]): + super().__init__(message, receipts) diff --git a/product/stream_processor/integrations/events/functions.py b/product/stream_processor/integrations/events/functions.py index e4baa03..4cab6c3 100644 --- a/product/stream_processor/integrations/events/functions.py +++ b/product/stream_processor/integrations/events/functions.py @@ -1,3 +1,5 @@ +"""Standalone functions related to events integration. These are reused in more than one location, and tested separately""" + import re from typing import Any, Generator, Sequence, TypeVar from uuid import uuid4 @@ -6,21 +8,70 @@ from product.stream_processor.integrations.events.models.input import AnyModel, Event, EventMetadata T = TypeVar('T') +"""Generic type for a list of events""" -_exclude_underscores = r'(?!^)(? str: - """ Convert ModelName (pascal) to MODEL_NAME (snake, uppercase)""" + """Derives a standard event name from the name of the model. + + It uses snake_case in uppercase letters, e.g., `ProductNotification` -> `PRODUCT_NOTIFICATION`. + + It also keeps numbers and acronyms that are typically abbreviation for something intact, e.g.: "ProductHTTP" -> "PRODUCT_HTTP" + + Parameters + ---------- + model_name : str + Name of the model to derive from. + + # Examples + + ```python + from pydantic import BaseModel + + class SampleNotification(BaseModel): + message: str + + notification = SampleNotification(message='testing') + event_name = convert_model_to_event_name(notification.__class__.__name__) + + assert event_name == "SAMPLE_NOTIFICATION" + ``` + + Returns + ------- + str + Standard event name in snake_case upper letters. + """ return _pascal_to_snake_pattern.sub(r'_\1', model_name).upper() def build_events_from_models(models: Sequence[AnyModel], event_source: str, metadata: dict[str, Any] | None = None, correlation_id: str = '') -> list[Event]: + """Converts a Pydantic model into a standard event. + + Parameters + ---------- + models : Sequence[AnyModel] + List of Pydantic models to convert into events. + event_source : str + Event source name to inject into event metadata. + metadata : dict[str, Any] | None, optional + Additional metadata to inject, by default None + correlation_id : str, optional + Correlation ID to use in event metadata. If not provided, we generate one using UUID4. + + Returns + ------- + list[Event] + List of events created from model ready to be emitted. + """ metadata = metadata or {} correlation_id = correlation_id or f'{uuid4()}' @@ -39,6 +90,20 @@ def build_events_from_models(models: Sequence[AnyModel], event_source: str, meta def chunk_from_list(events: list[T], max_items: int) -> Generator[list[T], None, None]: + """Slices a list of items into a generator, respecting the max number of items. + + Parameters + ---------- + events : list[T] + List of events to slice. + max_items : int + Maximum number of items per chunk. + + Yields + ------ + Generator[list[T], None, None] + Generator containing batches of events with maximum number of items requested. + """ for idx in range(0, len(events), max_items): # start, stop, step # slice the first 10 items, then the next 10 items starting from the index yield from [events[idx:idx + max_items]] diff --git a/product/stream_processor/integrations/events/models/input.py b/product/stream_processor/integrations/events/models/input.py index 3be7fb7..edad9a7 100644 --- a/product/stream_processor/integrations/events/models/input.py +++ b/product/stream_processor/integrations/events/models/input.py @@ -7,6 +7,22 @@ class EventMetadata(BaseModel): + """Data representation for a standard event metadata + + Parameters + ---------- + event_name : str + Name of the event, e.g., "PRODUCT_CHANGE_NOTIFICATION" + event_source : str + Event source, e.g., "myorg.service.feature" + event_version : str + Event version, e.g. "v1" + correlation_id : str + Correlation ID, e.g., "b76d27e1-bd2b-4aae-9781-1ef11063c5cd" + + created_at : datetime + Timestamp of when the event was created (UTC) + """ event_name: str event_source: str event_version: str @@ -17,5 +33,14 @@ class EventMetadata(BaseModel): class Event(BaseModel, Generic[AnyModel]): + """Data representation for a standard event + + Parameters + ---------- + data : BaseModel + Any Pydantic BaseModel + metadata : EventMetadata + Event metadata + """ data: AnyModel metadata: EventMetadata diff --git a/product/stream_processor/integrations/events/providers/eventbridge.py b/product/stream_processor/integrations/events/providers/eventbridge.py index a84bc81..1bc0bab 100644 --- a/product/stream_processor/integrations/events/providers/eventbridge.py +++ b/product/stream_processor/integrations/events/providers/eventbridge.py @@ -7,7 +7,7 @@ from product.constants import XRAY_TRACE_ID_ENV from product.stream_processor.integrations.events.base import EventProvider from product.stream_processor.integrations.events.constants import EVENTBRIDGE_PROVIDER_MAX_EVENTS_ENTRY -from product.stream_processor.integrations.events.exceptions import ProductNotificationDeliveryError +from product.stream_processor.integrations.events.exceptions import ProductChangeNotificationDeliveryError from product.stream_processor.integrations.events.functions import chunk_from_list from product.stream_processor.integrations.events.models.input import Event from product.stream_processor.integrations.events.models.output import EventReceipt, EventReceiptFail, EventReceiptSuccess @@ -20,10 +20,38 @@ class EventBridge(EventProvider): def __init__(self, bus_name: str, client: Optional['EventBridgeClient'] = None): + """Amazon EventBridge provider using PutEvents API. + + See [PutEvents docs](https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_PutEvents.html). + + Parameters + ---------- + bus_name : str + Name of the event bus to send events to + client : Optional[EventBridgeClient], optional + EventBridge boto3 client to use, by default None + """ self.bus_name = bus_name self.client = client or boto3.client('events') def send(self, payload: list[Event]) -> EventReceipt: + """Sends batches of events up to maximum allowed by PutEvents API (10). + + Parameters + ---------- + payload : list[Event] + List of events to publish + + Returns + ------- + EventReceipt + Receipts for unsuccessfully and successfully published events + + Raises + ------ + ProductChangeNotificationDeliveryError + When one or more events could not be delivered. + """ success: list[EventReceiptSuccess] = [] failed: list[EventReceiptFail] = [] events = self.build_put_events_requests(payload) @@ -38,11 +66,20 @@ def send(self, payload: list[Event]) -> EventReceipt: error_message = exc.response['Error']['Message'] receipt = EventReceiptFail(receipt_id='', error='error_message', details=exc.response['ResponseMetadata']) - raise ProductNotificationDeliveryError(f'Failed to deliver all events: {error_message}', receipts=[receipt]) from exc + raise ProductChangeNotificationDeliveryError(f'Failed to deliver all events: {error_message}', receipts=[receipt]) from exc return EventReceipt(success=success, failed=failed) def build_put_events_requests(self, payload: list[Event]) -> Generator[list['PutEventsRequestEntryTypeDef'], None, None]: + """Converts a list of events into a list of PutEvents API request. + + If AWS X-Ray is enabled, it automatically includes 'TraceHeader' field in the request. + + Yields + ------ + list['PutEventsRequestEntryTypeDef'] + List of maximum events permitted to be sent by a single PutEvents API. + """ trace_id = os.environ.get(XRAY_TRACE_ID_ENV) for chunk in chunk_from_list(events=payload, max_items=EVENTBRIDGE_PROVIDER_MAX_EVENTS_ENTRY): @@ -82,6 +119,6 @@ def _collect_receipts(result: 'PutEventsResponseTypeDef') -> tuple[list[EventRec # NOTE: Improve this error by correlating which entry failed to send. # We will fail regardless, but it'll be useful for logging and correlation later on. if result['FailedEntryCount'] > 0: - raise ProductNotificationDeliveryError(f'Failed to deliver {len(fails)} events', receipts=fails) + raise ProductChangeNotificationDeliveryError(f'Failed to deliver {len(fails)} events', receipts=fails) return successes, fails diff --git a/tests/unit/stream_processor/test_eventbridge_provider.py b/tests/unit/stream_processor/test_eventbridge_provider.py index face696..429ae45 100644 --- a/tests/unit/stream_processor/test_eventbridge_provider.py +++ b/tests/unit/stream_processor/test_eventbridge_provider.py @@ -7,7 +7,7 @@ from product.constants import XRAY_TRACE_ID_ENV from product.stream_processor.integrations.events.constants import EVENTBRIDGE_PROVIDER_MAX_EVENTS_ENTRY -from product.stream_processor.integrations.events.exceptions import ProductNotificationDeliveryError +from product.stream_processor.integrations.events.exceptions import ProductChangeNotificationDeliveryError from product.stream_processor.integrations.events.functions import build_events_from_models from product.stream_processor.integrations.events.providers.eventbridge import EventBridge @@ -169,7 +169,7 @@ class SampleNotification(BaseModel): event_provider = EventBridge(bus_name=event_bus_name, client=client) - with pytest.raises(ProductNotificationDeliveryError) as exc: + with pytest.raises(ProductChangeNotificationDeliveryError) as exc: event_provider.send(payload=events) # THEN we should receive a ProductNotificationDeliveryError along with its receipts @@ -198,7 +198,7 @@ class SampleNotification(BaseModel): event_provider = EventBridge(bus_name=event_bus_name, client=client) - with pytest.raises(ProductNotificationDeliveryError) as exc: + with pytest.raises(ProductChangeNotificationDeliveryError) as exc: event_provider.send(payload=events) # THEN we should receive a ProductNotificationDeliveryError along with its receipts