Skip to content

Commit

Permalink
feat(stream_processor): initial version for processing product changes (
Browse files Browse the repository at this point in the history
#76)

* chore: support for cdk watch to speedup dev

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* fix: ensure pytest only searches tests folder

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: cdk watch for infra too

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: use getpass over outdated getlogin

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: allow branches with _

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: standardize stream resource names

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: add hello world stream proc lambda

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: add watch target in makefile

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: make product models independent

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: remove stream specific schema for now

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore(domain): create skeleton to notify updates

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: add placeholder code for handler

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: add placeholder unit test

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: rename stream processor to align w/ crud

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: add format-fix, fix yapf errors

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* fix(tests): stack not found when running integ

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: align handler and fn handler name w/ crud

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: add mypy boto3 events dev dep

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* feat: add initial DAL protocol and eventbridge

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* refactor: use status field over change_status since it's a notification already

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* refactor: move test doubles and fixtures to conftest

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: test product_notifications

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: add EventReceipt output model

* chore: ignore .idea dir

* chore: create and use Event model

* chore: use generic container for emit

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: fix event serialization; cleanup

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: future note for event slicing

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: disable sockets for unit test

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: add eventbridge provider test skeleton

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: change to ProductChangeNotification

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: infer event structure from any model

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: cleanup

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: test event structure and model to event conversions

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: adjust comment on event name

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: complete eventbridge contract tests

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: remove dead code

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: chunk maximum allowed events

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: test chunk logic separately

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: linting

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* refactor: move standalones to functions.py; complete coverage

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* refactor: move standalones to functions.py; complete coverage

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* fix(mypy): narrow typing

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: enable pydantic plugin for mypy

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: explicit typed dict type as mypy can't infer

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: explicit type as mypy can't infer

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: actually apply pydantic plugin; skip pytest_socket missing py.typed

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: make pr fixes

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* refactor: rename dal to integrations

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* docs(domain): add initial docstrings

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* chore: add mkdocs and mkdocstrings for documentation

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* docs(handlers): add docstring for process_stream

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* docs(domain): use markdown to create anchor

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* docs: increase indentation to improve nav

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* docs(handler): add integrations section

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* docs(domain): add integration section

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* docs(models): add Pydantic models

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

* docs(stream_processor): handlers first

Signed-off-by: heitorlessa <lessa@amazon.co.uk>

---------

Signed-off-by: heitorlessa <lessa@amazon.co.uk>
  • Loading branch information
heitorlessa authored Oct 4, 2023
1 parent 7f6c64d commit 576c669
Show file tree
Hide file tree
Showing 42 changed files with 1,254 additions and 18 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,4 @@ lambda_requirements.txt
# Misc

node_modules
.idea
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ format:
poetry run isort .
poetry run yapf -d -vv --style=./.style -r .

format-fix:
poetry run isort .
poetry run yapf -vv --style=./.style -r --in-place .

lint: format
@echo "Running flake8"
poetry run flake8 product/* infrastructure/* tests/* docs/examples/*
Expand Down Expand Up @@ -68,3 +72,6 @@ docs:

lint-docs:
docker run -v ${PWD}:/markdown 06kellyjac/markdownlint-cli --fix "docs"

watch:
npx cdk watch
15 changes: 14 additions & 1 deletion cdk.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
{
"app": "poetry run python app.py"
"app": "poetry run python app.py",
"watch": {
"include": [
"product/**",
"infrastructure/product/**"
],
"exclude": [
"product/**/*.pyc",
"product/**/__pycache__",
"infrastructure/**/*.pyc",
"infrastructure/**/__pycache__"
]
},
"build": "make build"
}
7 changes: 7 additions & 0 deletions docs/api/product_models.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## Product models

::: product.models.products.product

## Validators

::: product.models.products.validators
16 changes: 16 additions & 0 deletions docs/api/stream_processor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

## Product notification

### Lambda Handlers

Process stream is connected to Amazon DynamoDB Stream that polls product changes in the product table.

We convert them into `ProductChangeNotification` model depending on the DynamoDB Stream Event Name (e.g., `INSERT` -> `ADDED`).

::: product.stream_processor.handlers.process_stream

### Domain logic

Domain logic to notify product changes, e.g., `ADDED`, `REMOVED`, `UPDATED`.

::: product.stream_processor.domain_logic.product_notification
8 changes: 5 additions & 3 deletions infrastructure/product/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
SERVICE_ROLE_ARN = 'ServiceRoleArn'
STREAM_PROC_SERVICE_ROLE_ARN = 'StreamRoleArn'
LAMBDA_BASIC_EXECUTION_ROLE = 'AWSLambdaBasicExecutionRole'
CREATE_PRODUCT_ROLE = 'ServiceRole'
DELETE_PRODUCT_ROLE = 'DeleteRole'
Expand Down Expand Up @@ -28,8 +27,11 @@
BUILD_FOLDER = '.build/lambdas/'
COMMON_LAYER_BUILD_FOLDER = '.build/common_layer'
CRUD_CONSTRUCT_NAME = 'Crud'
STREAM_PROC_CONSTRUCT_NAME = 'StreamProc'
OWNER_TAG = 'owner'
REST_API_NAME = 'crud-rest-api'
EVENT_BUS_NAME = 'events'
STREAM_PROC_LAMBDA = 'Stream'
STREAM_PROCESSOR_CONSTRUCT_NAME = 'StreamProc'
STREAM_PROCESSOR_LAMBDA = 'StreamProcessor'
STREAM_PROCESSOR_LAMBDA_MEMORY_SIZE = 128 # MB
STREAM_PROCESSOR_LAMBDA_TIMEOUT = 120 # seconds
STREAM_PROCESSOR_LAMBDA_SERVICE_ROLE_ARN = 'StreamRoleArn'
2 changes: 1 addition & 1 deletion infrastructure/product/service_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:

self.stream_processor = StreamProcessorConstruct(
self,
id_=get_construct_name(id, constants.STREAM_PROC_CONSTRUCT_NAME),
id_=constants.STREAM_PROCESSOR_CONSTRUCT_NAME,
lambda_layer=self.shared_layer,
dynamodb_table=self.api.api_db.db,
)
Expand Down
14 changes: 7 additions & 7 deletions infrastructure/product/stream_processor_construct.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ def __init__(self, scope: Construct, id_: str, lambda_layer: PythonLayerVersion,
super().__init__(scope, id_)
bus_name = f'{id_}{constants.EVENT_BUS_NAME}'
self.event_bus = events.EventBus(self, bus_name, event_bus_name=bus_name)
self.role = self._build_lambda_role(dynamodb_table, self.event_bus)
self.role = self._build_lambda_role(db=dynamodb_table, bus=self.event_bus)

self.lambda_function = self._build_stream_processor_lambda(self.role, lambda_layer, dynamodb_table)

def _build_lambda_role(self, db: dynamodb.Table, bus: events.EventBus) -> iam.Role:
return iam.Role(
self,
constants.STREAM_PROC_SERVICE_ROLE_ARN,
id=constants.STREAM_PROCESSOR_LAMBDA_SERVICE_ROLE_ARN,
assumed_by=iam.ServicePrincipal('lambda.amazonaws.com'),
inline_policies={
'streams':
Expand All @@ -51,21 +51,21 @@ def _build_lambda_role(self, db: dynamodb.Table, bus: events.EventBus) -> iam.Ro
def _build_stream_processor_lambda(self, role: iam.Role, lambda_layer: PythonLayerVersion, dynamodb_table: dynamodb.Table) -> _lambda.Function:
lambda_function = _lambda.Function(
self,
constants.STREAM_PROC_LAMBDA,
id=constants.STREAM_PROCESSOR_LAMBDA,
runtime=_lambda.Runtime.PYTHON_3_11,
code=_lambda.Code.from_asset(constants.BUILD_FOLDER),
handler='product.stream_processor.handlers.stream_handler.handle_events',
handler='product.stream_processor.handlers.process_stream.process_stream',
environment={
constants.POWERTOOLS_SERVICE_NAME: constants.SERVICE_NAME, # for logger, tracer and metrics
constants.POWER_TOOLS_LOG_LEVEL: 'DEBUG', # for logger
},
tracing=_lambda.Tracing.ACTIVE,
retry_attempts=0,
timeout=Duration.seconds(constants.API_HANDLER_LAMBDA_TIMEOUT),
memory_size=constants.API_HANDLER_LAMBDA_MEMORY_SIZE,
timeout=Duration.seconds(constants.STREAM_PROCESSOR_LAMBDA_TIMEOUT),
memory_size=constants.STREAM_PROCESSOR_LAMBDA_MEMORY_SIZE,
layers=[lambda_layer],
role=role,
log_retention=RetentionDays.ONE_DAY,
log_retention=RetentionDays.FIVE_DAYS,
)
# Add DynamoDB Stream as an event source for the Lambda function
lambda_function.add_event_source(DynamoEventSource(dynamodb_table, starting_position=_lambda.StartingPosition.LATEST))
Expand Down
6 changes: 3 additions & 3 deletions infrastructure/product/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os
import getpass
from pathlib import Path

from git import Repo
Expand All @@ -8,7 +8,7 @@

def get_username() -> str:
try:
return os.getlogin().replace('.', '-')
return getpass.getuser().replace('.', '-')
except Exception:
return 'github'

Expand All @@ -17,7 +17,7 @@ def get_stack_name() -> str:
repo = Repo(Path.cwd())
username = get_username()
try:
branch_name = f'{repo.active_branch}'.replace('/', '-')
branch_name = f'{repo.active_branch}'.replace('/', '-').replace('_', '-')
return f'{username}-{branch_name}-{constants.SERVICE_NAME}'
except TypeError:
# we're running in detached mode (HEAD)
Expand Down
99 changes: 99 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
site_name: Serverless Python Demo OPN305
site_description: Serverless Python Demo for re:Invent OPN305 session
site_author: Ran Isenberg and Heitor Lessa
repo_url: https://github.com/ran-isenberg/serverless-python-demo
edit_uri: edit/main/docs

nav:
- Homepage:
- Decision log: decision_log.md
- API reference:
- Stream Processor: api/stream_processor.md
- Product models: api/product_models.md

theme:
name: material
font:
text: Ubuntu
palette:
- scheme: default
primary: deep purple
toggle:
icon: material/lightbulb
name: Switch to dark mode
- scheme: slate
primary: indigo
accent: teal
toggle:
icon: material/lightbulb-outline
name: Switch to light mode
features:
- header.autohide
- navigation.sections
- navigation.top
- navigation.instant
- navigation.indexes
- navigation.tracking
- navigation.tabs
- content.code.annotate
- content.code.copy
icon:
repo: fontawesome/brands/github

markdown_extensions:
- admonition
- abbr
- pymdownx.tabbed:
alternate_style: true
- pymdownx.highlight:
linenums: true
- pymdownx.details
- pymdownx.snippets:
base_path: "."
check_paths: true
restrict_base_path: false
- meta
- toc:
permalink: true
toc_depth: 4
- attr_list
- pymdownx.emoji:
emoji_index: !!python/name:materialx.emoji.twemoji
emoji_generator: !!python/name:materialx.emoji.to_svg
- pymdownx.inlinehilite
- pymdownx.superfences:
custom_fences:
- name: mermaid
class: mermaid
format: !!python/name:pymdownx.superfences.fence_code_format
- pymdownx.tasklist:
custom_checkbox: true

copyright: Copyright &copy; 2023 Ran Isenberg and Heitor Lessa

plugins:
- search
- mkdocstrings:
handlers:
python:
options:
docstring_style: numpy
docstring_section_style: spacy
show_source: true
heading_level: 4
allow_inspection: true
group_by_category: true
show_category_heading: true
show_bases: true
show_docstring_examples: true
show_if_no_docstring: true
merge_init_into_class: true # avoid Class params + __init__ params
separate_signature: false
show_signature_annotations: false

extra_css:
- stylesheets/extra.css

watch:
- product
- docs
8 changes: 8 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ warn_unused_ignores=True
show_column_numbers = True
show_error_codes = True
show_error_context = True
plugins = pydantic.mypy

[pydantic-mypy]
init_forbid_extra = true
init_typed = true
warn_required_dynamic_aliases = true

# Disable specific error codes in the 'tests' package
[mypy-tests.*]
Expand Down Expand Up @@ -67,3 +72,6 @@ ignore_missing_imports = True

[mypy-setuptools]
ignore_missing_imports = True

[mypy-pytest_socket]
ignore_missing_imports = True
Loading

0 comments on commit 576c669

Please sign in to comment.