Skip to content

Commit 95a245a

Browse files
author
Tsotne Tabidze
authored
AWS Template improvements (input prompt for configs, default to Redshift) (#1731)
* AWS Template improvements (input prompt for configs, default to Redshift)
  Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>
* Add inquirer library to setup.py
  Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>
* Remove inquirer library and fix linter
  Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>
* Fix test_cli_aws.py
  Signed-off-by: Tsotne Tabidze <tsotne@tecton.ai>
1 parent 7972992 commit 95a245a

21 files changed

+194
-83
lines changed

sdk/python/feast/client.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -826,7 +826,7 @@ def ingest(
826826
>>> client = Client(core_url="localhost:6565")
827827
>>> ft_df = pd.DataFrame(
828828
>>> {
829-
>>> "datetime": [pd.datetime.now()],
829+
>>> "event_timestamp": [pd.datetime.now()],
830830
>>> "driver": [1001],
831831
>>> "rating": [4.3],
832832
>>> }

sdk/python/feast/constants.py

-3
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,6 @@ def __new__(cls, name, bases, attrs):
4949
return super().__new__(cls, name, bases, attrs)
5050

5151

52-
#: Default datetime column name for point-in-time join
53-
DATETIME_COLUMN: str = "datetime"
54-
5552
#: Environmental variable to specify Feast configuration file location
5653
FEAST_CONFIG_FILE_ENV: str = "FEAST_CONFIG"
5754

sdk/python/feast/driver_test_data.py

+6-6
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def create_driver_hourly_stats_df(drivers, start_date, end_date) -> pd.DataFrame
9191
"""
9292
Example df generated by this function:
9393
94-
| datetime | driver_id | conv_rate | acc_rate | avg_daily_trips | created |
94+
| event_timestamp | driver_id | conv_rate | acc_rate | avg_daily_trips | created |
9595
|------------------+-----------+-----------+----------+-----------------+------------------|
9696
| 2021-03-17 19:31 | 5010 | 0.229297 | 0.685843 | 861 | 2021-03-24 19:34 |
9797
| 2021-03-17 20:31 | 5010 | 0.781655 | 0.861280 | 769 | 2021-03-24 19:34 |
@@ -107,7 +107,7 @@ def create_driver_hourly_stats_df(drivers, start_date, end_date) -> pd.DataFrame
107107
"""
108108
df_hourly = pd.DataFrame(
109109
{
110-
"datetime": [
110+
"event_timestamp": [
111111
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
112112
for dt in pd.date_range(
113113
start=start_date, end=end_date, freq="1H", closed="left"
@@ -129,7 +129,7 @@ def create_driver_hourly_stats_df(drivers, start_date, end_date) -> pd.DataFrame
129129
df_all_drivers = pd.concat([df_hourly_copy, df_all_drivers])
130130

131131
df_all_drivers.reset_index(drop=True, inplace=True)
132-
rows = df_all_drivers["datetime"].count()
132+
rows = df_all_drivers["event_timestamp"].count()
133133

134134
df_all_drivers["conv_rate"] = np.random.random(size=rows).astype(np.float32)
135135
df_all_drivers["acc_rate"] = np.random.random(size=rows).astype(np.float32)
@@ -152,7 +152,7 @@ def create_customer_daily_profile_df(customers, start_date, end_date) -> pd.Data
152152
"""
153153
Example df generated by this function:
154154
155-
| datetime | customer_id | current_balance | avg_passenger_count | lifetime_trip_count | created |
155+
| event_timestamp | customer_id | current_balance | avg_passenger_count | lifetime_trip_count | created |
156156
|------------------+-------------+-----------------+---------------------+---------------------+------------------|
157157
| 2021-03-17 19:31 | 1010 | 0.889188 | 0.049057 | 412 | 2021-03-24 19:38 |
158158
| 2021-03-18 19:31 | 1010 | 0.979273 | 0.212630 | 639 | 2021-03-24 19:38 |
@@ -168,7 +168,7 @@ def create_customer_daily_profile_df(customers, start_date, end_date) -> pd.Data
168168
"""
169169
df_daily = pd.DataFrame(
170170
{
171-
"datetime": [
171+
"event_timestamp": [
172172
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
173173
for dt in pd.date_range(
174174
start=start_date, end=end_date, freq="1D", closed="left"
@@ -185,7 +185,7 @@ def create_customer_daily_profile_df(customers, start_date, end_date) -> pd.Data
185185

186186
df_all_customers.reset_index(drop=True, inplace=True)
187187

188-
rows = df_all_customers["datetime"].count()
188+
rows = df_all_customers["event_timestamp"].count()
189189

190190
df_all_customers["current_balance"] = np.random.random(size=rows).astype(np.float32)
191191
df_all_customers["avg_passenger_count"] = np.random.random(size=rows).astype(

sdk/python/feast/feature_store.py

-2
Original file line numberDiff line numberDiff line change
@@ -389,8 +389,6 @@ def get_historical_features(
389389
"""
390390
_feature_refs = self._get_features(features, feature_refs)
391391

392-
print(f"_feature_refs: {_feature_refs}")
393-
394392
all_feature_views = self._registry.list_feature_views(project=self.project)
395393
feature_views = list(
396394
view for view, _ in _group_feature_refs(_feature_refs, all_feature_views)

sdk/python/feast/infra/offline_stores/file.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -207,7 +207,7 @@ def evaluate_historical_retrieval():
207207
# Ensure that we delete dataframes to free up memory
208208
del df_to_join
209209

210-
# Move "datetime" column to front
210+
# Move "event_timestamp" column to front
211211
current_cols = entity_df_with_features.columns.tolist()
212212
current_cols.remove(entity_df_event_timestamp_col)
213213
entity_df_with_features = entity_df_with_features[

sdk/python/feast/infra/offline_stores/file_source.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ def __init__(
3535
or view. Only used for feature columns, not entities or timestamp columns.
3636
3737
Examples:
38-
>>> FileSource(path="/data/my_features.parquet", event_timestamp_column="datetime")
38+
>>> FileSource(path="/data/my_features.parquet", event_timestamp_column="event_timestamp")
3939
"""
4040
if path is None and file_url is None:
4141
raise ValueError(

sdk/python/feast/infra/offline_stores/redshift_source.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,7 @@ def to_proto(self) -> DataSourceProto:
9090
def validate(self, config: RepoConfig):
9191
# As long as the query gets successfully executed, or the table exists,
9292
# the data source is validated. We don't need the results though.
93-
# TODO: uncomment this
94-
# self.get_table_column_names_and_types(config)
95-
print("Validate", self.get_table_column_names_and_types(config))
93+
self.get_table_column_names_and_types(config)
9694

9795
def get_table_query_string(self) -> str:
9896
"""Returns a string that can directly be used to reference this table in SQL"""

sdk/python/feast/repo_config.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ def _validate_offline_store_config(cls, values):
164164
elif values["provider"] == "gcp":
165165
values["offline_store"]["type"] = "bigquery"
166166
elif values["provider"] == "aws":
167-
values["offline_store"]["type"] = "file"
167+
values["offline_store"]["type"] = "redshift"
168168

169169
offline_store_type = values["offline_store"]["type"]
170170

sdk/python/feast/templates/aws/bootstrap.py

+49-8
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,8 @@
1+
import click
2+
3+
from feast.infra.utils import aws_utils
4+
5+
16
def bootstrap():
27
# Bootstrap() will automatically be called from the init_repo() during `feast init`
38

@@ -6,21 +11,57 @@ def bootstrap():
611

712
from feast.driver_test_data import create_driver_hourly_stats_df
813

9-
repo_path = pathlib.Path(__file__).parent.absolute()
10-
data_path = repo_path / "data"
11-
data_path.mkdir(exist_ok=True)
12-
1314
end_date = datetime.now().replace(microsecond=0, second=0, minute=0)
1415
start_date = end_date - timedelta(days=15)
1516

1617
driver_entities = [1001, 1002, 1003, 1004, 1005]
1718
driver_df = create_driver_hourly_stats_df(driver_entities, start_date, end_date)
1819

19-
driver_stats_path = data_path / "driver_stats.parquet"
20-
driver_df.to_parquet(path=str(driver_stats_path), allow_truncated_timestamps=True)
20+
aws_region = click.prompt("AWS Region (e.g. us-west-2)")
21+
cluster_id = click.prompt("Redshift Cluster ID")
22+
database = click.prompt("Redshift Database Name")
23+
user = click.prompt("Redshift User Name")
24+
s3_staging_location = click.prompt("Redshift S3 Staging Location (s3://*)")
25+
iam_role = click.prompt("Redshift IAM Role for S3 (arn:aws:iam::*:role/*)")
26+
27+
if click.confirm(
28+
"Should I upload example data to Redshift (overwriting 'feast_driver_hourly_stats' table)?",
29+
default=True,
30+
):
31+
client = aws_utils.get_redshift_data_client(aws_region)
32+
s3 = aws_utils.get_s3_resource(aws_region)
33+
34+
aws_utils.execute_redshift_statement(
35+
client,
36+
cluster_id,
37+
database,
38+
user,
39+
"DROP TABLE IF EXISTS feast_driver_hourly_stats",
40+
)
41+
42+
aws_utils.upload_df_to_redshift(
43+
client,
44+
cluster_id,
45+
database,
46+
user,
47+
s3,
48+
f"{s3_staging_location}/data/feast_driver_hourly_stats.parquet",
49+
iam_role,
50+
"feast_driver_hourly_stats",
51+
driver_df,
52+
)
53+
54+
repo_path = pathlib.Path(__file__).parent.absolute()
55+
config_file = repo_path / "feature_store.yaml"
2156

22-
example_py_file = repo_path / "example.py"
23-
replace_str_in_file(example_py_file, "%PARQUET_PATH%", str(driver_stats_path))
57+
replace_str_in_file(config_file, "%AWS_REGION%", aws_region)
58+
replace_str_in_file(config_file, "%REDSHIFT_CLUSTER_ID%", cluster_id)
59+
replace_str_in_file(config_file, "%REDSHIFT_DATABASE%", database)
60+
replace_str_in_file(config_file, "%REDSHIFT_USER%", user)
61+
replace_str_in_file(
62+
config_file, "%REDSHIFT_S3_STAGING_LOCATION%", s3_staging_location
63+
)
64+
replace_str_in_file(config_file, "%REDSHIFT_IAM_ROLE%", iam_role)
2465

2566

2667
def replace_str_in_file(file_path, match_str, sub_str):
sdk/python/feast/templates/aws/driver_repo.py

+64

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
from datetime import timedelta
2+
3+
from feast import Entity, Feature, FeatureView, RedshiftSource, ValueType
4+
5+
# Define an entity for the driver. Entities can be thought of as primary keys used to
6+
# retrieve features. Entities are also used to join multiple tables/views during the
7+
# construction of feature vectors
8+
driver = Entity(
9+
# Name of the entity. Must be unique within a project
10+
name="driver_id",
11+
# The join key of an entity describes the storage level field/column on which
12+
# features can be looked up. The join key is also used to join feature
13+
# tables/views when building feature vectors
14+
join_key="driver_id",
15+
# The storage level type for an entity
16+
value_type=ValueType.INT64,
17+
)
18+
19+
# Indicates a data source from which feature values can be retrieved. Sources are queried when building training
20+
# datasets or materializing features into an online store.
21+
driver_stats_source = RedshiftSource(
22+
# The Redshift table where features can be found
23+
table="feast_driver_hourly_stats",
24+
# The event timestamp is used for point-in-time joins and for ensuring only
25+
# features within the TTL are returned
26+
event_timestamp_column="event_timestamp",
27+
# The (optional) created timestamp is used to ensure there are no duplicate
28+
# feature rows in the offline store or when building training datasets
29+
created_timestamp_column="created",
30+
)
31+
32+
# Feature views are a grouping based on how features are stored in either the
33+
# online or offline store.
34+
driver_stats_fv = FeatureView(
35+
# The unique name of this feature view. Two feature views in a single
36+
# project cannot have the same name
37+
name="driver_hourly_stats",
38+
# The list of entities specifies the keys required for joining or looking
39+
# up features from this feature view. The reference provided in this field
40+
# correspond to the name of a defined entity (or entities)
41+
entities=["driver_id"],
42+
# The timedelta is the maximum age that each feature value may have
43+
# relative to its lookup time. For historical features (used in training),
44+
# TTL is relative to each timestamp provided in the entity dataframe.
45+
# TTL also allows for eviction of keys from online stores and limits the
46+
# amount of historical scanning required for historical feature values
47+
# during retrieval
48+
ttl=timedelta(weeks=52),
49+
# The list of features defined below act as a schema to both define features
50+
# for both materialization of features into a store, and are used as references
51+
# during retrieval for building a training dataset or serving features
52+
features=[
53+
Feature(name="conv_rate", dtype=ValueType.FLOAT),
54+
Feature(name="acc_rate", dtype=ValueType.FLOAT),
55+
Feature(name="avg_daily_trips", dtype=ValueType.INT64),
56+
],
57+
# Batch sources are used to find feature values. In the case of this feature
58+
# view we will query a source table on Redshift for driver statistics
59+
# features
60+
batch_source=driver_stats_source,
61+
# Tags are user defined key/value pairs that are attached to each
62+
# feature view
63+
tags={"team": "driver_performance"},
64+
)

sdk/python/feast/templates/aws/example.py

-35
This file was deleted.
sdk/python/feast/templates/aws/feature_store.yaml

+11

Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
11
project: my_project
22
registry: data/registry.db
33
provider: aws
4+
online_store:
5+
type: dynamodb
6+
region: %AWS_REGION%
7+
offline_store:
8+
type: redshift
9+
cluster_id: %REDSHIFT_CLUSTER_ID%
10+
region: %AWS_REGION%
11+
database: %REDSHIFT_DATABASE%
12+
user: %REDSHIFT_USER%
13+
s3_staging_location: %REDSHIFT_S3_STAGING_LOCATION%
14+
iam_role: %REDSHIFT_IAM_ROLE%

sdk/python/feast/templates/aws/test.py

+34-6
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
from datetime import datetime
1+
from datetime import datetime, timedelta
22

33
import pandas as pd
4-
from example import driver, driver_hourly_stats_view
4+
from driver_repo import driver, driver_stats_fv
55

66
from feast import FeatureStore
77

@@ -15,21 +15,49 @@ def main():
1515

1616
# Deploy the feature store to AWS
1717
print("Deploying feature store to AWS...")
18-
fs.apply([driver, driver_hourly_stats_view])
18+
fs.apply([driver, driver_stats_fv])
1919

2020
# Select features
21-
feature_refs = ["driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate"]
21+
features = ["driver_hourly_stats:conv_rate", "driver_hourly_stats:acc_rate"]
2222

23+
# Create an entity dataframe. This is the dataframe that will be enriched with historical features
24+
entity_df = pd.DataFrame(
25+
{
26+
"event_timestamp": [
27+
pd.Timestamp(dt, unit="ms", tz="UTC").round("ms")
28+
for dt in pd.date_range(
29+
start=datetime.now() - timedelta(days=3),
30+
end=datetime.now(),
31+
periods=3,
32+
)
33+
],
34+
"driver_id": [1001, 1002, 1003],
35+
}
36+
)
37+
38+
print("Retrieving training data...")
39+
40+
# Retrieve historical features by joining the entity dataframe to the Redshift table source
41+
training_df = fs.get_historical_features(
42+
features=features, entity_df=entity_df
43+
).to_df()
44+
45+
print()
46+
print(training_df)
47+
48+
print()
2349
print("Loading features into the online store...")
2450
fs.materialize_incremental(end_date=datetime.now())
2551

52+
print()
2653
print("Retrieving online features...")
2754

28-
# Retrieve features from the online store (DynamoDB)
55+
# Retrieve features from the online store (Firestore)
2956
online_features = fs.get_online_features(
30-
features=feature_refs, entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}],
57+
features=features, entity_rows=[{"driver_id": 1001}, {"driver_id": 1002}],
3158
).to_dict()
3259

60+
print()
3361
print(pd.DataFrame.from_dict(online_features))
3462

3563

sdk/python/feast/templates/gcp/driver_repo.py

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
# datasets or materializing features into an online store.
2121
driver_stats_source = BigQuerySource(
2222
# The BigQuery table where features can be found
23-
table_ref="feast-oss.demo_data.driver_hourly_stats",
23+
table_ref="feast-oss.demo_data.driver_hourly_stats_2",
2424
# The event timestamp is used for point-in-time joins and for ensuring only
2525
# features within the TTL are returned
26-
event_timestamp_column="datetime",
26+
event_timestamp_column="event_timestamp",
2727
# The (optional) created timestamp is used to ensure there are no duplicate
2828
# feature rows in the offline store or when building training datasets
2929
created_timestamp_column="created",

sdk/python/feast/templates/local/example.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
# for more info.
1010
driver_hourly_stats = FileSource(
1111
path="%PARQUET_PATH%",
12-
event_timestamp_column="datetime",
12+
event_timestamp_column="event_timestamp",
1313
created_timestamp_column="created",
1414
)
1515

0 commit comments

Comments (0)