Skip to content

Commit c5006c2

Browse files
committed
fix: Fix push sources and add docs / tests pushing via the python feature server (#2561)
* bug: Fixing push API endpoint to include a way to specify whether the registry should be looked up from a cache. Adding docs for feature server usage Signed-off-by: Danny Chiao <danny@tecton.ai> * fix Signed-off-by: Danny Chiao <danny@tecton.ai> * prune out unneeded fields in push source Signed-off-by: Danny Chiao <danny@tecton.ai> * prune out unneeded fields in push source Signed-off-by: Danny Chiao <danny@tecton.ai> * prune out unneeded fields in push source Signed-off-by: Danny Chiao <danny@tecton.ai> * fix comment Signed-off-by: Danny Chiao <danny@tecton.ai> * fix comment Signed-off-by: Danny Chiao <danny@tecton.ai> * fix comment Signed-off-by: Danny Chiao <danny@tecton.ai> * fix comment Signed-off-by: Danny Chiao <danny@tecton.ai> * fix generator Signed-off-by: Danny Chiao <danny@tecton.ai> * add data source creator teardown Signed-off-by: Danny Chiao <danny@tecton.ai> * add data source creator teardown Signed-off-by: Danny Chiao <danny@tecton.ai> * update push source to alpha Signed-off-by: Danny Chiao <danny@tecton.ai> * lint Signed-off-by: Danny Chiao <danny@tecton.ai> * lint Signed-off-by: Danny Chiao <danny@tecton.ai> * lint Signed-off-by: Danny Chiao <danny@tecton.ai> * lint Signed-off-by: Danny Chiao <danny@tecton.ai> * lint Signed-off-by: Danny Chiao <danny@tecton.ai>
1 parent 07ec4c5 commit c5006c2

File tree

15 files changed

+221
-82
lines changed

15 files changed

+221
-82
lines changed

docs/SUMMARY.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@
8181
* [feature\_store.yaml](reference/feature-repository/feature-store-yaml.md)
8282
* [.feastignore](reference/feature-repository/feast-ignore.md)
8383
* [Feature servers](reference/feature-servers/README.md)
84-
* [Local feature server](reference/feature-servers/local-feature-server.md)
84+
* [Python feature server](reference/feature-servers/python-feature-server.md)
8585
* [Go-based feature retrieval](reference/feature-servers/go-feature-retrieval.md)
8686
* [\[Alpha\] Data quality monitoring](reference/dqm.md)
8787
* [\[Alpha\] On demand feature view](reference/alpha-on-demand-feature-view.md)

docs/reference/data-sources/push.md

+6-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
# Push source
22

3+
**Warning**: This is an _experimental_ feature. It's intended for early testing and feedback, and could change without warnings in future releases.
4+
35
## Description
46

57
Push sources allow feature values to be pushed to the online store in real time. This allows fresh feature values to be made available to applications. Push sources supercede the
@@ -31,18 +33,14 @@ from feast.types import Int64
3133

3234
push_source = PushSource(
3335
name="push_source",
34-
schema=[
35-
Field(name="user_id", dtype=Int64),
36-
Field(name="life_time_value", dtype=Int64)
37-
],
3836
batch_source=BigQuerySource(table="test.test"),
3937
)
4038

4139
fv = FeatureView(
4240
name="feature view",
4341
entities=["user_id"],
4442
schema=[Field(name="life_time_value", dtype=Int64)],
45-
stream_source=push_source,
43+
source=push_source,
4644
)
4745
```
4846

@@ -53,6 +51,8 @@ import pandas as pd
5351

5452
fs = FeatureStore(...)
5553
feature_data_frame = pd.DataFrame()
56-
fs.push("push_source", feature_data_frame)
54+
fs.push("push_source_name", feature_data_frame)
5755
```
5856

57+
See also [Python feature server](../feature-servers/python-feature-server.md) for instructions on how to push data to a deployed feature server.
58+

docs/reference/feature-servers/README.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@
22

33
Feast users can choose to retrieve features from a feature server, as opposed to through the Python SDK.
44

5-
{% page-ref page="local-feature-server.md" %}
5+
{% page-ref page="python-feature-server.md" %}

docs/reference/feature-servers/go-feature-retrieval.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
## Overview
44

5-
The Go Feature Retrieval component is a Go implementation of the core feature serving logic, embedded in the Python SDK. It supports retrieval of feature references, feature services, and on demand feature views, and can be used either through the Python SDK or the [Python feature server](local-feature-server.md).
5+
The Go Feature Retrieval component is a Go implementation of the core feature serving logic, embedded in the Python SDK. It supports retrieval of feature references, feature services, and on demand feature views, and can be used either through the Python SDK or the [Python feature server](python-feature-server.md).
66

77
Currently, this component only supports online serving and does not have an offline component including APIs to create feast feature repositories or apply configuration to the registry to facilitate online materialization. It also does not expose its own dedicated cli to perform feast actions. Furthermore, this component is only meant to expose an online serving API that can be called through the python SDK to facilitate faster online feature retrieval.
88

docs/reference/feature-servers/local-feature-server.md docs/reference/feature-servers/python-feature-server.md

+54-3
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,23 @@
1-
# Local feature server
1+
# Python feature server
22

33
## Overview
44

5-
The local feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to get features from Feast using any programming language that can make HTTP requests. A [remote feature server](../alpha-aws-lambda-feature-server.md) on AWS Lambda is also available. A remote feature server on GCP Cloud Run is currently being developed.
5+
The feature server is an HTTP endpoint that serves features with JSON I/O. This enables users to write + read features from Feast online stores using any programming language that can make HTTP requests.
66

77
## CLI
88

9-
There is a new CLI command that starts the server: `feast serve`. By default Feast uses port 6566; the port be overridden by a `--port` flag.
9+
There is a CLI command that starts the server: `feast serve`. By default, Feast uses port 6566; the port be overridden by a `--port` flag.
10+
11+
## Deploying as a service
12+
13+
One can also deploy a feature server by building a docker image that bundles in the project's `feature_store.yaml`. See [helm chart](https://github.com/feast-dev/feast/blob/master/infra/charts/feast-python-server) for example.
14+
15+
A [remote feature server](../alpha-aws-lambda-feature-server.md) on AWS Lambda is available. A remote feature server on GCP Cloud Run is currently being developed.
16+
1017

1118
## Example
1219

20+
### Initializing a feature server
1321
Here's the local feature server usage example with the local template:
1422

1523
```bash
@@ -41,6 +49,7 @@ INFO: Uvicorn running on http://127.0.0.1:6566 (Press CTRL+C to quit)
4149
09/10/2021 10:42:11 AM INFO:Uvicorn running on http://127.0.0.1:6566 (Press CTRL+C to quit)
4250
```
4351
52+
### Retrieving features from the online store
4453
After the server starts, we can execute cURL commands from another terminal tab:
4554
4655
```bash
@@ -142,3 +151,45 @@ curl -X POST \
142151
}
143152
}' | jq
144153
```
154+
155+
### Pushing features to the online store
156+
You can push data corresponding to a push source to the online store (note that timestamps need to be strings):
157+
158+
```text
159+
curl -X POST "http://localhost:6566/push" -d '{
160+
"push_source_name": "driver_hourly_stats_push_source",
161+
"df": {
162+
"driver_id": [1001],
163+
"event_timestamp": ["2022-05-13 10:59:42"],
164+
"created": ["2022-05-13 10:59:42"],
165+
"conv_rate": [1.0],
166+
"acc_rate": [1.0],
167+
"avg_daily_trips": [1000]
168+
}
169+
}' | jq
170+
```
171+
172+
or equivalently from Python:
173+
```python
174+
import json
175+
import requests
176+
import pandas as pd
177+
from datetime import datetime
178+
179+
event_dict = {
180+
"driver_id": [1001],
181+
"event_timestamp": [str(datetime(2021, 5, 13, 10, 59, 42))],
182+
"created": [str(datetime(2021, 5, 13, 10, 59, 42))],
183+
"conv_rate": [1.0],
184+
"acc_rate": [1.0],
185+
"avg_daily_trips": [1000],
186+
"string_feature": "test2",
187+
}
188+
push_data = {
189+
"push_source_name":"driver_stats_push_source",
190+
"df":event_dict
191+
}
192+
requests.post(
193+
"http://localhost:6566/push",
194+
data=json.dumps(push_data))
195+
```

protos/feast/core/DataSource.proto

+1-2
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,7 @@ message DataSource {
222222
// Defines options for DataSource that supports pushing data to it. This allows data to be pushed to
223223
// the online store on-demand, such as by stream consumers.
224224
message PushOptions {
225-
// Mapping of feature name to type
226-
map<string, feast.types.ValueType.Enum> schema = 1;
225+
reserved 1;
227226
}
228227

229228

sdk/python/feast/data_source.py

+3-26
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121

2222
from feast import type_map
2323
from feast.data_format import StreamFormat
24-
from feast.field import Field, from_value_type
24+
from feast.field import Field
2525
from feast.protos.feast.core.DataSource_pb2 import DataSource as DataSourceProto
2626
from feast.repo_config import RepoConfig, get_data_source_class_from_type
2727
from feast.types import VALUE_TYPES_TO_FEAST_TYPES
@@ -714,45 +714,35 @@ class PushSource(DataSource):
714714
A source that can be used to ingest features on request
715715
"""
716716

717-
name: str
718-
schema: List[Field]
717+
# TODO(adchia): consider adding schema here in case where Feast manages pushing events to the offline store
718+
# TODO(adchia): consider a "mode" to support pushing raw vs transformed events
719719
batch_source: DataSource
720-
timestamp_field: str
721720

722721
def __init__(
723722
self,
724723
*,
725724
name: str,
726-
schema: List[Field],
727725
batch_source: DataSource,
728726
description: Optional[str] = "",
729727
tags: Optional[Dict[str, str]] = None,
730728
owner: Optional[str] = "",
731-
timestamp_field: Optional[str] = "",
732729
):
733730
"""
734731
Creates a PushSource object.
735732
Args:
736733
name: Name of the push source
737-
schema: Schema mapping from the input feature name to a ValueType
738734
batch_source: The batch source that backs this push source. It's used when materializing from the offline
739735
store to the online store, and when retrieving historical features.
740736
description (optional): A human-readable description.
741737
tags (optional): A dictionary of key-value pairs to store arbitrary metadata.
742738
owner (optional): The owner of the data source, typically the email of the primary
743739
maintainer.
744-
timestamp_field (optional): Event timestamp foe;d used for point in time
745-
joins of feature values.
746740
747741
"""
748742
super().__init__(name=name, description=description, tags=tags, owner=owner)
749-
self.schema = sorted(schema) # TODO: add schema inference from a batch source
750743
self.batch_source = batch_source
751744
if not self.batch_source:
752745
raise ValueError(f"batch_source is needed for push source {self.name}")
753-
if not timestamp_field:
754-
raise ValueError(f"timestamp field is needed for push source {self.name}")
755-
self.timestamp_field = timestamp_field
756746

757747
def validate(self, config: RepoConfig):
758748
pass
@@ -764,38 +754,25 @@ def get_table_column_names_and_types(
764754

765755
@staticmethod
766756
def from_proto(data_source: DataSourceProto):
767-
schema_pb = data_source.push_options.schema
768-
schema = []
769-
for key, val in schema_pb.items():
770-
schema.append(Field(name=key, dtype=from_value_type(ValueType(val))))
771-
772757
assert data_source.HasField("batch_source")
773758
batch_source = DataSource.from_proto(data_source.batch_source)
774759

775760
return PushSource(
776761
name=data_source.name,
777-
schema=sorted(schema),
778762
batch_source=batch_source,
779-
timestamp_field=data_source.timestamp_field,
780763
description=data_source.description,
781764
tags=dict(data_source.tags),
782765
owner=data_source.owner,
783766
)
784767

785768
def to_proto(self) -> DataSourceProto:
786-
schema_pb = {}
787-
for field in self.schema:
788-
schema_pb[field.name] = field.dtype.to_value_type().value
789769
batch_source_proto = None
790770
if self.batch_source:
791771
batch_source_proto = self.batch_source.to_proto()
792772

793-
options = DataSourceProto.PushOptions(schema=schema_pb,)
794773
data_source_proto = DataSourceProto(
795774
name=self.name,
796775
type=DataSourceProto.PUSH_SOURCE,
797-
push_options=options,
798-
timestamp_field=self.timestamp_field,
799776
description=self.description,
800777
tags=self.tags,
801778
owner=self.owner,

sdk/python/feast/feature_server.py

+1-3
Original file line numberDiff line numberDiff line change
@@ -94,9 +94,7 @@ def push(body=Depends(get_body)):
9494
@app.post("/write-to-online-store")
9595
def write_to_online_store(body=Depends(get_body)):
9696
warnings.warn(
97-
"write_to_online_store is an experimental feature. "
98-
"This API is unstable and it could be changed in the future. "
99-
"We do not guarantee that future changes will maintain backward compatibility.",
97+
"write_to_online_store is deprecated. Please consider using /push instead",
10098
RuntimeWarning,
10199
)
102100
try:

sdk/python/feast/feature_store.py

+14-4
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,6 @@
9292

9393
warnings.simplefilter("once", DeprecationWarning)
9494

95-
9695
if TYPE_CHECKING:
9796
from feast.embedded_go.online_features_service import EmbeddedOnlineFeatureServer
9897

@@ -1186,16 +1185,25 @@ def tqdm_builder(length):
11861185
)
11871186

11881187
@log_exceptions_and_usage
1189-
def push(self, push_source_name: str, df: pd.DataFrame):
1188+
def push(
1189+
self, push_source_name: str, df: pd.DataFrame, allow_registry_cache: bool = True
1190+
):
11901191
"""
11911192
Push features to a push source. This updates all the feature views that have the push source as stream source.
11921193
Args:
11931194
push_source_name: The name of the push source we want to push data to.
11941195
df: the data being pushed.
1196+
allow_registry_cache: whether to allow cached versions of the registry.
11951197
"""
1198+
warnings.warn(
1199+
"Push source is an experimental feature. "
1200+
"This API is unstable and it could and might change in the future. "
1201+
"We do not guarantee that future changes will maintain backward compatibility.",
1202+
RuntimeWarning,
1203+
)
11961204
from feast.data_source import PushSource
11971205

1198-
all_fvs = self.list_feature_views(allow_cache=True)
1206+
all_fvs = self.list_feature_views(allow_cache=allow_registry_cache)
11991207

12001208
fvs_with_push_sources = {
12011209
fv
@@ -1208,7 +1216,9 @@ def push(self, push_source_name: str, df: pd.DataFrame):
12081216
}
12091217

12101218
for fv in fvs_with_push_sources:
1211-
self.write_to_online_store(fv.name, df, allow_registry_cache=True)
1219+
self.write_to_online_store(
1220+
fv.name, df, allow_registry_cache=allow_registry_cache
1221+
)
12121222

12131223
@log_exceptions_and_usage
12141224
def write_to_online_store(

sdk/python/feast/inference.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from typing import List
33

44
from feast import BigQuerySource, Entity, FileSource, RedshiftSource, SnowflakeSource
5-
from feast.data_source import DataSource, RequestSource
5+
from feast.data_source import DataSource, PushSource, RequestSource
66
from feast.errors import RegistryInferenceFailure
77
from feast.feature_view import FeatureView
88
from feast.field import Field, from_value_type
@@ -74,6 +74,8 @@ def update_data_sources_with_inferred_event_timestamp_col(
7474
for data_source in data_sources:
7575
if isinstance(data_source, RequestSource):
7676
continue
77+
if isinstance(data_source, PushSource):
78+
data_source = data_source.batch_source
7779
if data_source.timestamp_field is None or data_source.timestamp_field == "":
7880
# prepare right match pattern for data source
7981
ts_column_type_regex_pattern = ""

sdk/python/feast/infra/online_stores/sqlite.py

+3-1
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,9 @@ def teardown(
230230
def _initialize_conn(db_path: str):
231231
Path(db_path).parent.mkdir(exist_ok=True)
232232
return sqlite3.connect(
233-
db_path, detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
233+
db_path,
234+
detect_types=sqlite3.PARSE_DECLTYPES | sqlite3.PARSE_COLNAMES,
235+
check_same_thread=False,
234236
)
235237

236238

sdk/python/tests/example_repos/example_feature_repo_1.py

+1-8
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,7 @@
4040
)
4141

4242
driver_locations_push_source = PushSource(
43-
name="driver_locations_push",
44-
schema=[
45-
Field(name="driver_id", dtype=String),
46-
Field(name="driver_lat", dtype=Float32),
47-
Field(name="driver_long", dtype=String),
48-
],
49-
batch_source=driver_locations_source,
50-
timestamp_field="event_timestamp",
43+
name="driver_locations_push", batch_source=driver_locations_source,
5144
)
5245

5346
driver = Entity(

0 commit comments

Comments
 (0)