
Commit 12aafc2

kevjumba authored and adchia committed
feat: Feast Spark Offline Store (feast-dev#2349)
* State of feast
* Clean up changes
* Fix random incorrect changes
* Fix lint
* Fix build errors
* Fix lint
* Add spark offline store components to test against current integration tests
* Fix lint
* Rename to pass checks
* Fix issues
* Fix type checking issues
* Fix lint
* Clean up print statements for first review
* Fix lint
* Fix flake8 lint tests
* Add warnings for alpha version release
* Format
* Address review
* Address review
* Fix lint
* Add file store functionality
* Lint
* Add example feature repo
* Update data source creator
* Make CLI work for feast init with spark
* Update the docs
* Clean up code
* Clean up more code
* Uncomment repo configs
* Fix setup.py
* Update dependencies
* Fix CI dependencies
* Screwed up rebase
* Screwed up rebase
* Screwed up rebase
* Realign with master
* Fix accidental changes
* Make type map change cleaner
* Address review comments
* Fix tests accidentally broken
* Add comments
* Reformat
* Fix logger
* Remove unused imports
* Fix imports
* Fix CI dependencies
* Prefix destinations with project name
* Update comment
* Fix 3.8
* Temporary fix
* Rollback
* Update
* Update CI?
* Move third party to contrib
* Fix imports
* Remove third_party refactor
* Revert ci requirements and update comment in type map
* Revert 3.8-requirements

Signed-off-by: Kevin Zhang <kzhang@tecton.ai>
Signed-off-by: Danny Chiao <danny@tecton.ai>
Co-authored-by: Danny Chiao <danny@tecton.ai>
Signed-off-by: Achal Shah <achals@gmail.com>
1 parent 7e85d6c · commit 12aafc2

File tree: 4 files changed, +10 −26 lines changed


docs/reference/data-sources/spark.md (+3 −9)

````diff
@@ -13,9 +13,7 @@ The spark data source API allows for the retrieval of historical feature values
 Using a table reference from SparkSession(for example, either in memory or a Hive Metastore)
 
 ```python
-from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
-    SparkSource,
-)
+from feast import SparkSource
 
 my_spark_source = SparkSource(
     table="FEATURE_TABLE",
@@ -25,9 +23,7 @@ my_spark_source = SparkSource(
 Using a query
 
 ```python
-from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
-    SparkSource,
-)
+from feast import SparkSource
 
 my_spark_source = SparkSource(
     query="SELECT timestamp as ts, created, f1, f2 "
@@ -38,9 +34,7 @@ my_spark_source = SparkSource(
 Using a file reference
 
 ```python
-from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
-    SparkSource,
-)
+from feast import SparkSource
 
 my_spark_source = SparkSource(
     path=f"{CURRENT_DIR}/data/driver_hourly_stats",
````

sdk/python/feast/__init__.py (+4)

```diff
@@ -3,6 +3,9 @@
 from pkg_resources import DistributionNotFound, get_distribution
 
 from feast.infra.offline_stores.bigquery_source import BigQuerySource
+from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
+    SparkSource,
+)
 from feast.infra.offline_stores.file_source import FileSource
 from feast.infra.offline_stores.redshift_source import RedshiftSource
 from feast.infra.offline_stores.snowflake_source import SnowflakeSource
@@ -47,4 +50,5 @@
     "RedshiftSource",
     "RequestFeatureView",
     "SnowflakeSource",
+    "SparkSource",
 ]
```
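This is the standard re-export pattern: importing a class in the package's `__init__.py` and listing it in `__all__` promotes it to the public API, which is what lets the docs above say `from feast import SparkSource`. A hypothetical mini-package showing the same mechanics (names are made up; only the structure mirrors the diff):

```python
# mypkg/_sources.py
class SparkSource:
    """Stand-in for the real contrib class."""


# mypkg/__init__.py
from mypkg._sources import SparkSource  # re-export at the package top level

__all__ = [
    "SparkSource",  # exported by `from mypkg import *`; signals public API
]
```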

sdk/python/feast/inference.py (+3 −5)

```diff
@@ -8,6 +8,7 @@
     FileSource,
     RedshiftSource,
     SnowflakeSource,
+    SparkSource,
 )
 from feast.data_source import DataSource, RequestDataSource
 from feast.errors import RegistryInferenceFailure
@@ -86,10 +87,8 @@ def update_data_sources_with_inferred_event_timestamp_col(
 ):
     # prepare right match pattern for data source
     ts_column_type_regex_pattern = ""
-    # TODO(adchia): Move Spark source inference out of this logic
-    if (
-        isinstance(data_source, FileSource)
-        or "SparkSource" == data_source.__class__.__name__
+    if isinstance(data_source, FileSource) or isinstance(
+        data_source, SparkSource
     ):
         ts_column_type_regex_pattern = r"^timestamp"
     elif isinstance(data_source, BigQuerySource):
@@ -104,7 +103,6 @@ def update_data_sources_with_inferred_event_timestamp_col(
             f"""
             DataSource inferencing of event_timestamp_column is currently only supported
             for FileSource, SparkSource, BigQuerySource, RedshiftSource, and SnowflakeSource.
-            Attempting to infer from {data_source}.
             """,
         )
     # for informing the type checker
```
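The removed branch compared class names as strings, a workaround from when `SparkSource` could not be imported here. With the import in place, `isinstance()` is the sturdier check: it follows inheritance and is visible to type checkers and refactoring tools, whereas a name comparison silently breaks on rename and misses subclasses. An illustrative contrast (toy classes, not the real Feast types):

```python
class SparkSource:
    pass


class CustomSparkSource(SparkSource):  # hypothetical subclass
    pass


ds = CustomSparkSource()

# The old string check misses subclasses entirely:
print("SparkSource" == ds.__class__.__name__)  # False

# isinstance() follows the inheritance chain:
print(isinstance(ds, SparkSource))  # True
```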

sdk/python/feast/infra/offline_stores/contrib/spark_offline_store/spark_source.py (−12)

```diff
@@ -8,7 +8,6 @@
 from pyspark.sql import SparkSession
 
 from feast.data_source import DataSource
-from feast.errors import DataSourceNoNameException
 from feast.infra.offline_stores.offline_utils import get_temp_entity_table_name
 from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
 from feast.protos.feast.core.SavedDataset_pb2 import (
@@ -31,7 +30,6 @@ class SparkSourceFormat(Enum):
 class SparkSource(DataSource):
     def __init__(
         self,
-        name: Optional[str] = None,
         table: Optional[str] = None,
         query: Optional[str] = None,
         path: Optional[str] = None,
@@ -41,15 +39,7 @@ def __init__(
         field_mapping: Optional[Dict[str, str]] = None,
         date_partition_column: Optional[str] = None,
     ):
-        # If no name, use the table_ref as the default name
-        _name = name
-        if not _name:
-            if table:
-                _name = table
-            else:
-                raise DataSourceNoNameException()
         super().__init__(
-            _name,
             event_timestamp_column,
             created_timestamp_column,
             field_mapping,
@@ -116,7 +106,6 @@ def from_proto(data_source: DataSourceProto) -> Any:
 
         spark_options = SparkOptions.from_proto(data_source.custom_options)
         return SparkSource(
-            name=data_source.name,
             field_mapping=dict(data_source.field_mapping),
             table=spark_options.table,
             query=spark_options.query,
@@ -129,7 +118,6 @@ def from_proto(data_source: DataSourceProto) -> Any:
 
     def to_proto(self) -> DataSourceProto:
         data_source_proto = DataSourceProto(
-            name=self.name,
             type=DataSourceProto.CUSTOM_SOURCE,
             field_mapping=self.field_mapping,
             custom_options=self.spark_options.to_proto(),
```
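With the `name` parameter removed, a `SparkSource` now round-trips through its proto purely via `custom_options`. A hedged sketch of that round trip using only the methods visible in this diff, assuming a checkout at this commit with pyspark installed (field values are illustrative):

```python
from feast.infra.offline_stores.contrib.spark_offline_store.spark_source import (
    SparkSource,
)

source = SparkSource(
    table="driver_hourly_stats",
    event_timestamp_column="event_timestamp",
)

proto = source.to_proto()  # DataSourceProto with type=CUSTOM_SOURCE
restored = SparkSource.from_proto(proto)

# table/query/path travel through custom_options, so serializing the
# restored source should reproduce the same proto.
assert restored.to_proto() == proto
```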
