feat(stream_processor): initial version for processing product changes #76

Merged
merged 59 commits into from
Oct 4, 2023

Conversation

heitorlessa
Collaborator

@heitorlessa heitorlessa commented Sep 30, 2023

closes #75

Summary

Please provide a summary of what's being changed

Initial skeleton to create stream processor to handle product updates coming from CRUD.

Changes

  • Create Lambda
  • Get sample stream event
  • Create domain logic skeleton
    • Create top-level models (product/models/)
    • Create Product model (product/models/products)
    • Create Product model validators (product/models/products/validators)
    • Create skeleton function to notify product changes
    • Create constants to keep track of Product Statuses (product/stream_processor/domain_logic/constants.py)
    • Create Product Notification model
    • Rename handlers/stream_handler to handlers/product_stream
    • Create placeholder unit test
  • Create DAL
    • Create protocol for emitting events
    • Make protocol generic to allow different types of events to be sent
    • Create EventBridge integration
    • Create model for event entry (service, data, metadata)
      • Update EventProvider to receive Input Event
    • Create model for event output (EventReceipt)
    • Slice EventBridge put_events to the max permitted
  • Create unit tests
    • Generate INSERT DynamoDB Stream event
    • Generate REMOVE DynamoDB Stream event
    • Skeleton test for handler
    • Test handler with event handler
    • Test product update notification
    • Test EventBridge contracts
    • Test Event standard
  • Document APIs (mkdocstrings)
    • Domain logic
    • Integrations
    • Models
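
The DAL items in the list above (a protocol for emitting events, an event entry model, and an EventReceipt output) can be sketched roughly as follows. This is only an illustration, not the PR's code: `InMemoryProvider`, `notify`, the field names, and the `send` signature are assumptions, and stdlib dataclasses stand in for the PR's Pydantic models.

```python
from dataclasses import dataclass, field
from typing import Any, Generic, Protocol, TypeVar

T = TypeVar('T')


@dataclass
class Event(Generic[T]):
    """Hypothetical event entry: a typed payload plus free-form metadata."""
    data: T
    metadata: dict[str, Any] = field(default_factory=dict)


@dataclass
class EventReceipt:
    """Hypothetical output model: which events were accepted or rejected."""
    success: list[str] = field(default_factory=list)
    failed: list[str] = field(default_factory=list)


class EventProvider(Protocol):
    """Any object with a matching `send` method satisfies this protocol."""

    def send(self, payload: list[Event[Any]]) -> EventReceipt: ...


class InMemoryProvider:
    """Test double; structural typing means no inheritance is required."""

    def __init__(self) -> None:
        self.sent: list[Event[Any]] = []

    def send(self, payload: list[Event[Any]]) -> EventReceipt:
        self.sent.extend(payload)
        return EventReceipt(success=[f'receipt-{i}' for i, _ in enumerate(payload)])


def notify(events: list[Event[Any]], provider: EventProvider) -> EventReceipt:
    """Domain logic depends only on the protocol, not on a concrete integration."""
    return provider.send(events)
```

With this shape, unit tests can pass a fake provider while the handler wires in the real EventBridge integration.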

Tasks for a separate PR due to size

  • Create integ tests (it'll need careful thought in infra)
  • Create E2E tests (will incorporate if PR isn't big)
  • Docs/examples for suboptimal and optimal
  • Implement Powertools
    • Batch
    • Create model for Stream Input
  • We're including dev requirements in the final build (needs issue, pants2 should make all of this simpler)

unrelated but noticeable changes

  • Support for cdk watch to speed prototyping
  • Add Pytest config to ignore tests in cdk.out
  • Allow git local branches named with _
  • Use getpass.getuser() over outdated os.getlogin() to take into account env vars (mine was root, now it's lessa)
  • Enable Pydantic Plugin for Mypy
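
For context on the `getpass.getuser()` item above: it checks the `LOGNAME`, `USER`, `LNAME` and `USERNAME` environment variables in that order before falling back to the password database, whereas `os.getlogin()` reports the user on the controlling terminal. A small sketch; overriding `LOGNAME` here is purely illustrative, and `lessa` is the value mentioned in the description.

```python
import getpass
import os

# getpass.getuser() returns the first non-empty value among
# LOGNAME, USER, LNAME and USERNAME, so an env var can override it.
os.environ['LOGNAME'] = 'lessa'
current_user = getpass.getuser()
print(current_user)
```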

User experience

If applicable, please share what the user experience looks like before and after this change

Checklist

If your change doesn't seem to apply, please leave them unchecked.

  • I have performed a self-review of this change
  • Changes have been tested
  • Changes are documented
  • PR title follows conventional commit semantics
Is this a breaking change?

RFC issue number:

Checklist:

  • Migration process documented
  • Implement warnings (if it can live side by side)

Acknowledgment

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

Disclaimer: We value your time and bandwidth. As such, any pull requests created on non-triaged issues might not be successful.

Signed-off-by: heitorlessa <lessa@amazon.co.uk>
@heitorlessa heitorlessa marked this pull request as draft September 30, 2023 19:19
…typed

@heitorlessa
Collaborator Author

heitorlessa commented Oct 4, 2023

make pr is passing locally and should pass in CI too. I've enabled the Pydantic plugin for Mypy to fix other false-positive errors.

Waiting for CI before I rename dal to integrations to match the deck and make it easier to spot its intent.

@heitorlessa
Collaborator Author

I believe the deployment credentials error is because it's coming from my fork, and not as a branch from the repo. Everything else passed.

Gonna add docstrings now, but implementation-wise it's ready for review.

@heitorlessa heitorlessa marked this pull request as ready for review October 4, 2023 12:43
@ran-isenberg
Owner

@heitorlessa I'll need to port lots of code to the template, yikes. But thx for fixing lots of side issues.

@ran-isenberg
Owner

@heitorlessa I forked your branch and it passes all tests. Coverage is down 2% though.
you can run make coverage-tests and look at the coverage.xml file.
we can sort it later

@heitorlessa
Collaborator Author

heitorlessa commented Oct 4, 2023

@heitorlessa I forked your branch and it passes all tests. Coverage is down 2% though. you can run make coverage-tests and look at the coverage.xml file. we can sort it later

sure thing. I'm working on docstrings now.

Check how cool this looks ;)

image

@ran-isenberg
Owner

when i run make docs i get a 404 page; when i click on the homepage link to the left, it works
@heitorlessa

@@ -55,6 +55,12 @@ types-requests = "*"
toml = "*"


[tool.poetry.group.dev.dependencies]
Owner

we already have the group:
[tool.poetry.dev-dependencies]
so which is the best dev deps name?

@@ -18,7 +18,7 @@ def init():
     os.environ[POWER_TOOLS_LOG_LEVEL] = 'DEBUG'
     os.environ['REST_API'] = 'https://www.ranthebuilder.cloud/api'
     os.environ['ROLE_ARN'] = 'arn:partition:service:region:account-id:resource-type:resource-id'
-    os.environ['AWS_DEFAULT_REGION'] = 'us-east-1'  # used for appconfig mocked boto calls
+    os.environ['AWS_DEFAULT_REGION'] = os.environ.get('AWS_DEFAULT_REGION', 'us-east-1')  # used for appconfig mocked boto calls
Owner

remove the appconfig comment, it's my bad :)

_pascal_to_snake_pattern = re.compile(rf'({_exclude_underscores}{_pascal_case}{_or}{_followed_by_lower_case_or_digit}')


def convert_model_to_event_name(model_name: str) -> str:
Owner

i know that pydantic has support for automatic serialization that converts names from one format to another
see this example:

def to_camel_lower_case(snake_str: str) -> str:
    components = snake_str.split('_')
    return components[0] + ''.join(x.capitalize() for x in components[1:])

then you add this config to the pydantic class. it means that when you do .model_dump(by_alias=True) it will run the to_camel_lower_case function automatically
class Config:
    alias_generator = to_camel_lower_case
    allow_population_by_field_name = False
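
To make the two naming conversions in this thread concrete, here is a stdlib-only sketch. `to_camel_lower_case` is the reviewer's function from the comment above; `pascal_to_snake` and its pattern are assumptions, since the PR's full regex is built from fragments not all shown here. In Pydantic, aliases from an alias generator are typically emitted only when dumping with `by_alias=True`.

```python
import re


def to_camel_lower_case(snake_str: str) -> str:
    """Convert snake_case to lowerCamelCase, e.g. product_id -> productId."""
    components = snake_str.split('_')
    return components[0] + ''.join(x.capitalize() for x in components[1:])


# Assumed stand-in for the PR's pattern: match an uppercase letter that is
# neither at the start of the string nor preceded by an underscore.
_pascal_to_snake = re.compile(r'(?!^)(?<!_)([A-Z])')


def pascal_to_snake(name: str) -> str:
    """Convert PascalCase to snake_case by prefixing inner capitals with '_'."""
    return _pascal_to_snake.sub(r'_\1', name).lower()


print(to_camel_lower_case('event_source_name'))
print(pascal_to_snake('ProductChangeNotification'))
```

Note that this simplified pattern splits every inner capital, so acronyms such as `HTTPError` would need extra handling.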


T = TypeVar('T')

_exclude_underscores = r'(?!^)(?<!_)'
Owner

functions is a bit of a generic name? maybe utils? or move the schema-related ones to the schema folder?

Collaborator Author

I chose functions as a more narrow version of utils. Similar to tests/.../data_builders.py instead of utils.py. That way I know where utility functions are, instead of bin packing into utils.py.

Makes sense?

from product.models.products.product import ProductChangeNotification


def generate_dynamodb_stream_events(product_id: str = '8c18c85a-0f10-4b73-b54a-07ab0d381018',) -> dict[str, Any]:
Owner

you could move generate_product_id from crud_utils.py and use it instead of a default id
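
One way this suggestion could look is sketched below. It is an assumption-laden illustration: the body of `generate_product_id` is guessed (a UUID4 string, matching the shape of the default id above), and the returned stream event is heavily simplified compared with a real DynamoDB Stream record.

```python
import uuid
from typing import Any, Optional


def generate_product_id() -> str:
    """Hypothetical helper: returns a fresh UUID4 string on every call."""
    return str(uuid.uuid4())


def generate_dynamodb_stream_events(product_id: Optional[str] = None) -> dict[str, Any]:
    # Resolve the default at call time. A call placed directly in the
    # signature would be evaluated once, when the function is defined.
    product_id = product_id or generate_product_id()
    return {
        'Records': [
            {
                'eventName': 'INSERT',
                'dynamodb': {'Keys': {'id': {'S': product_id}}},
            }
        ]
    }
```

Tests that need a stable id can still pass one explicitly, while everything else gets a unique id per call.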

class EventMetadata(BaseModel):
    event_name: str
    event_source: str
    event_version: str
Owner

let's add a default value here?



class Event(BaseModel, Generic[AnyModel]):
    data: AnyModel
Owner

shouldn't the timestamp and version reside here? they are like a hint for parsing everything else



class EventReceiptFail(BaseModel):
    receipt_id: str
Owner

same here for field min length, so they are not empty

error_message = exc.response['Error']['Message']

receipt = EventReceiptFail(receipt_id='', error=error_message, details=exc.response['ResponseMetadata'])
raise ProductNotificationDeliveryError(f'Failed to deliver all events: {error_message}', receipts=[receipt]) from exc
Owner

about the error handling here, if we raise the exception, we stop going over the remaining batches, right? shouldn't we do failed.extend(batch) somehow and continue in the loop?
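
The alternative raised in this comment could be sketched as follows: slice the entries into chunks of at most 10 (the PutEvents per-request limit), record a failed chunk, and keep going. Everything here is hypothetical rather than the PR's code: `send_all`, `Outcome`, and the injected `put_events` callable are illustrative names, and a broad `Exception` stands in for the client error the real integration handles.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Iterator

MAX_ENTRIES = 10  # EventBridge PutEvents accepts at most 10 entries per request


def chunk(items: list[Any], size: int = MAX_ENTRIES) -> Iterator[list[Any]]:
    """Yield consecutive slices of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


@dataclass
class Outcome:
    success: list[Any] = field(default_factory=list)
    failed: list[tuple[Any, str]] = field(default_factory=list)


def send_all(entries: list[Any], put_events: Callable[[list[Any]], None]) -> Outcome:
    """Send every chunk; a failing chunk is recorded instead of aborting the loop."""
    outcome = Outcome()
    for batch in chunk(entries):
        try:
            put_events(batch)
        except Exception as exc:  # assumption: real code would catch a narrower error
            outcome.failed.extend((entry, str(exc)) for entry in batch)
            continue
        outcome.success.extend(batch)
    return outcome
```

The caller can then decide whether to raise once at the end with all failed receipts attached, which keeps the delivery error while still attempting every chunk.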

@ran-isenberg
Owner

@heitorlessa top class code, really.
I've left many questions, comments like "very nice", and small suggestions.
let's merge and fix those in the next PR.
I'll start working on my event handler.

but damn, this one was hard, lots of files and required concentration.

@ran-isenberg ran-isenberg merged commit 576c669 into ran-isenberg:main Oct 4, 2023
3 of 4 checks passed
heitorlessa added a commit that referenced this pull request Oct 4, 2023
* main:
  fix: fix broken main, add automatic github oages publish (#80)
  feat(stream_processor): initial version for processing product changes (#76)

Development

Successfully merging this pull request may close these issues.

Create skeleton for async processor
2 participants