Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

docs(stream_processor): complete docstrings #81

Merged
merged 73 commits into from
Oct 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
73 commits
Select commit Hold shift + click to select a range
ba6c401
chore: support for cdk watch to speedup dev
heitorlessa Sep 29, 2023
01bb800
fix: ensure pytest only searches tests folder
heitorlessa Sep 29, 2023
a452f86
chore: cdk watch for infra too
heitorlessa Sep 29, 2023
57fc38e
chore: use getpass over outdated getlogin
heitorlessa Sep 29, 2023
880834e
chore: allow branches with _
heitorlessa Sep 29, 2023
6d5fa6d
chore: standardize stream resource names
heitorlessa Sep 29, 2023
409b58e
chore: add hello world stream proc lambda
heitorlessa Sep 29, 2023
afbac00
chore: add watch target in makefile
heitorlessa Sep 30, 2023
8d7dc71
chore: make product models independent
heitorlessa Sep 30, 2023
1821ef9
chore: remove stream specific schema for now
heitorlessa Sep 30, 2023
956a2e1
chore(domain): create skeleton to notify updates
heitorlessa Sep 30, 2023
657bdb8
chore: add placeholder code for handler
heitorlessa Sep 30, 2023
a622cf2
chore: add placeholder unit test
heitorlessa Sep 30, 2023
5cb8b49
chore: rename stream processor to align w/ crud
heitorlessa Sep 30, 2023
68788d0
chore: add format-fix, fix yapf errors
heitorlessa Sep 30, 2023
ba92b7f
fix(tests): stack not found when running integ
heitorlessa Sep 30, 2023
e7b59b7
chore: align handler and fn handler name w/ crud
heitorlessa Sep 30, 2023
11ab329
chore: add mypy boto3 events dev dep
heitorlessa Oct 1, 2023
6b85eb7
feat: add initial DAL protocol and eventbridge
heitorlessa Oct 1, 2023
6c746bf
refactor: use status field over change_status since it's a notificati…
heitorlessa Oct 1, 2023
689e09e
refactor: move test doubles and fixtures to conftest
heitorlessa Oct 1, 2023
013b608
chore: test product_notifications
heitorlessa Oct 1, 2023
c340352
chore: add EventReceipt output model
heitorlessa Oct 2, 2023
1dd1fb0
chore: ignore .idea dir
heitorlessa Oct 2, 2023
c38a33c
chore: create and use Event model
heitorlessa Oct 2, 2023
d2bd0b7
chore: use generic container for emit
heitorlessa Oct 2, 2023
fa546f0
chore: fix event serialization; cleanup
heitorlessa Oct 2, 2023
2723fbf
chore: future note for event slicing
heitorlessa Oct 2, 2023
aa75844
chore: disable sockets for unit test
heitorlessa Oct 3, 2023
e9f3aa4
chore: add eventbridge provider test skeleton
heitorlessa Oct 3, 2023
ab590c2
chore: change to ProductChangeNotification
heitorlessa Oct 3, 2023
b90175f
chore: infer event structure from any model
heitorlessa Oct 3, 2023
e3115b2
chore: cleanup
heitorlessa Oct 3, 2023
865fb4b
chore: test event structure and model to event conversions
heitorlessa Oct 3, 2023
903d069
chore: adjust comment on event name
heitorlessa Oct 3, 2023
5ba71d3
chore: complete eventbridge contract tests
heitorlessa Oct 3, 2023
2d327bf
chore: remove dead code
heitorlessa Oct 3, 2023
f209601
chore: chunk maximum allowed events
heitorlessa Oct 4, 2023
6dc4954
chore: test chunk logic separately
heitorlessa Oct 4, 2023
86258eb
chore: linting
heitorlessa Oct 4, 2023
ed3a029
Merge branch 'main' into stream_processor
heitorlessa Oct 4, 2023
bd714c6
refactor: move standalones to functions.py; complete coverage
heitorlessa Oct 4, 2023
c8a5b55
refactor: move standalones to functions.py; complete coverage
heitorlessa Oct 4, 2023
7e219ed
fix(mypy): narrow typing
heitorlessa Oct 4, 2023
0112d0c
chore: enable pydantic plugin for mypy
heitorlessa Oct 4, 2023
24206bd
chore: explicit typed dict type as mypy can't infer
heitorlessa Oct 4, 2023
c6e8c48
chore: explicit type as mypy can't infer
heitorlessa Oct 4, 2023
4a0aaef
chore: actually apply pydantic plugin; skip pytest_socket missing py.…
heitorlessa Oct 4, 2023
cd157e9
chore: make pr fixes
heitorlessa Oct 4, 2023
fd003d7
refactor: rename dal to integrations
heitorlessa Oct 4, 2023
1841f95
docs(domain): add initial docstrings
heitorlessa Oct 4, 2023
9fcf867
chore: add mkdocs and mkdocstrings for documentation
heitorlessa Oct 4, 2023
9c17f07
docs(handlers): add docstring for process_stream
heitorlessa Oct 4, 2023
e1f6476
docs(domain): use markdown to create anchor
heitorlessa Oct 4, 2023
be733bc
docs: increase indentation to improve nav
heitorlessa Oct 4, 2023
6cc898c
docs(handler): add integrations section
heitorlessa Oct 4, 2023
bddc7b4
docs(domain): add integration section
heitorlessa Oct 4, 2023
0d5728b
docs(models): add Pydantic models
heitorlessa Oct 4, 2023
5ce39a3
docs(stream_processor): handlers first
heitorlessa Oct 4, 2023
106f84b
docs(models): docstring pydantic models for API docs
heitorlessa Oct 4, 2023
a8cb6c5
docs(integrations): docstring constants for API docs
heitorlessa Oct 4, 2023
de577ca
docs(events): docstring to standalone functions
heitorlessa Oct 4, 2023
7b62dc1
docs(events): docstring product change handler
heitorlessa Oct 4, 2023
7602e09
docs: add integrations; API docs rendering adjusts
heitorlessa Oct 4, 2023
cf82f1b
docs(api): include exceptions; add base exception
heitorlessa Oct 4, 2023
1d51a21
chore: move leftover comments to functions; linting
heitorlessa Oct 4, 2023
cf557d9
docs(api): docstring EventProvider
heitorlessa Oct 4, 2023
7d278ab
docs(api): docstring interfaces
heitorlessa Oct 4, 2023
c297f51
docs(api): docstring eventbridge provider
heitorlessa Oct 4, 2023
27f6044
refactor(events): move emit logic to base
heitorlessa Oct 4, 2023
7b935fe
Merge branch 'main' into stream_processor
heitorlessa Oct 4, 2023
6c27c67
docs(api): document standard Event and EventMetadata
heitorlessa Oct 4, 2023
de6f465
docs: readd homepage index
heitorlessa Oct 4, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions docs/api/stream_processor.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,3 +14,34 @@ We convert them into `ProductChangeNotification` model depending on the DynamoDB
Domain logic to notify product changes, e.g., `ADDED`, `REMOVED`, `UPDATED`.

::: product.stream_processor.domain_logic.product_notification

### Integrations

These are integrations with external services. As of now, we only use one integration to send events, by default `Amazon EventBridge`.

> NOTE: We could make a single Event Handler. For now, we're using one event handler closely aligned with the model we want to convert into event for type safety.

::: product.stream_processor.integrations.events.event_handler

### Events

::: product.stream_processor.integrations.events.models.input

::: product.stream_processor.integrations.events.models.output

#### Providers

::: product.stream_processor.integrations.events.providers.eventbridge

#### Interfaces

::: product.stream_processor.integrations.events.base

### Utility functions

::: product.stream_processor.integrations.events.functions
::: product.stream_processor.integrations.events.constants

### Exceptions

::: product.stream_processor.integrations.events.exceptions
23 changes: 14 additions & 9 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ repo_url: https://github.com/ran-isenberg/serverless-python-demo
edit_uri: edit/main/docs

nav:
- Homepage: index.md
- Decision log: decision_log.md
- API reference:
- Stream Processor: api/stream_processor.md
- Product models: api/product_models.md
- Homepage:
- index.md
- Decision log: decision_log.md
- API reference:
- Stream Processor: api/stream_processor.md
- Product models: api/product_models.md

theme:
name: material
Expand All @@ -34,6 +35,7 @@ theme:
- navigation.instant
- navigation.indexes
- navigation.tracking
- navigation.tabs
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you really like them tabs :)

- content.code.annotate
- content.code.copy
icon:
Expand Down Expand Up @@ -81,14 +83,17 @@ plugins:
show_source: true
heading_level: 4
allow_inspection: true
group_by_category: true
show_category_heading: true
show_bases: true
group_by_category: true # Attributes, Functions, Classes etc.
show_category_heading: true # Attributes, Functions, Classes etc.
show_bases: true # show inheritance
show_docstring_examples: true
show_if_no_docstring: true
show_if_no_docstring: true # show Pydantic models and global attrs
merge_init_into_class: true # avoid Class params + __init__ params
separate_signature: false
show_signature_annotations: false
show_docstring_attributes: false
show_root_heading: false
members_order: source

extra_css:
- stylesheets/extra.css
Expand Down
23 changes: 23 additions & 0 deletions product/models/products/product.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,38 @@
from product.models.products.validators import validate_product_id

ProductId = Annotated[str, Field(min_length=36, max_length=36), AfterValidator(validate_product_id)]
"""Unique Product ID, represented and validated as a UUID string."""


class Product(BaseModel):
"""Data representation for a product.

Parameters
----------
name : str
Product name
id : ProductId
Product ID (UUID string)
price : PositiveInt
Product price represented as a positive integer
"""
name: Annotated[str, Field(min_length=1, max_length=30)]
id: ProductId
price: PositiveInt


class ProductChangeNotification(BaseModel):
"""Data representation for a notification about a product change.

Parameters
----------
product_id : ProductId
Product ID (UUID string)
status : Literal['ADDED', 'REMOVED', 'UPDATED']
Product change status
created_at : datetime
Product change notification creation time (UTC)
"""
product_id: ProductId
status: Literal['ADDED', 'REMOVED', 'UPDATED']
created_at: datetime = Field(default_factory=datetime.utcnow)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from product.stream_processor.integrations.events.models.output import EventReceipt
from product.stream_processor.integrations.events.providers.eventbridge import EventBridge

# NOTE: these will move to environment variables. Event source format could even use a pydantic validation!
EVENT_BUS = os.environ.get('EVENT_BUS', '')
EVENT_SOURCE = 'myorg.product.product_notification'

Expand Down
3 changes: 1 addition & 2 deletions product/stream_processor/handlers/process_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ def process_stream(

Raises
------

ProductNotificationDeliveryError
ProductChangeNotificationDeliveryError
Partial or total failures when sending notification. It allows the stream to stop at the exact same sequence number.

This means sending notifications are at least once.
Expand Down
56 changes: 48 additions & 8 deletions product/stream_processor/integrations/events/base.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,72 @@
from abc import ABC, abstractmethod
from typing import Any, Generic, TypeVar

from product.stream_processor.integrations.events.functions import build_events_from_models
from product.stream_processor.integrations.events.models.input import Event
from product.stream_processor.integrations.events.models.output import EventReceipt

T = TypeVar('T')

# negative look ahead (?|char). Don't try to match the start of the string and any underscore that follows e.g., `_<name>` and `__<name>`

# full regex: ((?!^)(?<!_)[A-Z][a-z]+|(?<=[a-z0-9])[A-Z])
# ProductNotification -> Product_Notification
# ProductNotificationV2 -> Product_Notification_V2
# ProductHTTP -> Product_HTTP


class EventProvider(ABC):
"""ABC for an Event Provider that send events to a destination."""

@abstractmethod
def send(self, payload: list[Event]) -> EventReceipt:
"""Sends list of events to an Event Provider.

Parameters
----------
payload : list[Event]
List of events to send.

Returns
-------
EventReceipt
Receipts for unsuccessfully and successfully published events.

Raises
------
NotificationDeliveryError
When one or more events could not be delivered.
"""
...


class EventHandler(ABC, Generic[T]):

def __init__(self, provider: EventProvider, event_source: str) -> None:
"""ABC to handle event manipulation from a model, and publishing through a provider.

Parameters
----------
provider : EventProvider
Event Provider to publish events through.
event_source : str
Event source name, e.g., 'myorg.service.feature'
"""
self.provider = provider
self.event_source = event_source

@abstractmethod
def emit(self, payload: list[T], metadata: dict[str, Any] | None = None, correlation_id='') -> EventReceipt:
...
"""Emits product change notifications using registered provider, along with additional metadata or specific correlation ID.

Parameters
----------
payload : list[T]
List of product change notifications models to be sent.
metadata : dict[str, Any] | None, optional
Additional metadata to be injected into the event before sending, by default None
correlation_id : str, optional
Correlation ID to inject in event metadata. We generate one if not provided.

Returns
-------
EventReceipt
Receipts for unsuccessfully and successfully published events.
"""
event_payload = build_events_from_models(

Check warning on line 69 in product/stream_processor/integrations/events/base.py

View check run for this annotation

Codecov / codecov/patch

product/stream_processor/integrations/events/base.py#L69

Added line #L69 was not covered by tests
models=payload, metadata=metadata, correlation_id=correlation_id,
event_source=self.event_source) # type: ignore[type-var] # T will be defined by its implementation; see ProductChangeNotificationHandler
return self.provider.send(payload=event_payload)

Check warning on line 72 in product/stream_processor/integrations/events/base.py

View check run for this annotation

Codecov / codecov/patch

product/stream_processor/integrations/events/base.py#L72

Added line #L72 was not covered by tests
1 change: 1 addition & 0 deletions product/stream_processor/integrations/events/constants.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
"""Constants related to events integration (event handler and event providers)"""
DEFAULT_EVENT_VERSION = 'v1'
EVENTBRIDGE_PROVIDER_MAX_EVENTS_ENTRY = 10
29 changes: 26 additions & 3 deletions product/stream_processor/integrations/events/event_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,38 @@

from product.models.products.product import ProductChangeNotification
from product.stream_processor.integrations.events.base import EventHandler, EventProvider
from product.stream_processor.integrations.events.functions import build_events_from_models
from product.stream_processor.integrations.events.models.output import EventReceipt


class ProductChangeNotificationHandler(EventHandler):

def __init__(self, provider: EventProvider, event_source: str) -> None:
"""Event Handler for ProductChangeNotification.

Parameters
----------
provider : EventProvider
An event provider to send events to.
event_source : str
Event source to inject in event metadata, following 'myorg.service_name.feature_name'
"""
super().__init__(provider=provider, event_source=event_source)

def emit(self, payload: list[ProductChangeNotification], metadata: dict[str, Any] | None = None, correlation_id='') -> EventReceipt:
event_payload = build_events_from_models(models=payload, metadata=metadata, correlation_id=correlation_id, event_source=self.event_source)
return self.provider.send(payload=event_payload)
"""Emits product change notifications using registered provider, along with additional metadata or specific correlation ID.

Parameters
----------
payload : list[ProductChangeNotification]
List of product change notifications models to be sent.
metadata : dict[str, Any] | None, optional
Additional metadata to be injected into the event before sending, by default None
correlation_id : str, optional
Correlation ID to inject in event metadata. We generate one if not provided.

Returns
-------
EventReceipt
Receipts for unsuccessfully and successfully published events.
"""
return super().emit(payload, metadata, correlation_id)

Check warning on line 39 in product/stream_processor/integrations/events/event_handler.py

View check run for this annotation

Codecov / codecov/patch

product/stream_processor/integrations/events/event_handler.py#L39

Added line #L39 was not covered by tests
18 changes: 17 additions & 1 deletion product/stream_processor/integrations/events/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,25 @@
from product.stream_processor.integrations.events.models.output import EventReceiptFail


class ProductNotificationDeliveryError(Exception):
class NotificationDeliveryError(Exception):

def __init__(self, message: str, receipts: list[EventReceiptFail]):
"""Exception raised when a notification delivery fails.

Parameters
----------
message : str
error message
receipts : list[EventReceiptFail]
list of receipts failed notification deliveries along with details
"""
super().__init__(message)
self.message = message
self.receipts = receipts


class ProductChangeNotificationDeliveryError(NotificationDeliveryError):
"""Raised when one or all product change notification deliveries fail."""

def __init__(self, message: str, receipts: list[EventReceiptFail]):
super().__init__(message, receipts)
71 changes: 68 additions & 3 deletions product/stream_processor/integrations/events/functions.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
"""Standalone functions related to events integration. These are reused in more than one location, and tested separately"""

import re
from typing import Any, Generator, Sequence, TypeVar
from uuid import uuid4
Expand All @@ -6,21 +8,70 @@
from product.stream_processor.integrations.events.models.input import AnyModel, Event, EventMetadata

T = TypeVar('T')
"""Generic type for a list of events"""

_exclude_underscores = r'(?!^)(?<!_)'
_pascal_case = r'[A-Z][a-z]+'
# full regex: ((?!^)(?<!_)[A-Z][a-z]+|(?<=[a-z0-9])[A-Z])
_exclude_underscores = r'(?!^)(?<!_)' # _ProductNotification
_pascal_case = r'[A-Z][a-z]+' # ProductNotification
_followed_by_lower_case_or_digit = r'(?<=[a-z0-9])[A-Z])' # V1ProductNotification
_or = r'|'
_pascal_to_snake_pattern = re.compile(rf'({_exclude_underscores}{_pascal_case}{_or}{_followed_by_lower_case_or_digit}')


def convert_model_to_event_name(model_name: str) -> str:
""" Convert ModelName (pascal) to MODEL_NAME (snake, uppercase)"""
"""Derives a standard event name from the name of the model.

It uses snake_case in uppercase letters, e.g., `ProductNotification` -> `PRODUCT_NOTIFICATION`.

It also keeps numbers and acronyms that are typically abbreviation for something intact, e.g.: "ProductHTTP" -> "PRODUCT_HTTP"

Parameters
----------
model_name : str
Name of the model to derive from.

# Examples

```python
from pydantic import BaseModel

class SampleNotification(BaseModel):
message: str

notification = SampleNotification(message='testing')
event_name = convert_model_to_event_name(notification.__class__.__name__)

assert event_name == "SAMPLE_NOTIFICATION"
```

Returns
-------
str
Standard event name in snake_case upper letters.
"""
return _pascal_to_snake_pattern.sub(r'_\1', model_name).upper()


def build_events_from_models(models: Sequence[AnyModel], event_source: str, metadata: dict[str, Any] | None = None,
correlation_id: str = '') -> list[Event]:
"""Converts a Pydantic model into a standard event.

Parameters
----------
models : Sequence[AnyModel]
List of Pydantic models to convert into events.
event_source : str
Event source name to inject into event metadata.
metadata : dict[str, Any] | None, optional
Additional metadata to inject, by default None
correlation_id : str, optional
Correlation ID to use in event metadata. If not provided, we generate one using UUID4.

Returns
-------
list[Event]
List of events created from model ready to be emitted.
"""
metadata = metadata or {}
correlation_id = correlation_id or f'{uuid4()}'

Expand All @@ -39,6 +90,20 @@ def build_events_from_models(models: Sequence[AnyModel], event_source: str, meta


def chunk_from_list(events: list[T], max_items: int) -> Generator[list[T], None, None]:
"""Slices a list of items into a generator, respecting the max number of items.

Parameters
----------
events : list[T]
List of events to slice.
max_items : int
Maximum number of items per chunk.

Yields
------
Generator[list[T], None, None]
Generator containing batches of events with maximum number of items requested.
"""
for idx in range(0, len(events), max_items): # start, stop, step
# slice the first 10 items, then the next 10 items starting from the index
yield from [events[idx:idx + max_items]]
Loading
Loading