Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Support for DynamodbOnlineStoreConfig endpoint_url parameter #2485

Merged
3 changes: 2 additions & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,8 @@ To test across clouds, on top of setting up Redis, you also need GCP / AWS / Sno

**AWS**
1. TODO(adchia): flesh out setting up AWS login (or create helper script)
2. Modify `RedshiftDataSourceCreator` to use your credentials
2. To avoid AWS fees, `DynamoDBOnlineStore` can be tested locally by running DynamoDB on your own machine; to set this up, see [Local DynamoDB on your Computer](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBLocal.DownloadingAndRunning.html)
3. Modify `RedshiftDataSourceCreator` to use your credentials

**Snowflake**
- See https://signup.snowflake.com/
Expand Down
65 changes: 43 additions & 22 deletions sdk/python/feast/infra/online_stores/dynamodb.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,20 @@ class DynamoDBOnlineStoreConfig(FeastConfigBaseModel):
type: Literal["dynamodb"] = "dynamodb"
"""Online store type selector"""

batch_size: int = 40
"""Number of items to retrieve in a DynamoDB BatchGetItem call."""

endpoint_url: str = None
"""DynamoDB local development endpoint Url, i.e. http://localhost:8000"""

region: StrictStr
"""AWS Region Name"""

table_name_template: StrictStr = "{project}.{table_name}"
"""DynamoDB table name template"""

sort_response: bool = True
"""Whether or not to sort BatchGetItem response."""

batch_size: int = 40
"""Number of items to retrieve in a DynamoDB BatchGetItem call."""
table_name_template: StrictStr = "{project}.{table_name}"
"""DynamoDB table name template"""


class DynamoDBOnlineStore(OnlineStore):
Expand Down Expand Up @@ -95,8 +98,12 @@ def update(
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_client = self._get_dynamodb_client(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)
dynamodb_client = self._get_dynamodb_client(
online_config.region, online_config.endpoint_url
)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)

for table_instance in tables_to_keep:
try:
Expand Down Expand Up @@ -141,7 +148,9 @@ def teardown(
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)

for table in tables:
_delete_table_idempotent(
Expand Down Expand Up @@ -175,12 +184,14 @@ def online_write_batch(
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)

table_instance = dynamodb_resource.Table(
_get_table_name(online_config, config, table)
)
with table_instance.batch_writer() as batch:
with table_instance.batch_writer(overwrite_by_pkeys=["entity_id"]) as batch:
for entity_key, features, timestamp, created_ts in data:
entity_id = compute_entity_id(entity_key)
batch.put_item(
Expand Down Expand Up @@ -217,7 +228,9 @@ def online_read(
"""
online_config = config.online_store
assert isinstance(online_config, DynamoDBOnlineStoreConfig)
dynamodb_resource = self._get_dynamodb_resource(online_config.region)
dynamodb_resource = self._get_dynamodb_resource(
online_config.region, online_config.endpoint_url
)
table_instance = dynamodb_resource.Table(
_get_table_name(online_config, config, table)
)
Expand Down Expand Up @@ -260,14 +273,16 @@ def online_read(
result.extend(batch_size_nones)
return result

def _get_dynamodb_client(self, region: str):
def _get_dynamodb_client(self, region: str, endpoint_url: str):
if self._dynamodb_client is None:
self._dynamodb_client = _initialize_dynamodb_client(region)
self._dynamodb_client = _initialize_dynamodb_client(region, endpoint_url)
return self._dynamodb_client

def _get_dynamodb_resource(self, region: str):
def _get_dynamodb_resource(self, region: str, endpoint_url: str):
if self._dynamodb_resource is None:
self._dynamodb_resource = _initialize_dynamodb_resource(region)
self._dynamodb_resource = _initialize_dynamodb_resource(
region, endpoint_url
)
return self._dynamodb_resource

def _sort_dynamodb_response(self, responses: list, order: list):
Expand All @@ -285,12 +300,12 @@ def _sort_dynamodb_response(self, responses: list, order: list):
return table_responses_ordered


def _initialize_dynamodb_client(region: str):
return boto3.client("dynamodb", region_name=region)
def _initialize_dynamodb_client(region: str, endpoint_url: str = None):
return boto3.client("dynamodb", region_name=region, endpoint_url=endpoint_url)


def _initialize_dynamodb_resource(region: str):
return boto3.resource("dynamodb", region_name=region)
def _initialize_dynamodb_resource(region: str, endpoint_url: str = None):
return boto3.resource("dynamodb", region_name=region, endpoint_url=endpoint_url)


# TODO(achals): This form of user-facing templating is experimental.
Expand Down Expand Up @@ -362,8 +377,12 @@ def from_proto(dynamodb_table_proto: DynamoDBTableProto) -> Any:
)

def update(self):
dynamodb_client = _initialize_dynamodb_client(region=self.region)
dynamodb_resource = _initialize_dynamodb_resource(region=self.region)
dynamodb_client = _initialize_dynamodb_client(
region=self.region, endpoint_url=None
)
dynamodb_resource = _initialize_dynamodb_resource(
region=self.region, endpoint_url=None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the endpoint_url None here?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@achals my mistake, should we add it as an attribute to DynamoDBTable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not familiar with how the user interacts with the DynamoDBTable class or how the user can configure it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add it for consistency; it's not used right now, but it will be used as part of upcoming features.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

)

try:
dynamodb_resource.create_table(
Expand All @@ -384,5 +403,7 @@ def update(self):
dynamodb_client.get_waiter("table_exists").wait(TableName=f"{self.name}")

def teardown(self):
dynamodb_resource = _initialize_dynamodb_resource(region=self.region)
dynamodb_resource = _initialize_dynamodb_resource(
region=self.region, endpoint_url=None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same question

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made some changes based on your suggestions, if you don't mind giving it a second check, please.

)
_delete_table_idempotent(dynamodb_resource, self.name)
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import re
import tempfile
import uuid
from copy import deepcopy
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
Expand Down Expand Up @@ -69,9 +70,13 @@
IntegrationTestRepoConfig(python_feature_server=True),
]
if os.getenv("FEAST_IS_LOCAL_TEST", "False") != "True":
# Local integration tests use local Dynamodb
LOCAL_DYNAMO_CONFIG = deepcopy(DYNAMO_CONFIG)
LOCAL_DYNAMO_CONFIG["endpoint_url"] = "http://localhost:8000"
DEFAULT_FULL_REPO_CONFIGS.extend(
[
IntegrationTestRepoConfig(online_store=REDIS_CONFIG),
IntegrationTestRepoConfig(online_store=LOCAL_DYNAMO_CONFIG),
# GCP configurations
IntegrationTestRepoConfig(
provider="gcp",
Expand All @@ -87,7 +92,7 @@
IntegrationTestRepoConfig(
provider="aws",
offline_store_creator=RedshiftDataSourceCreator,
online_store=DYNAMO_CONFIG,
online_store=LOCAL_DYNAMO_CONFIG,
python_feature_server=True,
),
IntegrationTestRepoConfig(
Expand Down