Skip to content

Commit 989ce08

Browse files
yelo-blood and adchia authored
feat: Feast AWS Athena offline store (again) (#3044)
* fixed bugs, cleaned code, added AthenaDataSourceCreator Signed-off-by: Youngkyu OH <toping4445@gmail.com> * fixed bugs, cleaned code, added some methods. test_universal_historical_retrieval - 100% passed Signed-off-by: Youngkyu OH <toping4445@gmail.com> * fixed bugs to pass test_validation Signed-off-by: Youngkyu OH <toping4445@gmail.com> * changed boolean data type mapping Signed-off-by: Youngkyu OH <toping4445@gmail.com> * 1.added test-python-universal-athena in Makefile 2.replaced database,bucket_name hardcoding to variable in AthenaDataSourceCreator Signed-off-by: Youngkyu OH <toping4445@gmail.com> * format,run lint Signed-off-by: Youngkyu OH <toping4445@gmail.com> * revert merge changes Signed-off-by: Danny Chiao <danny@tecton.ai> * add entity_key_serialization Signed-off-by: Danny Chiao <danny@tecton.ai> * restore deleted file Signed-off-by: Danny Chiao <danny@tecton.ai> * modified confusing environment variable names, added how to use Athena Signed-off-by: Youngkyu OH <toping4445@gmail.com> * enforce AthenaSource to have a name Signed-off-by: Youngkyu OH <toping4445@gmail.com> Co-authored-by: toping4445 <yelo.blood@kakaopaycorp.com> Co-authored-by: Danny Chiao <danny@tecton.ai>
1 parent 32d2039 commit 989ce08

20 files changed

+1782
-2
lines changed

Makefile

+27
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,33 @@ test-python-universal-trino:
139139
not test_universal_types" \
140140
sdk/python/tests
141141

142+
# To use Athena as an offline store, you need to create an Athena database and an S3 bucket on AWS. See https://docs.aws.amazon.com/athena/latest/ug/getting-started.html
143+
# Modify the environment variables ATHENA_DATA_SOURCE, ATHENA_DATABASE, and ATHENA_S3_BUCKET_NAME to change the data source, database, and S3 bucket name to use.
144+
# If tests fail with the pytest `-n 8` option, change the number to 1.
145+
test-python-universal-athena:
146+
PYTHONPATH='.' \
147+
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.athena_repo_configuration \
148+
PYTEST_PLUGINS=feast.infra.offline_stores.contrib.athena_offline_store.tests \
149+
FEAST_USAGE=False IS_TEST=True \
150+
ATHENA_DATA_SOURCE=AwsDataCatalog \
151+
ATHENA_DATABASE=default \
152+
ATHENA_S3_BUCKET_NAME=feast-integration-tests \
153+
python -m pytest -n 8 --integration \
154+
-k "not test_go_feature_server and \
155+
not test_logged_features_validation and \
156+
not test_lambda and \
157+
not test_feature_logging and \
158+
not test_offline_write and \
159+
not test_push_offline and \
160+
not test_historical_retrieval_with_validation and \
161+
not test_historical_features_persisting and \
162+
not test_historical_retrieval_fails_on_validation and \
163+
not gcs_registry and \
164+
not s3_registry" \
165+
sdk/python/tests
166+
167+
168+
142169
test-python-universal-postgres:
143170
PYTHONPATH='.' \
144171
FULL_REPO_CONFIGS_MODULE=sdk.python.feast.infra.offline_stores.contrib.postgres_repo_configuration \

protos/feast/core/DataSource.proto

+18
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ message DataSource {
4949
PUSH_SOURCE = 9;
5050
BATCH_TRINO = 10;
5151
BATCH_SPARK = 11;
52+
BATCH_ATHENA = 12;
5253
}
5354

5455
// Unique name of data source within the project
@@ -171,6 +172,22 @@ message DataSource {
171172
string database = 4;
172173
}
173174

175+
// Defines options for DataSource that sources features from an Athena Query
176+
message AthenaOptions {
177+
// Athena table name
178+
string table = 1;
179+
180+
// SQL query that returns a table containing feature data. Must contain an event_timestamp column, and respective
181+
// entity columns
182+
string query = 2;
183+
184+
// Athena database name
185+
string database = 3;
186+
187+
// Athena data source (data catalog) name, e.g. AwsDataCatalog
188+
string data_source = 4;
189+
}
190+
174191
// Defines options for DataSource that sources features from a Snowflake Query
175192
message SnowflakeOptions {
176193
// Snowflake table name
@@ -242,5 +259,6 @@ message DataSource {
242259
PushOptions push_options = 22;
243260
SparkOptions spark_options = 27;
244261
TrinoOptions trino_options = 30;
262+
AthenaOptions athena_options = 35;
245263
}
246264
}

protos/feast/core/FeatureService.proto

+6
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ message LoggingConfig {
6060
RedshiftDestination redshift_destination = 5;
6161
SnowflakeDestination snowflake_destination = 6;
6262
CustomDestination custom_destination = 7;
63+
AthenaDestination athena_destination = 8;
6364
}
6465

6566
message FileDestination {
@@ -80,6 +81,11 @@ message LoggingConfig {
8081
string table_name = 1;
8182
}
8283

84+
message AthenaDestination {
85+
// Destination table name. data_source and database will be taken from an offline store config
86+
string table_name = 1;
87+
}
88+
8389
message SnowflakeDestination {
8490
// Destination table name. Schema and database will be taken from an offline store config
8591
string table_name = 1;

protos/feast/core/SavedDataset.proto

+1
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ message SavedDatasetStorage {
5959
DataSource.TrinoOptions trino_storage = 8;
6060
DataSource.SparkOptions spark_storage = 9;
6161
DataSource.CustomSourceOptions custom_storage = 10;
62+
DataSource.AthenaOptions athena_storage = 11;
6263
}
6364
}
6465

sdk/python/feast/__init__.py

+4
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
from importlib_metadata import PackageNotFoundError, version as _version # type: ignore
66

77
from feast.infra.offline_stores.bigquery_source import BigQuerySource
8+
from feast.infra.offline_stores.contrib.athena_offline_store.athena_source import (
9+
AthenaSource,
10+
)
811
from feast.infra.offline_stores.file_source import FileSource
912
from feast.infra.offline_stores.redshift_source import RedshiftSource
1013
from feast.infra.offline_stores.snowflake_source import SnowflakeSource
@@ -50,4 +53,5 @@
5053
"SnowflakeSource",
5154
"PushSource",
5255
"RequestSource",
56+
"AthenaSource",
5357
]

sdk/python/feast/batch_feature_view.py

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
"SnowflakeSource",
1515
"SparkSource",
1616
"TrinoSource",
17+
"AthenaSource",
1718
}
1819

1920

sdk/python/feast/data_source.py

+9
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,7 @@ def to_proto(self) -> DataSourceProto.KinesisOptions:
156156
DataSourceProto.SourceType.BATCH_SNOWFLAKE: "feast.infra.offline_stores.snowflake_source.SnowflakeSource",
157157
DataSourceProto.SourceType.BATCH_TRINO: "feast.infra.offline_stores.contrib.trino_offline_store.trino_source.TrinoSource",
158158
DataSourceProto.SourceType.BATCH_SPARK: "feast.infra.offline_stores.contrib.spark_offline_store.spark_source.SparkSource",
159+
DataSourceProto.SourceType.BATCH_ATHENA: "feast.infra.offline_stores.contrib.athena_offline_store.athena_source.AthenaSource",
159160
DataSourceProto.SourceType.STREAM_KAFKA: "feast.data_source.KafkaSource",
160161
DataSourceProto.SourceType.STREAM_KINESIS: "feast.data_source.KinesisSource",
161162
DataSourceProto.SourceType.REQUEST_SOURCE: "feast.data_source.RequestSource",
@@ -183,6 +184,7 @@ class DataSource(ABC):
183184
maintainer.
184185
timestamp_field (optional): Event timestamp field used for point in time
185186
joins of feature values.
187+
date_partition_column (optional): Timestamp column used for partitioning. Not supported by all offline stores.
186188
"""
187189

188190
name: str
@@ -192,6 +194,7 @@ class DataSource(ABC):
192194
description: str
193195
tags: Dict[str, str]
194196
owner: str
197+
date_partition_column: str
195198

196199
def __init__(
197200
self,
@@ -203,6 +206,7 @@ def __init__(
203206
description: Optional[str] = "",
204207
tags: Optional[Dict[str, str]] = None,
205208
owner: Optional[str] = "",
209+
date_partition_column: Optional[str] = None,
206210
):
207211
"""
208212
Creates a DataSource object.
@@ -220,6 +224,7 @@ def __init__(
220224
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
221225
owner (optional): The owner of the data source, typically the email of the primary
222226
maintainer.
227+
date_partition_column (optional): Timestamp column used for partitioning. Not supported by all offline stores.
223228
"""
224229
self.name = name
225230
self.timestamp_field = timestamp_field or ""
@@ -237,6 +242,9 @@ def __init__(
237242
self.description = description or ""
238243
self.tags = tags or {}
239244
self.owner = owner or ""
245+
self.date_partition_column = (
246+
date_partition_column if date_partition_column else ""
247+
)
240248

241249
def __hash__(self):
242250
return hash((self.name, self.timestamp_field))
@@ -256,6 +264,7 @@ def __eq__(self, other):
256264
or self.timestamp_field != other.timestamp_field
257265
or self.created_timestamp_column != other.created_timestamp_column
258266
or self.field_mapping != other.field_mapping
267+
or self.date_partition_column != other.date_partition_column
259268
or self.description != other.description
260269
or self.tags != other.tags
261270
or self.owner != other.owner

sdk/python/feast/infra/offline_stores/contrib/athena_offline_store/__init__.py

Whitespace-only changes.

0 commit comments

Comments
 (0)