Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(stream_processor): initial version for processing product changes #76

Merged
merged 59 commits into from
Oct 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
59 commits
Select commit Hold shift + click to select a range
ba6c401
chore: support for cdk watch to speedup dev
heitorlessa Sep 29, 2023
01bb800
fix: ensure pytest only searches tests folder
heitorlessa Sep 29, 2023
a452f86
chore: cdk watch for infra too
heitorlessa Sep 29, 2023
57fc38e
chore: use getpass over outdated getlogin
heitorlessa Sep 29, 2023
880834e
chore: allow branches with _
heitorlessa Sep 29, 2023
6d5fa6d
chore: standardize stream resource names
heitorlessa Sep 29, 2023
409b58e
chore: add hello world stream proc lambda
heitorlessa Sep 29, 2023
afbac00
chore: add watch target in makefile
heitorlessa Sep 30, 2023
8d7dc71
chore: make product models independent
heitorlessa Sep 30, 2023
1821ef9
chore: remove stream specific schema for now
heitorlessa Sep 30, 2023
956a2e1
chore(domain): create skeleton to notify updates
heitorlessa Sep 30, 2023
657bdb8
chore: add placeholder code for handler
heitorlessa Sep 30, 2023
a622cf2
chore: add placeholder unit test
heitorlessa Sep 30, 2023
5cb8b49
chore: rename stream processor to align w/ crud
heitorlessa Sep 30, 2023
68788d0
chore: add format-fix, fix yapf errors
heitorlessa Sep 30, 2023
ba92b7f
fix(tests): stack not found when running integ
heitorlessa Sep 30, 2023
e7b59b7
chore: align handler and fn handler name w/ crud
heitorlessa Sep 30, 2023
11ab329
chore: add mypy boto3 events dev dep
heitorlessa Oct 1, 2023
6b85eb7
feat: add initial DAL protocol and eventbridge
heitorlessa Oct 1, 2023
6c746bf
refactor: use status field over change_status since it's a notificati…
heitorlessa Oct 1, 2023
689e09e
refactor: move test doubles and fixtures to conftest
heitorlessa Oct 1, 2023
013b608
chore: test product_notifications
heitorlessa Oct 1, 2023
c340352
chore: add EventReceipt output model
heitorlessa Oct 2, 2023
1dd1fb0
chore: ignore .idea dir
heitorlessa Oct 2, 2023
c38a33c
chore: create and use Event model
heitorlessa Oct 2, 2023
d2bd0b7
chore: use generic container for emit
heitorlessa Oct 2, 2023
fa546f0
chore: fix event serialization; cleanup
heitorlessa Oct 2, 2023
2723fbf
chore: future note for event slicing
heitorlessa Oct 2, 2023
aa75844
chore: disable sockets for unit test
heitorlessa Oct 3, 2023
e9f3aa4
chore: add eventbridge provider test skeleton
heitorlessa Oct 3, 2023
ab590c2
chore: change to ProductChangeNotification
heitorlessa Oct 3, 2023
b90175f
chore: infer event structure from any model
heitorlessa Oct 3, 2023
e3115b2
chore: cleanup
heitorlessa Oct 3, 2023
865fb4b
chore: test event structure and model to event conversions
heitorlessa Oct 3, 2023
903d069
chore: adjust comment on event name
heitorlessa Oct 3, 2023
5ba71d3
chore: complete eventbridge contract tests
heitorlessa Oct 3, 2023
2d327bf
chore: remove dead code
heitorlessa Oct 3, 2023
f209601
chore: chunk maximum allowed events
heitorlessa Oct 4, 2023
6dc4954
chore: test chunk logic separately
heitorlessa Oct 4, 2023
86258eb
chore: linting
heitorlessa Oct 4, 2023
ed3a029
Merge branch 'main' into stream_processor
heitorlessa Oct 4, 2023
bd714c6
refactor: move standalones to functions.py; complete coverage
heitorlessa Oct 4, 2023
c8a5b55
refactor: move standalones to functions.py; complete coverage
heitorlessa Oct 4, 2023
7e219ed
fix(mypy): narrow typing
heitorlessa Oct 4, 2023
0112d0c
chore: enable pydantic plugin for mypy
heitorlessa Oct 4, 2023
24206bd
chore: explicit typed dict type as mypy can't infer
heitorlessa Oct 4, 2023
c6e8c48
chore: explicit type as mypy can't infer
heitorlessa Oct 4, 2023
4a0aaef
chore: actually apply pydantic plugin; skip pytest_socket missing py.…
heitorlessa Oct 4, 2023
cd157e9
chore: make pr fixes
heitorlessa Oct 4, 2023
fd003d7
refactor: rename dal to integrations
heitorlessa Oct 4, 2023
1841f95
docs(domain): add initial docstrings
heitorlessa Oct 4, 2023
9fcf867
chore: add mkdocs and mkdocstrings for documentation
heitorlessa Oct 4, 2023
9c17f07
docs(handlers): add docstring for process_stream
heitorlessa Oct 4, 2023
e1f6476
docs(domain): use markdown to create anchor
heitorlessa Oct 4, 2023
be733bc
docs: increase indentation to improve nav
heitorlessa Oct 4, 2023
6cc898c
docs(handler): add integrations section
heitorlessa Oct 4, 2023
bddc7b4
docs(domain): add integration section
heitorlessa Oct 4, 2023
0d5728b
docs(models): add Pydantic models
heitorlessa Oct 4, 2023
5ce39a3
docs(stream_processor): handlers first
heitorlessa Oct 4, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,4 @@ lambda_requirements.txt
# Misc

node_modules
.idea
7 changes: 7 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@ format:
poetry run isort .
poetry run yapf -d -vv --style=./.style -r .

format-fix:
poetry run isort .
poetry run yapf -vv --style=./.style -r --in-place .

lint: format
@echo "Running flake8"
poetry run flake8 product/* infrastructure/* tests/* docs/examples/*
Expand Down Expand Up @@ -68,3 +72,6 @@ docs:

lint-docs:
docker run -v ${PWD}:/markdown 06kellyjac/markdownlint-cli --fix "docs"

watch:
npx cdk watch
15 changes: 14 additions & 1 deletion cdk.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,16 @@
{
"app": "poetry run python app.py"
"app": "poetry run python app.py",
"watch": {
"include": [
"product/**",
"infrastructure/product/**"
],
"exclude": [
"product/**/*.pyc",
"product/**/__pycache__",
"infrastructure/**/*.pyc",
"infrastructure/**/__pycache__"
]
},
"build": "make build"
}
7 changes: 7 additions & 0 deletions docs/api/product_models.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
## Product models

::: product.models.products.product

## Validators

::: product.models.products.validators
16 changes: 16 additions & 0 deletions docs/api/stream_processor.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@

## Product notification

### Lambda Handlers

Process stream is connected to Amazon DynamoDB Stream that polls product changes in the product table.

We convert them into `ProductChangeNotification` model depending on the DynamoDB Stream Event Name (e.g., `INSERT` -> `ADDED`).

::: product.stream_processor.handlers.process_stream

### Domain logic

Domain logic to notify product changes, e.g., `ADDED`, `REMOVED`, `UPDATED`.

::: product.stream_processor.domain_logic.product_notification
8 changes: 5 additions & 3 deletions infrastructure/product/constants.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
SERVICE_ROLE_ARN = 'ServiceRoleArn'
STREAM_PROC_SERVICE_ROLE_ARN = 'StreamRoleArn'
LAMBDA_BASIC_EXECUTION_ROLE = 'AWSLambdaBasicExecutionRole'
CREATE_PRODUCT_ROLE = 'ServiceRole'
DELETE_PRODUCT_ROLE = 'DeleteRole'
Expand Down Expand Up @@ -28,8 +27,11 @@
BUILD_FOLDER = '.build/lambdas/'
COMMON_LAYER_BUILD_FOLDER = '.build/common_layer'
CRUD_CONSTRUCT_NAME = 'Crud'
STREAM_PROC_CONSTRUCT_NAME = 'StreamProc'
OWNER_TAG = 'owner'
REST_API_NAME = 'crud-rest-api'
EVENT_BUS_NAME = 'events'
STREAM_PROC_LAMBDA = 'Stream'
STREAM_PROCESSOR_CONSTRUCT_NAME = 'StreamProc'
STREAM_PROCESSOR_LAMBDA = 'StreamProcessor'
STREAM_PROCESSOR_LAMBDA_MEMORY_SIZE = 128 # MB
STREAM_PROCESSOR_LAMBDA_TIMEOUT = 120 # seconds
STREAM_PROCESSOR_LAMBDA_SERVICE_ROLE_ARN = 'StreamRoleArn'
2 changes: 1 addition & 1 deletion infrastructure/product/service_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None:

self.stream_processor = StreamProcessorConstruct(
self,
id_=get_construct_name(id, constants.STREAM_PROC_CONSTRUCT_NAME),
id_=constants.STREAM_PROCESSOR_CONSTRUCT_NAME,
lambda_layer=self.shared_layer,
dynamodb_table=self.api.api_db.db,
)
Expand Down
14 changes: 7 additions & 7 deletions infrastructure/product/stream_processor_construct.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,14 @@ def __init__(self, scope: Construct, id_: str, lambda_layer: PythonLayerVersion,
super().__init__(scope, id_)
bus_name = f'{id_}{constants.EVENT_BUS_NAME}'
self.event_bus = events.EventBus(self, bus_name, event_bus_name=bus_name)
self.role = self._build_lambda_role(dynamodb_table, self.event_bus)
self.role = self._build_lambda_role(db=dynamodb_table, bus=self.event_bus)

self.lambda_function = self._build_stream_processor_lambda(self.role, lambda_layer, dynamodb_table)

def _build_lambda_role(self, db: dynamodb.Table, bus: events.EventBus) -> iam.Role:
return iam.Role(
self,
constants.STREAM_PROC_SERVICE_ROLE_ARN,
id=constants.STREAM_PROCESSOR_LAMBDA_SERVICE_ROLE_ARN,
assumed_by=iam.ServicePrincipal('lambda.amazonaws.com'),
inline_policies={
'streams':
Expand All @@ -51,21 +51,21 @@ def _build_lambda_role(self, db: dynamodb.Table, bus: events.EventBus) -> iam.Ro
def _build_stream_processor_lambda(self, role: iam.Role, lambda_layer: PythonLayerVersion, dynamodb_table: dynamodb.Table) -> _lambda.Function:
lambda_function = _lambda.Function(
self,
constants.STREAM_PROC_LAMBDA,
id=constants.STREAM_PROCESSOR_LAMBDA,
runtime=_lambda.Runtime.PYTHON_3_11,
code=_lambda.Code.from_asset(constants.BUILD_FOLDER),
handler='product.stream_processor.handlers.stream_handler.handle_events',
handler='product.stream_processor.handlers.process_stream.process_stream',
environment={
constants.POWERTOOLS_SERVICE_NAME: constants.SERVICE_NAME, # for logger, tracer and metrics
constants.POWER_TOOLS_LOG_LEVEL: 'DEBUG', # for logger
},
tracing=_lambda.Tracing.ACTIVE,
retry_attempts=0,
timeout=Duration.seconds(constants.API_HANDLER_LAMBDA_TIMEOUT),
memory_size=constants.API_HANDLER_LAMBDA_MEMORY_SIZE,
timeout=Duration.seconds(constants.STREAM_PROCESSOR_LAMBDA_TIMEOUT),
memory_size=constants.STREAM_PROCESSOR_LAMBDA_MEMORY_SIZE,
layers=[lambda_layer],
role=role,
log_retention=RetentionDays.ONE_DAY,
log_retention=RetentionDays.FIVE_DAYS,
)
# Add DynamoDB Stream as an event source for the Lambda function
lambda_function.add_event_source(DynamoEventSource(dynamodb_table, starting_position=_lambda.StartingPosition.LATEST))
Expand Down
6 changes: 3 additions & 3 deletions infrastructure/product/utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import os
import getpass
from pathlib import Path

from git import Repo
Expand All @@ -8,7 +8,7 @@

def get_username() -> str:
try:
return os.getlogin().replace('.', '-')
return getpass.getuser().replace('.', '-')
except Exception:
return 'github'

Expand All @@ -17,7 +17,7 @@ def get_stack_name() -> str:
repo = Repo(Path.cwd())
username = get_username()
try:
branch_name = f'{repo.active_branch}'.replace('/', '-')
branch_name = f'{repo.active_branch}'.replace('/', '-').replace('_', '-')
return f'{username}-{branch_name}-{constants.SERVICE_NAME}'
except TypeError:
# we're running in detached mode (HEAD)
Expand Down
99 changes: 99 additions & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
site_name: Serverless Python Demo OPN305
site_description: Serverless Python Demo for re:Invent OPN305 session
site_author: Ran Isenberg and Heitor Lessa
repo_url: https://github.com/ran-isenberg/serverless-python-demo
edit_uri: edit/main/docs

nav:
- Homepage:
- Decision log: decision_log.md
- API reference:
- Stream Processor: api/stream_processor.md
- Product models: api/product_models.md

theme:
name: material
font:
text: Ubuntu
palette:
- scheme: default
primary: deep purple
toggle:
icon: material/lightbulb
name: Switch to dark mode
- scheme: slate
primary: indigo
accent: teal
toggle:
icon: material/lightbulb-outline
name: Switch to light mode
features:
- header.autohide
- navigation.sections
- navigation.top
- navigation.instant
- navigation.indexes
- navigation.tracking
- navigation.tabs
- content.code.annotate
- content.code.copy
icon:
repo: fontawesome/brands/github

markdown_extensions:
- admonition
- abbr
- pymdownx.tabbed:
alternate_style: true
- pymdownx.highlight:
linenums: true
- pymdownx.details
- pymdownx.snippets:
base_path: "."
check_paths: true
restrict_base_path: false
- meta
- toc:
permalink: true
toc_depth: 4
- attr_list
- pymdownx.emoji:
emoji_index: !!python/name:materialx.emoji.twemoji
emoji_generator: !!python/name:materialx.emoji.to_svg
- pymdownx.inlinehilite
- pymdownx.superfences:
custom_fences:
- name: mermaid
class: mermaid
format: !!python/name:pymdownx.superfences.fence_code_format
- pymdownx.tasklist:
custom_checkbox: true

copyright: Copyright © 2023 Ran Isenberg and Heitor Lessa

plugins:
- search
- mkdocstrings:
handlers:
python:
options:
docstring_style: numpy
docstring_section_style: spacy
show_source: true
heading_level: 4
allow_inspection: true
group_by_category: true
show_category_heading: true
show_bases: true
show_docstring_examples: true
show_if_no_docstring: true
merge_init_into_class: true # avoid Class params + __init__ params
separate_signature: false
show_signature_annotations: false

extra_css:
- stylesheets/extra.css

watch:
- product
- docs
8 changes: 8 additions & 0 deletions mypy.ini
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@ warn_unused_ignores=True
show_column_numbers = True
show_error_codes = True
show_error_context = True
plugins = pydantic.mypy

[pydantic-mypy]
init_forbid_extra = true
init_typed = true
warn_required_dynamic_aliases = true

# Disable specific error codes in the 'tests' package
[mypy-tests.*]
Expand Down Expand Up @@ -67,3 +72,6 @@ ignore_missing_imports = True

[mypy-setuptools]
ignore_missing_imports = True

[mypy-pytest_socket]
ignore_missing_imports = True
Loading
Loading