From 576c6698454f9b0b8e9c870a971bd33bd35cad08 Mon Sep 17 00:00:00 2001 From: Heitor Lessa Date: Wed, 4 Oct 2023 18:46:44 +0200 Subject: [PATCH] feat(stream_processor): initial version for processing product changes (#76) * chore: support for cdk watch to speedup dev Signed-off-by: heitorlessa * fix: ensure pytest only searches tests folder Signed-off-by: heitorlessa * chore: cdk watch for infra too Signed-off-by: heitorlessa * chore: use getpass over outdated getlogin Signed-off-by: heitorlessa * chore: allow branches with _ Signed-off-by: heitorlessa * chore: standardize stream resource names Signed-off-by: heitorlessa * chore: add hello world stream proc lambda Signed-off-by: heitorlessa * chore: add watch target in makefile Signed-off-by: heitorlessa * chore: make product models independent Signed-off-by: heitorlessa * chore: remove stream specific schema for now Signed-off-by: heitorlessa * chore(domain): create skeleton to notify updates Signed-off-by: heitorlessa * chore: add placeholder code for handler Signed-off-by: heitorlessa * chore: add placeholder unit test Signed-off-by: heitorlessa * chore: rename stream processor to align w/ crud Signed-off-by: heitorlessa * chore: add format-fix, fix yapf errors Signed-off-by: heitorlessa * fix(tests): stack not found when running integ Signed-off-by: heitorlessa * chore: align handler and fn handler name w/ crud Signed-off-by: heitorlessa * chore: add mypy boto3 events dev dep Signed-off-by: heitorlessa * feat: add initial DAL protocol and eventbridge Signed-off-by: heitorlessa * refactor: use status field over change_status since it's a notification already Signed-off-by: heitorlessa * refactor: move test doubles and fixtures to conftest Signed-off-by: heitorlessa * chore: test product_notifications Signed-off-by: heitorlessa * chore: add EventReceipt output model * chore: ignore .idea dir * chore: create and use Event model * chore: use generic container for emit Signed-off-by: heitorlessa * chore: fix event serialization; cleanup Signed-off-by: heitorlessa * chore: future note for event slicing Signed-off-by: heitorlessa * chore: disable sockets for unit test Signed-off-by: heitorlessa * chore: add eventbridge provider test skeleton Signed-off-by: heitorlessa * chore: change to ProductChangeNotification Signed-off-by: heitorlessa * chore: infer event structure from any model Signed-off-by: heitorlessa * chore: cleanup Signed-off-by: heitorlessa * chore: test event structure and model to event conversions Signed-off-by: heitorlessa * chore: adjust comment on event name Signed-off-by: heitorlessa * chore: complete eventbridge contract tests Signed-off-by: heitorlessa * chore: remove dead code Signed-off-by: heitorlessa * chore: chunk maximum allowed events Signed-off-by: heitorlessa * chore: test chunk logic separately Signed-off-by: heitorlessa * chore: linting Signed-off-by: heitorlessa * refactor: move standalones to functions.py; complete coverage Signed-off-by: heitorlessa * refactor: move standalones to functions.py; complete coverage Signed-off-by: heitorlessa * fix(mypy): narrow typing Signed-off-by: heitorlessa * chore: enable pydantic plugin for mypy Signed-off-by: heitorlessa * chore: explicit typed dict type as mypy can't infer Signed-off-by: heitorlessa * chore: explicit type as mypy can't infer Signed-off-by: heitorlessa * chore: actually apply pydantic plugin; skip pytest_socket missing py.typed Signed-off-by: heitorlessa * chore: make pr fixes Signed-off-by: heitorlessa * refactor: rename dal to integrations 
Signed-off-by: heitorlessa * docs(domain): add initial docstrings Signed-off-by: heitorlessa * chore: add mkdocs and mkdocstrings for documentation Signed-off-by: heitorlessa * docs(handlers): add docstring for process_stream Signed-off-by: heitorlessa * docs(domain): use markdown to create anchor Signed-off-by: heitorlessa * docs: increase indentation to improve nav Signed-off-by: heitorlessa * docs(handler): add integrations section Signed-off-by: heitorlessa * docs(domain): add integration section Signed-off-by: heitorlessa * docs(models): add Pydantic models Signed-off-by: heitorlessa * docs(stream_processor): handlers first Signed-off-by: heitorlessa --------- Signed-off-by: heitorlessa --- .gitignore | 1 + Makefile | 7 + cdk.json | 15 +- docs/api/product_models.md | 7 + docs/api/stream_processor.md | 16 ++ infrastructure/product/constants.py | 8 +- infrastructure/product/service_stack.py | 2 +- .../product/stream_processor_construct.py | 14 +- infrastructure/product/utils.py | 6 +- mkdocs.yml | 99 +++++++++ mypy.ini | 8 + poetry.lock | 100 ++++++++- product/constants.py | 1 + .../dal => models}/__init__.py | 0 .../schemas => models/products}/__init__.py | 0 product/models/products/product.py | 23 ++ product/models/products/validators.py | 26 +++ .../domain_logic/product_notification.py | 52 +++++ .../handlers/process_stream.py | 74 +++++++ .../{schemas => integrations}/__init__.py | 0 .../events/__init__.py} | 0 .../integrations/events/base.py | 32 +++ .../integrations/events/constants.py | 2 + .../integrations/events/event_handler.py | 16 ++ .../integrations/events/exceptions.py | 9 + .../integrations/events/functions.py | 44 ++++ .../events/models/__init__.py} | 0 .../integrations/events/models/input.py | 21 ++ .../integrations/events/models/output.py | 16 ++ .../events/providers/__init__.py} | 0 .../events/providers/eventbridge.py | 87 ++++++++ pyproject.toml | 9 + tests/integration/conftest.py | 2 +- tests/unit/stream_processor/__init__.py | 0 tests/unit/stream_processor/conftest.py | 52 +++++ tests/unit/stream_processor/data_builder.py | 94 ++++++++ .../test_eventbridge_provider.py | 208 ++++++++++++++++++ tests/unit/stream_processor/test_events.py | 80 +++++++ tests/unit/stream_processor/test_functions.py | 91 ++++++++ .../test_process_stream_handler.py | 31 +++ .../test_product_notification.py | 16 ++ tests/utils.py | 3 + 42 files changed, 1254 insertions(+), 18 deletions(-) create mode 100644 docs/api/product_models.md create mode 100644 docs/api/stream_processor.md create mode 100644 mkdocs.yml create mode 100644 product/constants.py rename product/{stream_processor/dal => models}/__init__.py (100%) rename product/{stream_processor/dal/schemas => models/products}/__init__.py (100%) create mode 100644 product/models/products/product.py create mode 100644 product/models/products/validators.py create mode 100644 product/stream_processor/domain_logic/product_notification.py create mode 100644 product/stream_processor/handlers/process_stream.py rename product/stream_processor/{schemas => integrations}/__init__.py (100%) rename product/stream_processor/{handlers/stream_handler.py => integrations/events/__init__.py} (100%) create mode 100644 product/stream_processor/integrations/events/base.py create mode 100644 product/stream_processor/integrations/events/constants.py create mode 100644 product/stream_processor/integrations/events/event_handler.py create mode 100644 product/stream_processor/integrations/events/exceptions.py create mode 100644 
product/stream_processor/integrations/events/functions.py
 rename product/stream_processor/{schemas/input.py => integrations/events/models/__init__.py} (100%)
 create mode 100644 product/stream_processor/integrations/events/models/input.py
 create mode 100644 product/stream_processor/integrations/events/models/output.py
 rename product/stream_processor/{schemas/output.py => integrations/events/providers/__init__.py} (100%)
 create mode 100644 product/stream_processor/integrations/events/providers/eventbridge.py
 create mode 100644 tests/unit/stream_processor/__init__.py
 create mode 100644 tests/unit/stream_processor/conftest.py
 create mode 100644 tests/unit/stream_processor/data_builder.py
 create mode 100644 tests/unit/stream_processor/test_eventbridge_provider.py
 create mode 100644 tests/unit/stream_processor/test_events.py
 create mode 100644 tests/unit/stream_processor/test_functions.py
 create mode 100644 tests/unit/stream_processor/test_process_stream_handler.py
 create mode 100644 tests/unit/stream_processor/test_product_notification.py

diff --git a/.gitignore b/.gitignore
index 5b8a414..66493f6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -252,3 +252,4 @@ lambda_requirements.txt
 
 # Misc
 node_modules
+.idea
diff --git a/Makefile b/Makefile
index 9c8c43e..e9c14ab 100644
--- a/Makefile
+++ b/Makefile
@@ -14,6 +14,10 @@ format:
	poetry run isort .
	poetry run yapf -d -vv --style=./.style -r .
 
+format-fix:
+	poetry run isort .
+	poetry run yapf -vv --style=./.style -r --in-place .
+
 lint: format
	@echo "Running flake8"
	poetry run flake8 product/* infrastructure/* tests/* docs/examples/*
@@ -68,3 +72,6 @@ docs:
 
 lint-docs:
	docker run -v ${PWD}:/markdown 06kellyjac/markdownlint-cli --fix "docs"
+
+watch:
+	npx cdk watch
diff --git a/cdk.json b/cdk.json
index 9f78038..681e70a 100644
--- a/cdk.json
+++ b/cdk.json
@@ -1,3 +1,16 @@
 {
-  "app": "poetry run python app.py"
+  "app": "poetry run python app.py",
+  "watch": {
+    "include": [
+      "product/**",
+      "infrastructure/product/**"
+    ],
+    "exclude": [
+      "product/**/*.pyc",
+      "product/**/__pycache__",
+      "infrastructure/**/*.pyc",
+      "infrastructure/**/__pycache__"
+    ]
+  },
+  "build": "make build"
 }
\ No newline at end of file
diff --git a/docs/api/product_models.md b/docs/api/product_models.md
new file mode 100644
index 0000000..a2b7194
--- /dev/null
+++ b/docs/api/product_models.md
@@ -0,0 +1,7 @@
+## Product models
+
+::: product.models.products.product
+
+## Validators
+
+::: product.models.products.validators
diff --git a/docs/api/stream_processor.md b/docs/api/stream_processor.md
new file mode 100644
index 0000000..8f4010e
--- /dev/null
+++ b/docs/api/stream_processor.md
@@ -0,0 +1,16 @@
+
+## Product notification
+
+### Lambda Handlers
+
+The process stream handler is connected to an Amazon DynamoDB Stream that captures product changes in the product table.
+
+We convert each record into a `ProductChangeNotification` model based on the DynamoDB Stream event name (e.g., `INSERT` -> `ADDED`); see the mapping sketch below.
+
+::: product.stream_processor.handlers.process_stream
+
+### Domain logic
+
+Domain logic to notify product changes, e.g., `ADDED`, `REMOVED`, `UPDATED`.
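For illustration, the event-name conversion described above boils down to a small mapping. A minimal sketch with an assumed name (`STATUS_BY_EVENT_NAME` does not exist in the codebase; the real conversion lives in `product.stream_processor.handlers.process_stream` as a `match` statement over the stream record's event name):

```python
# Hypothetical mapping of DynamoDB Stream event names to notification statuses.
STATUS_BY_EVENT_NAME = {
    'INSERT': 'ADDED',    # a new product landed in the table
    'MODIFY': 'UPDATED',  # an existing product changed
    'REMOVE': 'REMOVED',  # a product was deleted
}
```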
+ +::: product.stream_processor.domain_logic.product_notification diff --git a/infrastructure/product/constants.py b/infrastructure/product/constants.py index b37ac06..bc1a8bf 100644 --- a/infrastructure/product/constants.py +++ b/infrastructure/product/constants.py @@ -1,5 +1,4 @@ SERVICE_ROLE_ARN = 'ServiceRoleArn' -STREAM_PROC_SERVICE_ROLE_ARN = 'StreamRoleArn' LAMBDA_BASIC_EXECUTION_ROLE = 'AWSLambdaBasicExecutionRole' CREATE_PRODUCT_ROLE = 'ServiceRole' DELETE_PRODUCT_ROLE = 'DeleteRole' @@ -28,8 +27,11 @@ BUILD_FOLDER = '.build/lambdas/' COMMON_LAYER_BUILD_FOLDER = '.build/common_layer' CRUD_CONSTRUCT_NAME = 'Crud' -STREAM_PROC_CONSTRUCT_NAME = 'StreamProc' OWNER_TAG = 'owner' REST_API_NAME = 'crud-rest-api' EVENT_BUS_NAME = 'events' -STREAM_PROC_LAMBDA = 'Stream' +STREAM_PROCESSOR_CONSTRUCT_NAME = 'StreamProc' +STREAM_PROCESSOR_LAMBDA = 'StreamProcessor' +STREAM_PROCESSOR_LAMBDA_MEMORY_SIZE = 128 # MB +STREAM_PROCESSOR_LAMBDA_TIMEOUT = 120 # seconds +STREAM_PROCESSOR_LAMBDA_SERVICE_ROLE_ARN = 'StreamRoleArn' diff --git a/infrastructure/product/service_stack.py b/infrastructure/product/service_stack.py index ee23fb3..c966531 100644 --- a/infrastructure/product/service_stack.py +++ b/infrastructure/product/service_stack.py @@ -25,7 +25,7 @@ def __init__(self, scope: Construct, id: str, **kwargs) -> None: self.stream_processor = StreamProcessorConstruct( self, - id_=get_construct_name(id, constants.STREAM_PROC_CONSTRUCT_NAME), + id_=constants.STREAM_PROCESSOR_CONSTRUCT_NAME, lambda_layer=self.shared_layer, dynamodb_table=self.api.api_db.db, ) diff --git a/infrastructure/product/stream_processor_construct.py b/infrastructure/product/stream_processor_construct.py index 0200375..452bbd3 100644 --- a/infrastructure/product/stream_processor_construct.py +++ b/infrastructure/product/stream_processor_construct.py @@ -17,14 +17,14 @@ def __init__(self, scope: Construct, id_: str, lambda_layer: PythonLayerVersion, super().__init__(scope, id_) bus_name = f'{id_}{constants.EVENT_BUS_NAME}' self.event_bus = events.EventBus(self, bus_name, event_bus_name=bus_name) - self.role = self._build_lambda_role(dynamodb_table, self.event_bus) + self.role = self._build_lambda_role(db=dynamodb_table, bus=self.event_bus) self.lambda_function = self._build_stream_processor_lambda(self.role, lambda_layer, dynamodb_table) def _build_lambda_role(self, db: dynamodb.Table, bus: events.EventBus) -> iam.Role: return iam.Role( self, - constants.STREAM_PROC_SERVICE_ROLE_ARN, + id=constants.STREAM_PROCESSOR_LAMBDA_SERVICE_ROLE_ARN, assumed_by=iam.ServicePrincipal('lambda.amazonaws.com'), inline_policies={ 'streams': @@ -51,21 +51,21 @@ def _build_lambda_role(self, db: dynamodb.Table, bus: events.EventBus) -> iam.Ro def _build_stream_processor_lambda(self, role: iam.Role, lambda_layer: PythonLayerVersion, dynamodb_table: dynamodb.Table) -> _lambda.Function: lambda_function = _lambda.Function( self, - constants.STREAM_PROC_LAMBDA, + id=constants.STREAM_PROCESSOR_LAMBDA, runtime=_lambda.Runtime.PYTHON_3_11, code=_lambda.Code.from_asset(constants.BUILD_FOLDER), - handler='product.stream_processor.handlers.stream_handler.handle_events', + handler='product.stream_processor.handlers.process_stream.process_stream', environment={ constants.POWERTOOLS_SERVICE_NAME: constants.SERVICE_NAME, # for logger, tracer and metrics constants.POWER_TOOLS_LOG_LEVEL: 'DEBUG', # for logger }, tracing=_lambda.Tracing.ACTIVE, retry_attempts=0, - timeout=Duration.seconds(constants.API_HANDLER_LAMBDA_TIMEOUT), - 
memory_size=constants.API_HANDLER_LAMBDA_MEMORY_SIZE, + timeout=Duration.seconds(constants.STREAM_PROCESSOR_LAMBDA_TIMEOUT), + memory_size=constants.STREAM_PROCESSOR_LAMBDA_MEMORY_SIZE, layers=[lambda_layer], role=role, - log_retention=RetentionDays.ONE_DAY, + log_retention=RetentionDays.FIVE_DAYS, ) # Add DynamoDB Stream as an event source for the Lambda function lambda_function.add_event_source(DynamoEventSource(dynamodb_table, starting_position=_lambda.StartingPosition.LATEST)) diff --git a/infrastructure/product/utils.py b/infrastructure/product/utils.py index 0b2f13b..9af7902 100644 --- a/infrastructure/product/utils.py +++ b/infrastructure/product/utils.py @@ -1,4 +1,4 @@ -import os +import getpass from pathlib import Path from git import Repo @@ -8,7 +8,7 @@ def get_username() -> str: try: - return os.getlogin().replace('.', '-') + return getpass.getuser().replace('.', '-') except Exception: return 'github' @@ -17,7 +17,7 @@ def get_stack_name() -> str: repo = Repo(Path.cwd()) username = get_username() try: - branch_name = f'{repo.active_branch}'.replace('/', '-') + branch_name = f'{repo.active_branch}'.replace('/', '-').replace('_', '-') return f'{username}-{branch_name}-{constants.SERVICE_NAME}' except TypeError: # we're running in detached mode (HEAD) diff --git a/mkdocs.yml b/mkdocs.yml new file mode 100644 index 0000000..7e5f85e --- /dev/null +++ b/mkdocs.yml @@ -0,0 +1,99 @@ +site_name: Serverless Python Demo OPN305 +site_description: Serverless Python Demo for re:Invent OPN305 session +site_author: Ran Isenberg and Heitor Lessa +repo_url: https://github.com/ran-isenberg/serverless-python-demo +edit_uri: edit/main/docs + +nav: + - Homepage: + - Decision log: decision_log.md + - API reference: + - Stream Processor: api/stream_processor.md + - Product models: api/product_models.md + +theme: + name: material + font: + text: Ubuntu + palette: + - scheme: default + primary: deep purple + toggle: + icon: material/lightbulb + name: Switch to dark mode + - scheme: slate + primary: indigo + accent: teal + toggle: + icon: material/lightbulb-outline + name: Switch to light mode + features: + - header.autohide + - navigation.sections + - navigation.top + - navigation.instant + - navigation.indexes + - navigation.tracking + - navigation.tabs + - content.code.annotate + - content.code.copy + icon: + repo: fontawesome/brands/github + +markdown_extensions: + - admonition + - abbr + - pymdownx.tabbed: + alternate_style: true + - pymdownx.highlight: + linenums: true + - pymdownx.details + - pymdownx.snippets: + base_path: "." 
+ check_paths: true + restrict_base_path: false + - meta + - toc: + permalink: true + toc_depth: 4 + - attr_list + - pymdownx.emoji: + emoji_index: !!python/name:materialx.emoji.twemoji + emoji_generator: !!python/name:materialx.emoji.to_svg + - pymdownx.inlinehilite + - pymdownx.superfences: + custom_fences: + - name: mermaid + class: mermaid + format: !!python/name:pymdownx.superfences.fence_code_format + - pymdownx.tasklist: + custom_checkbox: true + +copyright: Copyright © 2023 Ran Isenberg and Heitor Lessa + +plugins: + - search + - mkdocstrings: + handlers: + python: + options: + docstring_style: numpy + docstring_section_style: spacy + show_source: true + heading_level: 4 + allow_inspection: true + group_by_category: true + show_category_heading: true + show_bases: true + show_docstring_examples: true + show_if_no_docstring: true + merge_init_into_class: true # avoid Class params + __init__ params + separate_signature: false + show_signature_annotations: false + +extra_css: + - stylesheets/extra.css + +watch: + - product + - docs diff --git a/mypy.ini b/mypy.ini index b8a390f..a85c9e7 100644 --- a/mypy.ini +++ b/mypy.ini @@ -7,7 +7,12 @@ warn_unused_ignores=True show_column_numbers = True show_error_codes = True show_error_context = True +plugins = pydantic.mypy +[pydantic-mypy] +init_forbid_extra = true +init_typed = true +warn_required_dynamic_aliases = true # Disable specific error codes in the 'tests' package [mypy-tests.*] @@ -67,3 +72,6 @@ ignore_missing_imports = True [mypy-setuptools] ignore_missing_imports = True + +[mypy-pytest_socket] +ignore_missing_imports = True diff --git a/poetry.lock b/poetry.lock index 75d843c..b412084 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1,4 +1,4 @@ -# This file is automatically @generated by Poetry 1.6.1 and should not be changed by hand. +# This file is automatically @generated by Poetry 1.5.1 and should not be changed by hand. [[package]] name = "annotated-types" @@ -604,6 +604,20 @@ gitdb = ">=4.0.1,<5" [package.extras] test = ["black", "coverage[toml]", "ddt (>=1.1.1,!=1.4.3)", "mypy", "pre-commit", "pytest", "pytest-cov", "pytest-sugar"] +[[package]] +name = "griffe" +version = "0.36.4" +description = "Signatures for entire Python programs. Extract the structure, the frame, the skeleton of your project, to generate API documentation or find breaking changes in your API." +optional = false +python-versions = ">=3.8" +files = [ + {file = "griffe-0.36.4-py3-none-any.whl", hash = "sha256:4e37a723891fa774fafdd67240571801a1d90d0236562c178707e5c37fb3ebe2"}, + {file = "griffe-0.36.4.tar.gz", hash = "sha256:7b5968f5cc6446637ed0d3ded9de07d6a928f10ccb24116b1dd843635bf1994a"}, +] + +[package.dependencies] +colorama = ">=0.4" + [[package]] name = "identify" version = "2.5.29" @@ -895,6 +909,21 @@ watchdog = ">=2.0" i18n = ["babel (>=2.9.0)"] min-versions = ["babel (==2.9.0)", "click (==7.0)", "colorama (==0.4)", "ghp-import (==1.0)", "importlib-metadata (==4.3)", "jinja2 (==2.11.1)", "markdown (==3.2.1)", "markupsafe (==2.0.1)", "mergedeep (==1.3.4)", "packaging (==20.5)", "pathspec (==0.11.1)", "platformdirs (==2.2.0)", "pyyaml (==5.1)", "pyyaml-env-tag (==0.1)", "typing-extensions (==3.10)", "watchdog (==2.0)"] +[[package]] +name = "mkdocs-autorefs" +version = "0.5.0" +description = "Automatically link across pages in MkDocs." 
+optional = false +python-versions = ">=3.8" +files = [ + {file = "mkdocs_autorefs-0.5.0-py3-none-any.whl", hash = "sha256:7930fcb8ac1249f10e683967aeaddc0af49d90702af111a5e390e8b20b3d97ff"}, + {file = "mkdocs_autorefs-0.5.0.tar.gz", hash = "sha256:9a5054a94c08d28855cfab967ada10ed5be76e2bfad642302a610b252c3274c0"}, +] + +[package.dependencies] +Markdown = ">=3.3" +mkdocs = ">=1.1" + [[package]] name = "mkdocs-git-revision-date-plugin" version = "0.3.2" @@ -950,6 +979,45 @@ files = [ {file = "mkdocs_material_extensions-1.2.tar.gz", hash = "sha256:27e2d1ed2d031426a6e10d5ea06989d67e90bb02acd588bc5673106b5ee5eedf"}, ] +[[package]] +name = "mkdocstrings" +version = "0.23.0" +description = "Automatic documentation from sources, for MkDocs." +optional = false +python-versions = ">=3.8" +files = [ + {file = "mkdocstrings-0.23.0-py3-none-any.whl", hash = "sha256:051fa4014dfcd9ed90254ae91de2dbb4f24e166347dae7be9a997fe16316c65e"}, + {file = "mkdocstrings-0.23.0.tar.gz", hash = "sha256:d9c6a37ffbe7c14a7a54ef1258c70b8d394e6a33a1c80832bce40b9567138d1c"}, +] + +[package.dependencies] +Jinja2 = ">=2.11.1" +Markdown = ">=3.3" +MarkupSafe = ">=1.1" +mkdocs = ">=1.2" +mkdocs-autorefs = ">=0.3.1" +pymdown-extensions = ">=6.3" + +[package.extras] +crystal = ["mkdocstrings-crystal (>=0.3.4)"] +python = ["mkdocstrings-python (>=0.5.2)"] +python-legacy = ["mkdocstrings-python-legacy (>=0.2.1)"] + +[[package]] +name = "mkdocstrings-python" +version = "1.7.1" +description = "A Python handler for mkdocstrings." +optional = false +python-versions = ">=3.8" +files = [ + {file = "mkdocstrings_python-1.7.1-py3-none-any.whl", hash = "sha256:cb1651fba8423324b861fe38ce881cf56f30738770a2119f007a0a4ffcb00777"}, + {file = "mkdocstrings_python-1.7.1.tar.gz", hash = "sha256:90d838dc7861674794e3ca79f64c23c5d8fa76b9aa29db834b246771964c0881"}, +] + +[package.dependencies] +griffe = ">=0.35" +mkdocstrings = ">=0.20" + [[package]] name = "mypy" version = "1.5.1" @@ -1009,6 +1077,20 @@ files = [ [package.dependencies] typing-extensions = {version = ">=4.1.0", markers = "python_version < \"3.12\""} +[[package]] +name = "mypy-boto3-events" +version = "1.28.46" +description = "Type annotations for boto3.EventBridge 1.28.46 service generated with mypy-boto3-builder 7.19.0" +optional = false +python-versions = ">=3.7" +files = [ + {file = "mypy-boto3-events-1.28.46.tar.gz", hash = "sha256:fdae2b51c7c13d0045c6a0a7c2ddb735e67ae79077084a28f922870593091ad1"}, + {file = "mypy_boto3_events-1.28.46-py3-none-any.whl", hash = "sha256:1220289549bd3b24a37561c5dbf6f20a9d97f66748b15dbfe9e218cc52cce246"}, +] + +[package.dependencies] +typing-extensions = {version = ">=4.1.0", markers = "python_version < \"3.12\""} + [[package]] name = "mypy-extensions" version = "1.0.0" @@ -1408,6 +1490,20 @@ pytest = ">=5.0" [package.extras] dev = ["pre-commit", "pytest-asyncio", "tox"] +[[package]] +name = "pytest-socket" +version = "0.6.0" +description = "Pytest Plugin to disable socket calls during tests" +optional = false +python-versions = ">=3.7,<4.0" +files = [ + {file = "pytest_socket-0.6.0-py3-none-any.whl", hash = "sha256:cca72f134ff01e0023c402e78d31b32e68da3efdf3493bf7788f8eba86a6824c"}, + {file = "pytest_socket-0.6.0.tar.gz", hash = "sha256:363c1d67228315d4fc7912f1aabfd570de29d0e3db6217d61db5728adacd7138"}, +] + +[package.dependencies] +pytest = ">=3.6.3" + [[package]] name = "python-dateutil" version = "2.8.2" @@ -1983,4 +2079,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" 
python-versions = "^3.11.0" -content-hash = "3bbff053d4521f4e31c16a5dbb26c9b3872feeec0f2000c24d58b82ad7de025f" +content-hash = "0f1dc94f77a850ad98d56e2188e26dc8e2837e9b1a856e1b6dd796b07dc2663d" diff --git a/product/constants.py b/product/constants.py new file mode 100644 index 0000000..874f653 --- /dev/null +++ b/product/constants.py @@ -0,0 +1 @@ +XRAY_TRACE_ID_ENV: str = '_X_AMZN_TRACE_ID' diff --git a/product/stream_processor/dal/__init__.py b/product/models/__init__.py similarity index 100% rename from product/stream_processor/dal/__init__.py rename to product/models/__init__.py diff --git a/product/stream_processor/dal/schemas/__init__.py b/product/models/products/__init__.py similarity index 100% rename from product/stream_processor/dal/schemas/__init__.py rename to product/models/products/__init__.py diff --git a/product/models/products/product.py b/product/models/products/product.py new file mode 100644 index 0000000..6cca04a --- /dev/null +++ b/product/models/products/product.py @@ -0,0 +1,23 @@ +from datetime import datetime +from typing import Annotated, Literal + +from pydantic import BaseModel, Field, PositiveInt +from pydantic.functional_validators import AfterValidator + +from product.models.products.validators import validate_product_id + +ProductId = Annotated[str, Field(min_length=36, max_length=36), AfterValidator(validate_product_id)] + + +class Product(BaseModel): + name: Annotated[str, Field(min_length=1, max_length=30)] + id: ProductId + price: PositiveInt + + +class ProductChangeNotification(BaseModel): + product_id: ProductId + status: Literal['ADDED', 'REMOVED', 'UPDATED'] + created_at: datetime = Field(default_factory=datetime.utcnow) + + __version__: str = 'v1' diff --git a/product/models/products/validators.py b/product/models/products/validators.py new file mode 100644 index 0000000..6130f0f --- /dev/null +++ b/product/models/products/validators.py @@ -0,0 +1,26 @@ +from uuid import UUID + + +def validate_product_id(product_id: str) -> str: + """Validates Product IDs are valid UUIDs + + Parameters + ---------- + product_id : str + Product ID as a string + + Returns + ------- + str + Validated product ID value + + Raises + ------ + ValueError + When a product ID doesn't conform with the UUID spec. + """ + try: + UUID(product_id, version=4) + except Exception as exc: + raise ValueError(str(exc)) from exc + return product_id diff --git a/product/stream_processor/domain_logic/product_notification.py b/product/stream_processor/domain_logic/product_notification.py new file mode 100644 index 0000000..7306003 --- /dev/null +++ b/product/stream_processor/domain_logic/product_notification.py @@ -0,0 +1,52 @@ +import os + +from product.models.products.product import ProductChangeNotification +from product.stream_processor.integrations.events.event_handler import ProductChangeNotificationHandler +from product.stream_processor.integrations.events.models.output import EventReceipt +from product.stream_processor.integrations.events.providers.eventbridge import EventBridge + +EVENT_BUS = os.environ.get('EVENT_BUS', '') +EVENT_SOURCE = 'myorg.product.product_notification' + + +def notify_product_updates(update: list[ProductChangeNotification], event_handler: ProductChangeNotificationHandler | None = None) -> EventReceipt: + """Notify product change notifications using default or provided event handler. + + Parameters + ---------- + update : list[ProductChangeNotification] + List of product change notifications to notify. 
+    event_handler : ProductChangeNotificationHandler | None, optional
+        Event handler to use for notification, by default ProductChangeNotificationHandler
+
+    Environment variables
+    ---------------------
+    `EVENT_BUS` : Event bus where product change notifications are published
+
+    # Examples
+
+    Sending a newly added product notification
+
+    ```python
+    from product.stream_processor.domain_logic.product_notification import notify_product_updates
+
+    notification = ProductChangeNotification(product_id=product_id, status="ADDED")
+    receipt = notify_product_updates(update=[notification])
+    ```
+
+    Integrations
+    ------------
+
+    # Events
+
+    * `ProductChangeNotificationHandler` uses the `EventBridge` provider to convert `ProductChangeNotification` models into events and publish them.
+
+    Returns
+    -------
+    EventReceipt
+        Receipts for successfully and unsuccessfully published events.
+    """
+    if event_handler is None:
+        event_handler = ProductChangeNotificationHandler(provider=EventBridge(EVENT_BUS), event_source=EVENT_SOURCE)
+
+    return event_handler.emit(payload=update)
diff --git a/product/stream_processor/handlers/process_stream.py b/product/stream_processor/handlers/process_stream.py
new file mode 100644
index 0000000..74849fd
--- /dev/null
+++ b/product/stream_processor/handlers/process_stream.py
@@ -0,0 +1,74 @@
+from typing import Any
+
+from aws_lambda_powertools import Logger
+from aws_lambda_powertools.utilities.data_classes.dynamo_db_stream_event import DynamoDBStreamEvent
+from aws_lambda_powertools.utilities.typing import LambdaContext
+
+from product.models.products.product import ProductChangeNotification
+from product.stream_processor.domain_logic.product_notification import notify_product_updates
+from product.stream_processor.integrations.events.event_handler import ProductChangeNotificationHandler
+from product.stream_processor.integrations.events.models.output import EventReceipt
+
+logger = Logger()
+
+
+@logger.inject_lambda_context(log_event=True)
+def process_stream(
+    event: dict[str, Any],
+    context: LambdaContext,
+    event_handler: ProductChangeNotificationHandler | None = None,
+) -> EventReceipt:
+    """Process a batch of Amazon DynamoDB Stream records containing product changes.
+
+    Parameters
+    ----------
+    event : dict[str, Any]
+        DynamoDB Stream event.
+
+        See [sample](https://docs.aws.amazon.com/lambda/latest/dg/with-ddb.html#events-sample-dynamodb)
+    context : LambdaContext
+        Lambda Context object.
+
+        It is used to enrich our structured logging via Powertools for AWS Lambda.
+
+        See [sample](https://docs.aws.amazon.com/lambda/latest/dg/python-context.html)
+    event_handler : ProductChangeNotificationHandler | None, optional
+        Event handler used to notify product changes, by default `ProductChangeNotificationHandler`
+
+    Integrations
+    ------------
+
+    # Domain
+
+    * `notify_product_updates` to notify `ProductChangeNotification` changes
+
+    Returns
+    -------
+    EventReceipt
+        Receipts for successfully and unsuccessfully published events.
+
+    Raises
+    ------
+
+    ProductNotificationDeliveryError
+        Raised on partial or total failure when sending notifications. It allows the stream to stop at the exact sequence number that failed.
+
+        This means notification delivery is at least once.
+    """
+    # NOTE: we rely on Powertools' DynamoDBStreamEvent until we create our own product change stream input model
+    stream_records = DynamoDBStreamEvent(event)
+
+    product_updates = []
+    for record in stream_records.records:
+        product_id = record.dynamodb.keys.get('id', '')  # type: ignore[union-attr]
+
+        match record.event_name:
+            case record.event_name.INSERT:  # type: ignore[union-attr]
+                product_updates.append(ProductChangeNotification(product_id=product_id, status='ADDED'))
+            case record.event_name.MODIFY:  # type: ignore[union-attr]
+                product_updates.append(ProductChangeNotification(product_id=product_id, status='UPDATED'))
+            case record.event_name.REMOVE:  # type: ignore[union-attr]
+                product_updates.append(ProductChangeNotification(product_id=product_id, status='REMOVED'))
+
+    return notify_product_updates(update=product_updates, event_handler=event_handler)
diff --git a/product/stream_processor/schemas/__init__.py b/product/stream_processor/integrations/__init__.py
similarity index 100%
rename from product/stream_processor/schemas/__init__.py
rename to product/stream_processor/integrations/__init__.py
diff --git a/product/stream_processor/handlers/stream_handler.py b/product/stream_processor/integrations/events/__init__.py
similarity index 100%
rename from product/stream_processor/handlers/stream_handler.py
rename to product/stream_processor/integrations/events/__init__.py
diff --git a/product/stream_processor/integrations/events/base.py b/product/stream_processor/integrations/events/base.py
new file mode 100644
index 0000000..d7f1873
--- /dev/null
+++ b/product/stream_processor/integrations/events/base.py
@@ -0,0 +1,32 @@
+from abc import ABC, abstractmethod
+from typing import Any, Generic, TypeVar
+
+from product.stream_processor.integrations.events.models.input import Event
+from product.stream_processor.integrations.events.models.output import EventReceipt
+
+T = TypeVar('T')
+
+# negative lookahead (?!^) and negative lookbehind (?<!_): don't match the start of the string, nor any character preceded by an underscore, e.g., `_` and `__`
+
+# full regex: (?!^)(?<!_)((?<=[a-z0-9])[A-Z]|[A-Z](?=[a-z]))
+# ProductNotification -> Product_Notification
+# ProductNotificationV2 -> Product_Notification_V2
+# ProductHTTP -> Product_HTTP
+
+
+class EventProvider(ABC):
+
+    @abstractmethod
+    def send(self, payload: list[Event]) -> EventReceipt:
+        ...
+
+
+class EventHandler(ABC, Generic[T]):
+
+    def __init__(self, provider: EventProvider, event_source: str) -> None:
+        self.provider = provider
+        self.event_source = event_source
+
+    @abstractmethod
+    def emit(self, payload: list[T], metadata: dict[str, Any] | None = None, correlation_id='') -> EventReceipt:
+        ...
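To make the `EventProvider` contract above concrete, here is a minimal sketch of an alternative provider — an assumed in-memory implementation useful for local experimentation, not part of this PR (the PR's concrete provider is `EventBridge`, and its unit tests use a similar `FakeProvider`):

```python
from product.stream_processor.integrations.events.base import EventProvider
from product.stream_processor.integrations.events.models.input import Event
from product.stream_processor.integrations.events.models.output import EventReceipt, EventReceiptSuccess


class InMemoryProvider(EventProvider):
    """Hypothetical provider that records events in memory instead of publishing them."""

    def __init__(self) -> None:
        self.published: list[Event] = []

    def send(self, payload: list[Event]) -> EventReceipt:
        # keep a copy so a test or REPL session can inspect what would have been published
        self.published.extend(payload)
        # every event 'succeeds'; receipt IDs are synthetic
        return EventReceipt(success=[EventReceiptSuccess(receipt_id=f'memory-{idx}') for idx, _ in enumerate(payload)])
```

Any such provider plugs into `EventHandler.emit` unchanged, which is what keeps the domain logic decoupled from EventBridge.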
diff --git a/product/stream_processor/integrations/events/constants.py b/product/stream_processor/integrations/events/constants.py
new file mode 100644
index 0000000..6a7e067
--- /dev/null
+++ b/product/stream_processor/integrations/events/constants.py
@@ -0,0 +1,2 @@
+DEFAULT_EVENT_VERSION = 'v1'
+EVENTBRIDGE_PROVIDER_MAX_EVENTS_ENTRY = 10
diff --git a/product/stream_processor/integrations/events/event_handler.py b/product/stream_processor/integrations/events/event_handler.py
new file mode 100644
index 0000000..feca1e7
--- /dev/null
+++ b/product/stream_processor/integrations/events/event_handler.py
@@ -0,0 +1,16 @@
+from typing import Any
+
+from product.models.products.product import ProductChangeNotification
+from product.stream_processor.integrations.events.base import EventHandler, EventProvider
+from product.stream_processor.integrations.events.functions import build_events_from_models
+from product.stream_processor.integrations.events.models.output import EventReceipt
+
+
+class ProductChangeNotificationHandler(EventHandler):
+
+    def __init__(self, provider: EventProvider, event_source: str) -> None:
+        super().__init__(provider=provider, event_source=event_source)
+
+    def emit(self, payload: list[ProductChangeNotification], metadata: dict[str, Any] | None = None, correlation_id='') -> EventReceipt:
+        event_payload = build_events_from_models(models=payload, metadata=metadata, correlation_id=correlation_id, event_source=self.event_source)
+        return self.provider.send(payload=event_payload)
diff --git a/product/stream_processor/integrations/events/exceptions.py b/product/stream_processor/integrations/events/exceptions.py
new file mode 100644
index 0000000..1e641d5
--- /dev/null
+++ b/product/stream_processor/integrations/events/exceptions.py
@@ -0,0 +1,9 @@
+from product.stream_processor.integrations.events.models.output import EventReceiptFail
+
+
+class ProductNotificationDeliveryError(Exception):
+
+    def __init__(self, message: str, receipts: list[EventReceiptFail]):
+        super().__init__(message)
+        self.message = message
+        self.receipts = receipts
diff --git a/product/stream_processor/integrations/events/functions.py b/product/stream_processor/integrations/events/functions.py
new file mode 100644
index 0000000..e4baa03
--- /dev/null
+++ b/product/stream_processor/integrations/events/functions.py
@@ -0,0 +1,44 @@
+import re
+from typing import Any, Generator, Sequence, TypeVar
+from uuid import uuid4
+
+from product.stream_processor.integrations.events.constants import DEFAULT_EVENT_VERSION
+from product.stream_processor.integrations.events.models.input import AnyModel, Event, EventMetadata
+
+T = TypeVar('T')
+
+_exclude_underscores = r'(?!^)(?<!_)'
+_pascal_case_pattern = r'(?<=[a-z0-9])[A-Z]|[A-Z](?=[a-z])'
+_pascal_to_snake_pattern = re.compile(rf'{_exclude_underscores}({_pascal_case_pattern})')
+
+
+def convert_model_to_event_name(model_name: str) -> str:
+    """Convert ModelName (pascal) to MODEL_NAME (snake, uppercase)"""
+    return _pascal_to_snake_pattern.sub(r'_\1', model_name).upper()
+
+
+def build_events_from_models(models: Sequence[AnyModel], event_source: str, metadata: dict[str, Any] | None = None,
+                             correlation_id: str = '') -> list[Event]:
+    metadata = metadata or {}
+    correlation_id = correlation_id or f'{uuid4()}'
+
+    events: list[Event] = []
+
+    for model in models:
+        event_name = convert_model_to_event_name(model_name=model.__class__.__name__)
+        event_version = getattr(model, '__version__', DEFAULT_EVENT_VERSION)  # defaults to v1
+
+        events.append(
+            Event(
+                data=model, metadata=EventMetadata(event_name=event_name, event_source=event_source, event_version=event_version,
+                                                   correlation_id=correlation_id, **metadata)))
+
+    return events
+
+
+def chunk_from_list(events: list[T], max_items: int) -> Generator[list[T], None, None]:
+    for idx in range(0, len(events), max_items):  # start, stop, step
+        # yield the next `max_items`-sized slice, starting from the current index
+        yield events[idx:idx + max_items]
diff --git a/product/stream_processor/schemas/input.py b/product/stream_processor/integrations/events/models/__init__.py
similarity index 100%
rename from product/stream_processor/schemas/input.py
rename to product/stream_processor/integrations/events/models/__init__.py
diff --git a/product/stream_processor/integrations/events/models/input.py b/product/stream_processor/integrations/events/models/input.py
new file mode 100644
index 0000000..3be7fb7
--- /dev/null
+++ b/product/stream_processor/integrations/events/models/input.py
@@ -0,0 +1,21 @@
+from datetime import datetime
+from typing import Generic, TypeVar
+
+from pydantic import BaseModel, ConfigDict, Field
+
+AnyModel = TypeVar('AnyModel', bound=BaseModel)
+
+
+class EventMetadata(BaseModel):
+    event_name: str
+    event_source: str
+    event_version: str
+    correlation_id: str
+    created_at: datetime = Field(default_factory=datetime.utcnow)
+
+    model_config = ConfigDict(extra='allow')
+
+
+class Event(BaseModel, Generic[AnyModel]):
+    data: AnyModel
+    metadata: EventMetadata
diff --git a/product/stream_processor/integrations/events/models/output.py b/product/stream_processor/integrations/events/models/output.py
new file mode 100644
index 0000000..743160a
--- /dev/null
+++ b/product/stream_processor/integrations/events/models/output.py
@@ -0,0 +1,16 @@
+from pydantic import BaseModel, Field
+
+
+class EventReceiptSuccess(BaseModel):
+    receipt_id: str
+
+
+class EventReceiptFail(BaseModel):
+    receipt_id: str
+    error: str
+    details: dict
+
+
+class EventReceipt(BaseModel):
+    success: list[EventReceiptSuccess]
+    failed: list[EventReceiptFail] = Field(default_factory=list)
diff --git a/product/stream_processor/schemas/output.py b/product/stream_processor/integrations/events/providers/__init__.py
similarity index 100%
rename from product/stream_processor/schemas/output.py
rename to product/stream_processor/integrations/events/providers/__init__.py
diff --git a/product/stream_processor/integrations/events/providers/eventbridge.py b/product/stream_processor/integrations/events/providers/eventbridge.py
new file mode 100644
index 0000000..a84bc81
--- /dev/null
+++ b/product/stream_processor/integrations/events/providers/eventbridge.py
@@ -0,0 +1,87 @@
+import os
+from typing import TYPE_CHECKING, Generator, Optional
+
+import boto3
+import botocore.exceptions
+
+from product.constants import XRAY_TRACE_ID_ENV
+from product.stream_processor.integrations.events.base import
EventProvider
+from product.stream_processor.integrations.events.constants import EVENTBRIDGE_PROVIDER_MAX_EVENTS_ENTRY
+from product.stream_processor.integrations.events.exceptions import ProductNotificationDeliveryError
+from product.stream_processor.integrations.events.functions import chunk_from_list
+from product.stream_processor.integrations.events.models.input import Event
+from product.stream_processor.integrations.events.models.output import EventReceipt, EventReceiptFail, EventReceiptSuccess
+
+if TYPE_CHECKING:
+    from mypy_boto3_events import EventBridgeClient
+    from mypy_boto3_events.type_defs import PutEventsRequestEntryTypeDef, PutEventsResponseTypeDef
+
+
+class EventBridge(EventProvider):
+
+    def __init__(self, bus_name: str, client: Optional['EventBridgeClient'] = None):
+        self.bus_name = bus_name
+        self.client = client or boto3.client('events')
+
+    def send(self, payload: list[Event]) -> EventReceipt:
+        success: list[EventReceiptSuccess] = []
+        failed: list[EventReceiptFail] = []
+        events = self.build_put_events_requests(payload)
+
+        for batch in events:
+            try:
+                result = self.client.put_events(Entries=batch)
+                ok, not_ok = self._collect_receipts(result)
+                success.extend(ok)
+                failed.extend(not_ok)
+            except botocore.exceptions.ClientError as exc:
+                error_message = exc.response['Error']['Message']
+
+                receipt = EventReceiptFail(receipt_id='', error=error_message, details=exc.response['ResponseMetadata'])
+                raise ProductNotificationDeliveryError(f'Failed to deliver all events: {error_message}', receipts=[receipt]) from exc
+
+        return EventReceipt(success=success, failed=failed)
+
+    def build_put_events_requests(self, payload: list[Event]) -> Generator[list['PutEventsRequestEntryTypeDef'], None, None]:
+        trace_id = os.environ.get(XRAY_TRACE_ID_ENV)
+
+        for chunk in chunk_from_list(events=payload, max_items=EVENTBRIDGE_PROVIDER_MAX_EVENTS_ENTRY):
+            events: list['PutEventsRequestEntryTypeDef'] = []
+
+            for event in chunk:
+                # 'Time' field is not included to be able to measure end-to-end latency later (time - created_at)
+                event_request: 'PutEventsRequestEntryTypeDef' = {
+                    'Source': event.metadata.event_source,
+                    'DetailType': event.metadata.event_name,
+                    'Detail': event.model_dump_json(),
+                    'EventBusName': self.bus_name,
+                }
+
+                if trace_id:
+                    event_request['TraceHeader'] = trace_id
+
+                events.append(event_request)
+
+            yield events
+
+    @staticmethod
+    def _collect_receipts(result: 'PutEventsResponseTypeDef') -> tuple[list[EventReceiptSuccess], list[EventReceiptFail]]:
+        successes: list[EventReceiptSuccess] = []
+        fails: list[EventReceiptFail] = []
+
+        for receipt in result['Entries']:
+            error_message = receipt.get('ErrorMessage')
+            event_id = receipt.get('EventId', '')
+
+            if error_message:
+                error_code = receipt.get('ErrorCode')
+                fails.append(EventReceiptFail(receipt_id=event_id, error=error_message, details={'error_code': error_code}))
+            else:
+                successes.append(EventReceiptSuccess(receipt_id=event_id))
+
+        # NOTE: Improve this error by correlating which entry failed to send.
+        # We will fail regardless, but it'll be useful for logging and correlation later on.
+        if result['FailedEntryCount'] > 0:
+            raise ProductNotificationDeliveryError(f'Failed to deliver {len(fails)} events', receipts=fails)
+
+        return successes, fails
diff --git a/pyproject.toml b/pyproject.toml
index e7bdd51..2265c16 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -55,6 +55,12 @@
 types-requests = "*"
 toml = "*"
 
+[tool.poetry.group.dev.dependencies]
+mypy-boto3-events = "^1.28.46"
+pytest-socket = "^0.6.0"
+mkdocstrings = "^0.23.0"
+mkdocstrings-python = "^1.7.1"
+
 [tool.isort]
 py_version = 311
 multi_line_output = 3
@@ -74,3 +80,6 @@ skip = [
 
 [tool.yapfignore]
 ignore_patterns = [".git", ".venv", ".build", "cdk.out", "node_modules"]
+
+[tool.pytest.ini_options]
+testpaths = "tests"
diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py
index 49895bc..d1222e9 100644
--- a/tests/integration/conftest.py
+++ b/tests/integration/conftest.py
@@ -18,7 +18,7 @@ def init():
     os.environ[POWER_TOOLS_LOG_LEVEL] = 'DEBUG'
     os.environ['REST_API'] = 'https://www.ranthebuilder.cloud/api'
     os.environ['ROLE_ARN'] = 'arn:partition:service:region:account-id:resource-type:resource-id'
-    os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'  # used for appconfig mocked boto calls
+    os.environ['AWS_DEFAULT_REGION'] = os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')  # used for appconfig mocked boto calls
     os.environ['TABLE_NAME'] = get_stack_output(TABLE_NAME_OUTPUT)
     os.environ['IDEMPOTENCY_TABLE_NAME'] = get_stack_output(IDEMPOTENCY_TABLE_NAME_OUTPUT)
diff --git a/tests/unit/stream_processor/__init__.py b/tests/unit/stream_processor/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/unit/stream_processor/conftest.py b/tests/unit/stream_processor/conftest.py
new file mode 100644
index 0000000..2fb598f
--- /dev/null
+++ b/tests/unit/stream_processor/conftest.py
@@ -0,0 +1,52 @@
+from typing import Any, Generator, Sequence, TypeVar
+
+from pytest_socket import disable_socket
+
+from product.models.products.product import ProductChangeNotification
+from product.stream_processor.integrations.events.base import EventProvider
+from product.stream_processor.integrations.events.event_handler import ProductChangeNotificationHandler
+from product.stream_processor.integrations.events.functions import build_events_from_models
+from product.stream_processor.integrations.events.models.input import Event
+from product.stream_processor.integrations.events.models.output import EventReceipt, EventReceiptSuccess
+
+
+def pytest_runtest_setup():
+    """Disable Unix and TCP sockets to guarantee no test performs real network calls"""
+    disable_socket()
+
+
+T = TypeVar('T')
+Fixture = Generator[T, None, None]
+
+# Fakes are in-memory implementations of our interface, serving the following purposes:
+# -- Remove the need for mocks that need to be aware of scope and return types
+# -- Make it easier to assert data structures that would otherwise be hard to introspect
+# -- Simple reference for an EventHandler and EventProvider
+
+
+class FakeProvider(EventProvider):
+
+    def send(self, payload: Sequence[Event]) -> EventReceipt:
+        notifications = [EventReceiptSuccess(receipt_id='test') for _ in payload]
+        return EventReceipt(success=notifications)
+
+
+class FakeEventHandler(ProductChangeNotificationHandler):
+
+    def __init__(self, provider: EventProvider = FakeProvider(), event_source: str = 'fake') -> None:
+        super().__init__(provider=provider, event_source=event_source)
+        self.published_payloads: list[ProductChangeNotification] = []
+
+    def emit(self, payload: list[ProductChangeNotification], metadata: dict[str,
Any] | None = None, correlation_id='') -> EventReceipt: + metadata = metadata or {} + event_payload = build_events_from_models(models=payload, metadata=metadata, correlation_id=correlation_id, event_source='fake') + receipt = self.provider.send(payload=event_payload) + + self.published_payloads.extend(payload) + return receipt + + def __len__(self): + return len(self.published_payloads) + + def __contains__(self, item: ProductChangeNotification): + return item in self.published_payloads diff --git a/tests/unit/stream_processor/data_builder.py b/tests/unit/stream_processor/data_builder.py new file mode 100644 index 0000000..effc1da --- /dev/null +++ b/tests/unit/stream_processor/data_builder.py @@ -0,0 +1,94 @@ +"""This will be replaced with hypothesis later""" +import random +import time +from typing import Any +from uuid import uuid4 + +from product.models.products.product import ProductChangeNotification + + +def generate_dynamodb_stream_events(product_id: str = '8c18c85a-0f10-4b73-b54a-07ab0d381018',) -> dict[str, Any]: + return { + 'Records': [ + { + 'eventID': + 'af0065970f39f49c7d014079db1b86ce', + 'eventName': + 'INSERT', + 'eventVersion': + '1.1', + 'eventSource': + 'aws:dynamodb', + 'awsRegion': + 'eu-west-1', + 'dynamodb': { + 'ApproximateCreationDateTime': time.time(), + 'Keys': { + 'id': { + 'S': f'{product_id}' + } + }, + 'NewImage': { + 'price': { + 'N': '1' + }, + 'name': { + 'S': 'test' + }, + 'id': { + 'S': f'{product_id}' + }, + }, + 'SequenceNumber': f'{random.randint(a=10**24, b=10**25 - 1)}', + 'SizeBytes': 91, + 'StreamViewType': 'NEW_AND_OLD_IMAGES', + }, + 'eventSourceARN': + 'arn:aws:dynamodb:eu-west-1:123456789012:table/lessa-stream-processor-ProductCruddbproducts/stream/2023-09-29T09:00:01.491', + }, + { + 'eventID': + '4ef9babf010f884033a2bd761105f392', + 'eventName': + 'REMOVE', + 'eventVersion': + '1.1', + 'eventSource': + 'aws:dynamodb', + 'awsRegion': + 'eu-west-1', + 'dynamodb': { + 'ApproximateCreationDateTime': time.time(), + 'Keys': { + 'id': { + 'S': f'{product_id}' + } + }, + 'OldImage': { + 'price': { + 'N': '1' + }, + 'name': { + 'S': 'test' + }, + 'id': { + 'S': f'{product_id}' + }, + }, + 'SequenceNumber': f'{random.randint(a=10**24, b=10**25 - 1)}', + 'SizeBytes': 91, + 'StreamViewType': 'NEW_AND_OLD_IMAGES', + }, + 'eventSourceARN': + 'arn:aws:dynamodb:eu-west-1:123456789012:table/lessa-stream-processor-ProductCruddbproducts/stream/2023-09-29T09:00:01.491', + }, + ] + } + + +def generate_product_notifications(product_id: str = '') -> list[ProductChangeNotification]: + product_id = product_id or f'{uuid4()}' + return [ + ProductChangeNotification(product_id=product_id, status=random.choice(['ADDED', 'REMOVED', 'UPDATED'])), + ProductChangeNotification(product_id=product_id, status=random.choice(['ADDED', 'REMOVED', 'UPDATED'])), + ] diff --git a/tests/unit/stream_processor/test_eventbridge_provider.py b/tests/unit/stream_processor/test_eventbridge_provider.py new file mode 100644 index 0000000..face696 --- /dev/null +++ b/tests/unit/stream_processor/test_eventbridge_provider.py @@ -0,0 +1,208 @@ +from uuid import uuid4 + +import boto3 +import pytest +from botocore import stub +from pydantic import BaseModel + +from product.constants import XRAY_TRACE_ID_ENV +from product.stream_processor.integrations.events.constants import EVENTBRIDGE_PROVIDER_MAX_EVENTS_ENTRY +from product.stream_processor.integrations.events.exceptions import ProductNotificationDeliveryError +from product.stream_processor.integrations.events.functions import 
build_events_from_models +from product.stream_processor.integrations.events.providers.eventbridge import EventBridge + + +def test_eventbridge_build_put_events_from_event_payload(): + # GIVEN a list of events from a SampleNotification model + class SampleNotification(BaseModel): + message: str + + __version__ = 'V1' + + notification = SampleNotification(message='test') + events = build_events_from_models(models=[notification], event_source='test') + + # WHEN EventBridge provider builds a PutEvents request + event_provider = EventBridge(bus_name='test_bus') + requests = event_provider.build_put_events_requests(payload=events) + + # THEN EventBridge PutEvents request should match our metadata and model data + request = next(requests)[0] + event = events[0] + + assert request['Source'] == event.metadata.event_source + assert request['Detail'] == event.model_dump_json() + assert request['DetailType'] == event.metadata.event_name + assert request['EventBusName'] == event_provider.bus_name + + +def test_eventbridge_build_put_events_from_event_payload_include_trace_header(monkeypatch: pytest.MonkeyPatch): + # GIVEN X-Ray Trace ID is available in the environment + trace_id = '90835161-3067-47ba-8126-fda76dfdb0b0' + monkeypatch.setenv(XRAY_TRACE_ID_ENV, trace_id) + + class SampleNotification(BaseModel): + message: str + + __version__ = 'v1' + + event_bus_name = 'sample_bus' + notification = SampleNotification(message='test') + events = build_events_from_models(models=[notification], event_source='test') + event_provider = EventBridge(bus_name=event_bus_name) + + # WHEN EventBridge provider builds a PutEvents request + requests = event_provider.build_put_events_requests(payload=events) + + # THEN PutEvents request should include 'TraceHeader' with the available X-Ray Trace ID + entry = next(requests)[0] + assert entry['TraceHeader'] == trace_id + + +def test_eventbridge_build_put_events_respect_max_entries_limit(): + # GIVEN an even number of events to be sent to EventBridge PutEvents API that are higher than 10 (limit) + class SampleNotification(BaseModel): + message: str + + number_of_events = 20 + + notifications = [SampleNotification(message='test') for _ in range(number_of_events)] + events = build_events_from_models(models=notifications, event_source='test') + + # WHEN EventBridge provider builds a PutEvents request + requests = EventBridge(bus_name='test_bus').build_put_events_requests(payload=events) + + # THEN we should have a generator with two batches of the maximum permitted entry (EVENTBRIDGE_PROVIDER_MAX_EVENTS_ENTRY) + first_batch = next(requests) + second_batch = next(requests) + + assert len(first_batch) == EVENTBRIDGE_PROVIDER_MAX_EVENTS_ENTRY + assert len(second_batch) == EVENTBRIDGE_PROVIDER_MAX_EVENTS_ENTRY + assert len(list(requests)) == 0 + + +def test_eventbridge_put_events_with_stubber(): + # GIVEN a list of events from a SampleNotification model and an expected PutEvents request + class SampleNotification(BaseModel): + message: str + + event_bus_name = 'sample_bus' + event_source = 'test' + + notification = SampleNotification(message='testing') + events = build_events_from_models(models=[notification], event_source=event_source) + event = events[0] + + put_events_request = { + 'Entries': [{ + 'Source': event_source, + 'DetailType': event.metadata.event_name, + 'Detail': event.model_dump_json(), + 'EventBusName': event_bus_name + }] + } + + put_events_response = { + 'Entries': [{ + 'EventId': f'{uuid4()}', + }], + 'FailedEntryCount': 0 + } + + # WHEN EventBridge receives a 
stubbed client and sends the event payload
+    client = boto3.client('events')
+    stubber = stub.Stubber(client)
+    stubber.add_response(method='put_events', expected_params=put_events_request, service_response=put_events_response)
+    stubber.activate()
+
+    event_provider = EventBridge(bus_name=event_bus_name, client=client)
+    event_provider.send(payload=events)
+
+    # THEN we should use the stubbed client to send the events,
+    # leading to no parameter validation errors nor runtime errors when handling the response
+
+    stubber.assert_no_pending_responses()
+    stubber.deactivate()
+
+
+def test_eventbridge_put_events_with_stubber_partial_failure():
+    # GIVEN a list of events from a SampleNotification model and an expected PutEvents request
+    class SampleNotification(BaseModel):
+        message: str
+
+    event_bus_name = 'sample_bus'
+    event_source = 'test'
+
+    notification = SampleNotification(message='testing')
+    events = build_events_from_models(models=[notification], event_source=event_source)
+    event = events[0]
+
+    expected_failure_count = 1
+    put_events_request = {
+        'Entries': [{
+            'Source': event_source,
+            'DetailType': event.metadata.event_name,
+            'Detail': event.model_dump_json(),
+            'EventBusName': event_bus_name
+        }]
+    }
+
+    put_events_response = {
+        'Entries': [
+            {
+                'EventId': f'{uuid4()}',
+            },
+            {
+                # https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_PutEvents.html#API_PutEvents_Errors
+                'ErrorCode': 'InternalException',
+                'ErrorMessage': 'An internal error occurred'
+            }
+        ],
+        'FailedEntryCount': expected_failure_count
+    }
+
+    # WHEN EventBridge receives a stubbed client with at least one FailedEntryCount
+    client = boto3.client('events')
+    stubber = stub.Stubber(client)
+    stubber.add_response(method='put_events', expected_params=put_events_request, service_response=put_events_response)
+    stubber.activate()
+
+    event_provider = EventBridge(bus_name=event_bus_name, client=client)
+
+    with pytest.raises(ProductNotificationDeliveryError) as exc:
+        event_provider.send(payload=events)
+
+    # THEN we should receive a ProductNotificationDeliveryError along with its receipts
+    stubber.assert_no_pending_responses()
+    stubber.deactivate()
+
+    assert len(exc.value.receipts) == expected_failure_count
+
+
+def test_eventbridge_put_events_with_stubber_service_failure():
+    # GIVEN a list of events from a SampleNotification model
+    class SampleNotification(BaseModel):
+        message: str
+
+    event_bus_name = 'sample_bus'
+    event_source = 'test'
+
+    notification = SampleNotification(message='testing')
+    events = build_events_from_models(models=[notification], event_source=event_source)
+
+    # WHEN EventBridge receives a stubbed client that raises a service error (InternalException)
+    client = boto3.client('events')
+    stubber = stub.Stubber(client)
+    stubber.add_client_error(method='put_events', http_status_code=500, service_error_code='InternalException', service_message='Oops')
+    stubber.activate()
+
+    event_provider = EventBridge(bus_name=event_bus_name, client=client)
+
+    with pytest.raises(ProductNotificationDeliveryError) as exc:
+        event_provider.send(payload=events)
+
+    # THEN we should receive a ProductNotificationDeliveryError along with its receipts
+    stubber.assert_no_pending_responses()
+    stubber.deactivate()
+
+    assert len(exc.value.receipts) == 1
diff --git a/tests/unit/stream_processor/test_events.py b/tests/unit/stream_processor/test_events.py
new file mode 100644
index 0000000..d2d4f0c
--- /dev/null
+++ b/tests/unit/stream_processor/test_events.py
@@ -0,0 +1,80 @@
+from uuid import uuid4
+
+from pydantic import BaseModel
+
+from product.stream_processor.integrations.events.constants import DEFAULT_EVENT_VERSION
+from product.stream_processor.integrations.events.functions import build_events_from_models, convert_model_to_event_name
+
+
+def test_model_to_standard_event():
+    # GIVEN a model with __version__ set
+    class SampleNotification(BaseModel):
+        message: str
+
+        __version__ = 'v1'
+
+    notification = SampleNotification(message='testing')
+    event_source = 'test'
+
+    # WHEN we convert to an event
+    event = build_events_from_models(models=[notification], event_source=event_source)[0]
+
+    # THEN the event should contain our notification in `.data` and all metadata under `.metadata`;
+    # the event version is inferred from the model, and the event name is derived from the model name (PascalCase -> SNAKE_CASE_UPPER)
+    assert event.data == notification
+    assert event.metadata.event_source == event_source
+    assert event.metadata.event_version == notification.__version__
+    assert event.metadata.event_name == convert_model_to_event_name(notification.__class__.__name__)
+    assert event.metadata.correlation_id != ''
+    assert event.metadata.created_at != ''
+
+
+def test_model_to_standard_event_with_correlation_id():
+    # GIVEN a model with __version__ set
+    class SampleNotification(BaseModel):
+        message: str
+
+        __version__ = 'v1'
+
+    notification = SampleNotification(message='testing')
+    event_source = 'test'
+    correlation_id = f'{uuid4()}'
+
+    # WHEN we convert to an event
+    event = build_events_from_models(models=[notification], event_source=event_source, correlation_id=correlation_id)[0]
+
+    # THEN we should have the same correlation ID in the final event
+    assert event.metadata.correlation_id == correlation_id
+
+
+def test_model_to_standard_event_with_additional_metadata():
+    # GIVEN a model with __version__ set
+    class SampleNotification(BaseModel):
+        message: str
+
+        __version__ = 'v1'
+
+    notification = SampleNotification(message='testing')
+    event_source = 'test'
+    metadata = {'product_id': 'test', 'username': 'lessa'}
+
+    # WHEN we convert to an event
+    event = build_events_from_models(models=[notification], event_source=event_source, metadata=metadata)[0]
+
+    # THEN we should have additional metadata included in the final event
+    assert metadata.items() <= event.metadata.model_dump().items()
+
+
+def test_model_without_version_to_standard_event():
+    # GIVEN a model without __version__ set
+    class SampleNotification(BaseModel):
+        message: str
+
+    notification = SampleNotification(message='testing')
+    event_source = 'test'
+
+    # WHEN we convert to an event
+    event = build_events_from_models(models=[notification], event_source=event_source)[0]
+
+    # THEN we should add a default v1 version
+    assert event.metadata.event_version == DEFAULT_EVENT_VERSION
diff --git a/tests/unit/stream_processor/test_functions.py b/tests/unit/stream_processor/test_functions.py
new file mode 100644
index 0000000..4533eed
--- /dev/null
+++ b/tests/unit/stream_processor/test_functions.py
@@ -0,0 +1,91 @@
+from pydantic import BaseModel
+
+from product.stream_processor.integrations.events.functions import build_events_from_models, chunk_from_list, convert_model_to_event_name
+from product.stream_processor.integrations.events.models.input import Event
+
+
+def test_chunk_from_list_returns_empty_list_when_list_is_empty():
+    # GIVEN an empty list of items and a chunk size of 3
+    list_of_items = []
+    chunk_size = 3
+    expected_chunk = []
+
+    # WHEN we call chunk_from_list
+    actual_chunk =
diff --git a/tests/unit/stream_processor/test_functions.py b/tests/unit/stream_processor/test_functions.py
new file mode 100644
index 0000000..4533eed
--- /dev/null
+++ b/tests/unit/stream_processor/test_functions.py
@@ -0,0 +1,91 @@
+from pydantic import BaseModel
+
+from product.stream_processor.integrations.events.functions import build_events_from_models, chunk_from_list, convert_model_to_event_name
+from product.stream_processor.integrations.events.models.input import Event
+
+
+def test_chunk_from_list_returns_empty_list_when_list_is_empty():
+    # GIVEN an empty list of items and a chunk size of 3
+    list_of_items = []
+    chunk_size = 3
+    expected_chunk = []
+
+    # WHEN we call chunk_from_list
+    actual_chunk = chunk_from_list(list_of_items, chunk_size)
+
+    # THEN we get an empty chunk
+    assert list(actual_chunk) == expected_chunk
+
+
+def test_chunk_from_list_returns_single_chunk_when_list_size_equals_chunk_size():
+    # GIVEN a list of three items and a chunk size of 3
+    list_of_items = [1, 2, 3]
+    chunk_size = 3
+    expected_chunk = [1, 2, 3]
+
+    # WHEN we call chunk_from_list
+    actual_chunk = next(chunk_from_list(list_of_items, chunk_size))
+
+    # THEN we get a single chunk of the same size as the list
+    assert actual_chunk == expected_chunk
+    assert len(actual_chunk) == len(expected_chunk)
+    assert len(actual_chunk) == len(list_of_items)
+
+
+def test_chunk_from_list_returns_multiple_chunks_when_list_size_is_greater_than_chunk_size():
+    # GIVEN a list of ten items and a chunk size of 3
+    list_of_items = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
+    chunk_size = 3
+    expected_chunks = [[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
+
+    # WHEN we call chunk_from_list
+    actual_chunks = list(chunk_from_list(list_of_items, chunk_size))
+
+    # THEN we get multiple chunks of at most the chunk size
+    assert actual_chunks == expected_chunks
+
+
+def test_convert_pascal_case_to_snake_case_with_convert_model_to_event_name():
+    # GIVEN a model name in PascalCase
+    model_name = 'ProductNotification'
+
+    # WHEN we call convert_model_to_event_name
+    event_name = convert_model_to_event_name(model_name)
+
+    # THEN we get the expected event name
+    assert event_name == 'product_notification'.upper()
+
+
+def test_convert_model_to_event_name_with_uppercase():
+    # GIVEN a model name in PascalCase containing an acronym
+    model_name = 'ProductHTTPNotification'
+
+    # WHEN we call convert_model_to_event_name
+    event_name = convert_model_to_event_name(model_name)
+
+    # THEN we get the expected event name
+    assert event_name == 'product_http_notification'.upper()
+
+
+def test_convert_model_to_event_name_with_numbers():
+    # GIVEN a model name in PascalCase containing numbers
+    model_name = 'ProductHTTPNotification123'
+
+    # WHEN we call convert_model_to_event_name
+    event_name = convert_model_to_event_name(model_name)
+
+    # THEN we get the expected event name
+    assert event_name == 'product_http_notification123'.upper()
+
+
+def test_build_events_from_models():
+    # GIVEN any Pydantic model
+    class SampleNotification(BaseModel):
+        message: str
+
+    # WHEN we call build_events_from_models with all required fields
+    notification = SampleNotification(message='Hello World!')
+    events = build_events_from_models(models=[notification], event_source='sample')
+
+    # THEN we get a list of Events
+    assert type(events[0]) is Event
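The implementation of chunk_from_list is not part of this excerpt, but the assertions above pin down its behaviour: it is lazy (the empty-list test materializes it with list(), the single-chunk test advances it with next()) and it yields slices of at most chunk_size, presumably sized to EventBridge's limit of 10 entries per PutEvents call. A matching generator, as a sketch under those assumptions (chunk_items is an illustrative name):

    from typing import Generator, TypeVar

    T = TypeVar('T')

    def chunk_items(items: list[T], max_items: int) -> Generator[list[T], None, None]:
        # Yield successive slices of at most max_items; an empty input yields nothing,
        # which is why the empty-list test above materializes the generator first.
        for start in range(0, len(items), max_items):
            yield items[start:start + max_items]

Similarly, a regex approach that satisfies the three convert_model_to_event_name assertions, again as a sketch rather than the project's actual code:

    import re

    def pascal_to_upper_snake(model_name: str) -> str:
        # Split an acronym from the capitalized word after it (HTTP|Notification),
        # then split a lowercase letter or digit from the capital after it (Product|HTTP).
        name = re.sub(r'([A-Z]+)([A-Z][a-z])', r'\1_\2', model_name)
        name = re.sub(r'([a-z\d])([A-Z])', r'\1_\2', name)
        return name.upper()

    assert pascal_to_upper_snake('ProductNotification') == 'PRODUCT_NOTIFICATION'
    assert pascal_to_upper_snake('ProductHTTPNotification') == 'PRODUCT_HTTP_NOTIFICATION'
    assert pascal_to_upper_snake('ProductHTTPNotification123') == 'PRODUCT_HTTP_NOTIFICATION123'
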
diff --git a/tests/unit/stream_processor/test_process_stream_handler.py b/tests/unit/stream_processor/test_process_stream_handler.py
new file mode 100644
index 0000000..66360df
--- /dev/null
+++ b/tests/unit/stream_processor/test_process_stream_handler.py
@@ -0,0 +1,31 @@
+from product.stream_processor.handlers.process_stream import process_stream
+from tests.unit.stream_processor.conftest import FakeEventHandler
+from tests.unit.stream_processor.data_builder import generate_dynamodb_stream_events
+from tests.utils import generate_context
+
+
+def test_process_stream_notify_product_updates():
+    # GIVEN a DynamoDB stream event and a fake event handler
+    dynamodb_stream_events = generate_dynamodb_stream_events()
+    event_store = FakeEventHandler()
+
+    # WHEN process_stream is called with a custom event handler
+    process_stream(event=dynamodb_stream_events, context=generate_context(), event_handler=event_store)
+
+    # THEN the fake event handler should emit one product notification per stream record,
+    # and no errors should have been raised (e.g., no sockets opened, no real integration calls)
+    assert len(dynamodb_stream_events['Records']) == len(event_store)
+
+
+# NOTE: this should fail once we have schema validation
+def test_process_stream_with_empty_records():
+    # GIVEN an empty DynamoDB stream event
+    event_store = FakeEventHandler()
+    event: dict[str, list] = {'Records': []}
+
+    # WHEN process_stream is called with a custom event handler
+    process_stream(event=event, context=generate_context(), event_handler=event_store)
+
+    # THEN the fake event handler should emit no product notifications
+    # and no errors should have been raised
+    assert len(event_store) == 0
diff --git a/tests/unit/stream_processor/test_product_notification.py b/tests/unit/stream_processor/test_product_notification.py
new file mode 100644
index 0000000..a84a0d9
--- /dev/null
+++ b/tests/unit/stream_processor/test_product_notification.py
@@ -0,0 +1,16 @@
+from product.stream_processor.domain_logic.product_notification import notify_product_updates
+from tests.unit.stream_processor.conftest import FakeEventHandler
+from tests.unit.stream_processor.data_builder import generate_product_notifications
+
+
+def test_product_notifications_are_emitted():
+    # GIVEN a list of product notifications and a fake event handler
+    product_notifications = generate_product_notifications()
+    event_store = FakeEventHandler()
+
+    # WHEN the product notifications are processed
+    receipt = notify_product_updates(update=product_notifications, event_handler=event_store)
+
+    # THEN the fake event handler should emit these product notifications
+    assert len(receipt.success) == len(product_notifications)
+    assert all(notification in event_store for notification in product_notifications)
diff --git a/tests/utils.py b/tests/utils.py
index 5fedb2e..9560b20 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -16,6 +16,9 @@ def generate_random_string(length: int = 7):
 
 
 def generate_context() -> LambdaContext:
     context = LambdaContext()
     context._aws_request_id = '888888'
+    context._function_name = 'test'
+    context._memory_limit_in_mb = 128
+    context._invoked_function_arn = 'arn:aws:lambda:eu-west-1:123456789012:function:test'
     return context
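For reference, the FakeEventHandler these handler and domain tests rely on comes from tests/unit/stream_processor/conftest.py, which is not shown in this excerpt. A test double satisfying the behaviours asserted above (len(), membership via in, and an emit that records payloads and returns a receipt) could look like the sketch below; FakeReceipt and every signature here are assumptions, not the project's actual types:

    from dataclasses import dataclass, field
    from typing import Any

    @dataclass
    class FakeReceipt:
        # Hypothetical stand-in for the EventReceipt output model;
        # only the `success` attribute asserted by the tests is modelled.
        success: list[Any] = field(default_factory=list)

    class FakeEventHandler:
        # In-memory double for the event handler: records every emitted payload
        # so tests can assert on counts and membership without touching AWS.
        def __init__(self) -> None:
            self.published: list[Any] = []

        def emit(self, payload: list[Any], **kwargs: Any) -> FakeReceipt:
            self.published.extend(payload)
            return FakeReceipt(success=list(payload))

        def __len__(self) -> int:
            return len(self.published)

        def __contains__(self, item: Any) -> bool:
            return item in self.published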