Skip to content

Commit ab78702

Browse files
fix: Fix materialization with ttl=0 bug (#2666)
* Fix materialization with ttl=0 bug Signed-off-by: Felix Wang <wangfelix98@gmail.com> * Add TODO Signed-off-by: Felix Wang <wangfelix98@gmail.com>
1 parent 44ca9f5 commit ab78702

File tree

3 files changed

+74
-7
lines changed

3 files changed

+74
-7
lines changed

sdk/python/feast/feature_store.py

+11-2
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
import os
1717
import warnings
1818
from collections import Counter, defaultdict
19-
from datetime import datetime
19+
from datetime import datetime, timedelta
2020
from pathlib import Path
2121
from typing import (
2222
TYPE_CHECKING,
@@ -1080,7 +1080,16 @@ def materialize_incremental(
10801080
f"No start time found for feature view {feature_view.name}. materialize_incremental() requires"
10811081
f" either a ttl to be set or for materialize() to have been run at least once."
10821082
)
1083-
start_date = datetime.utcnow() - feature_view.ttl
1083+
elif feature_view.ttl.total_seconds() > 0:
1084+
start_date = datetime.utcnow() - feature_view.ttl
1085+
else:
1086+
# TODO(felixwang9817): Find the earliest timestamp for this specific feature
1087+
# view from the offline store, and set the start date to that timestamp.
1088+
print(
1089+
f"Since the ttl is 0 for feature view {Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}, "
1090+
"the start date will be set to 1 year before the current time."
1091+
)
1092+
start_date = datetime.utcnow() - timedelta(weeks=52)
10841093
provider = self._get_provider()
10851094
print(
10861095
f"{Style.BRIGHT + Fore.GREEN}{feature_view.name}{Style.RESET_ALL}"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
from datetime import timedelta
2+
3+
from feast import Entity, FeatureView, Field, FileSource, ValueType
4+
from feast.types import Float32, Int32, Int64
5+
6+
driver_hourly_stats = FileSource(
7+
path="%PARQUET_PATH%", # placeholder to be replaced by the test
8+
timestamp_field="event_timestamp",
9+
created_timestamp_column="created",
10+
)
11+
12+
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id")
13+
14+
15+
driver_hourly_stats_view = FeatureView(
16+
name="driver_hourly_stats",
17+
entities=[driver],
18+
ttl=timedelta(days=0),
19+
schema=[
20+
Field(name="conv_rate", dtype=Float32),
21+
Field(name="acc_rate", dtype=Float32),
22+
Field(name="avg_daily_trips", dtype=Int64),
23+
],
24+
online=True,
25+
source=driver_hourly_stats,
26+
tags={},
27+
)
28+
29+
30+
global_daily_stats = FileSource(
31+
path="%PARQUET_PATH_GLOBAL%", # placeholder to be replaced by the test
32+
timestamp_field="event_timestamp",
33+
created_timestamp_column="created",
34+
)
35+
36+
37+
global_stats_feature_view = FeatureView(
38+
name="global_daily_stats",
39+
entities=[],
40+
ttl=timedelta(days=0),
41+
schema=[
42+
Field(name="num_rides", dtype=Int32),
43+
Field(name="avg_ride_length", dtype=Float32),
44+
],
45+
online=True,
46+
source=global_daily_stats,
47+
tags={},
48+
)

sdk/python/tests/integration/online_store/test_e2e_local.py

+15-5
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,12 @@ def _test_materialize_and_online_retrieval(
101101

102102
def test_e2e_local() -> None:
103103
"""
104-
A more comprehensive than "basic" test, using local provider.
104+
Tests the end-to-end workflow of apply, materialize, and online retrieval.
105105
106-
1. Create a repo.
107-
2. Apply
108-
3. Ingest some data to online store from parquet
109-
4. Read from the online store to make sure it made it there.
106+
This test runs against several different types of repos:
107+
1. A repo with a normal FV and an entity-less FV.
108+
2. A repo using the SDK from version 0.19.0.
109+
3. A repo with a FV with a ttl of 0.
110110
"""
111111
runner = CliRunner()
112112
with tempfile.TemporaryDirectory() as data_dir:
@@ -143,6 +143,16 @@ def test_e2e_local() -> None:
143143
runner, store, start_date, end_date, driver_df
144144
)
145145

146+
with runner.local_repo(
147+
get_example_repo("example_feature_repo_with_ttl_0.py")
148+
.replace("%PARQUET_PATH%", driver_stats_path)
149+
.replace("%PARQUET_PATH_GLOBAL%", global_stats_path),
150+
"file",
151+
) as store:
152+
_test_materialize_and_online_retrieval(
153+
runner, store, start_date, end_date, driver_df
154+
)
155+
146156
# Test a failure case when the parquet file doesn't include a join key
147157
with runner.local_repo(
148158
get_example_repo("example_feature_repo_with_entity_join_key.py").replace(

0 commit comments

Comments
 (0)