Skip to content

Commit bf24595

Browse files
terryyylim, Terence, woop
authored and
Oleksii Moskalenko
committed
Fix bug where default project is always used for ingestion (#868)
* Fix ingestion with same featureset name
* Address PR comments
* Remove unrelated test
* Remove unnecessary line

Co-authored-by: Terence <terence.limxp@go-jek.com>
Co-authored-by: Willem Pienaar <6728866+woop@users.noreply.github.com>
1 parent e11d860 commit bf24595

File tree

2 files changed

+131
-1
lines changed

2 files changed

+131
-1
lines changed

sdk/python/feast/client.py

+25-1
Original file line numberDiff line numberDiff line change
@@ -849,11 +849,33 @@ def ingest(
849849
Returns:
850850
str:
851851
ingestion id for this dataset
852+
853+
Examples:
854+
>>> from feast import Client
855+
>>>
856+
>>> client = Client(core_url="localhost:6565")
857+
>>> fs_df = pd.DataFrame(
858+
>>> {
859+
>>> "datetime": [pd.datetime.now()],
860+
>>> "driver": [1001],
861+
>>> "rating": [4.3],
862+
>>> }
863+
>>> )
864+
>>> client.set_project("project1")
865+
>>> client.ingest("driver", fs_df)
866+
>>>
867+
>>> driver_fs = client.get_feature_set(name="driver", project="project1")
868+
>>> client.ingest(driver_fs, fs_df)
852869
"""
853870

854871
if isinstance(feature_set, FeatureSet):
855872
name = feature_set.name
873+
project = feature_set.project
856874
elif isinstance(feature_set, str):
875+
if self.project is not None:
876+
project = self.project
877+
else:
878+
project = "default"
857879
name = feature_set
858880
else:
859881
raise Exception("Feature set name must be provided")
@@ -871,7 +893,9 @@ def ingest(
871893
while True:
872894
if timeout is not None and time.time() - current_time >= timeout:
873895
raise TimeoutError("Timed out waiting for feature set to be ready")
874-
fetched_feature_set: Optional[FeatureSet] = self.get_feature_set(name)
896+
fetched_feature_set: Optional[FeatureSet] = self.get_feature_set(
897+
name, project
898+
)
875899
if (
876900
fetched_feature_set is not None
877901
and fetched_feature_set.status == FeatureSetStatus.STATUS_READY

tests/e2e/redis/basic-ingest-redis-serving.py

+106
Original file line numberDiff line numberDiff line change
@@ -569,6 +569,112 @@ def try_get_features2():
569569
)
570570

571571

572+
@pytest.mark.timeout(600)
@pytest.mark.run(order=16)
def test_basic_ingest_retrieval_fs(client):
    """Ingest rows by passing a FeatureSet object, then read them back online.

    The client is pointed at another project first so that ingestion is
    exercised against the current project context.
    """
    # Set to another project to test ingestion based on current project context
    client.set_project(PROJECT_NAME + "_NS1")

    rating_feature = Feature(name="driver_fs_rating", dtype=ValueType.FLOAT)
    cost_feature = Feature(name="driver_fs_cost", dtype=ValueType.FLOAT)
    driver_fs = FeatureSet(
        name="driver_fs",
        features=[rating_feature, cost_feature],
        entities=[Entity("driver_fs_id", ValueType.INT64)],
        max_age=Duration(seconds=3600),
    )
    client.apply(driver_fs)

    row_count = 2
    event_time = datetime.utcnow().replace(tzinfo=pytz.utc)
    frame = pd.DataFrame(
        {
            "datetime": [event_time] * row_count,
            "driver_fs_id": list(range(row_count)),
            "driver_fs_rating": [float(idx) for idx in range(row_count)],
            "driver_fs_cost": [float(idx) + 0.5 for idx in range(row_count)],
        }
    )
    # Ingest using the FeatureSet object itself rather than its name.
    client.ingest(driver_fs, frame, timeout=600)
    time.sleep(15)

    entity_rows = [{"driver_fs_id": 0}, {"driver_fs_id": 1}]
    feature_refs = ["driver_fs_rating", "driver_fs_cost"]

    def fetch_online_features():
        # Returns (response, True); the flag is presumably the "done" signal
        # expected by wait_retry_backoff -- confirm against that helper.
        return (
            client.get_online_features(
                entity_rows=entity_rows, feature_refs=feature_refs
            ),
            True,
        )

    actual = wait_retry_backoff(
        retry_fn=fetch_online_features,
        timeout_secs=90,
        timeout_msg="Timed out trying to get online feature values",
    )

    expected = {
        "driver_fs_id": [0, 1],
        "driver_fs_rating": [0.0, 1.0],
        "driver_fs_cost": [0.5, 1.5],
    }
    assert actual.to_dict() == expected
623+
624+
625+
@pytest.mark.timeout(600)
@pytest.mark.run(order=17)
def test_basic_ingest_retrieval_str(client):
    """Ingest rows by passing the feature set name, then read them back online.

    The client is pointed at another project first so that ingestion is
    exercised against the current project context.
    """
    # Set to another project to test ingestion based on current project context
    client.set_project(PROJECT_NAME + "_NS1")

    rating_feature = Feature(name="cust_rating", dtype=ValueType.INT64)
    cost_feature = Feature(name="cust_cost", dtype=ValueType.FLOAT)
    customer_fs = FeatureSet(
        name="cust_fs",
        features=[rating_feature, cost_feature],
        entities=[Entity("cust_id", ValueType.INT64)],
        max_age=Duration(seconds=3600),
    )
    client.apply(customer_fs)

    row_count = 2
    event_time = datetime.utcnow().replace(tzinfo=pytz.utc)
    frame = pd.DataFrame(
        {
            "datetime": [event_time] * row_count,
            "cust_id": list(range(row_count)),
            "cust_rating": list(range(row_count)),
            "cust_cost": [float(idx) + 0.5 for idx in range(row_count)],
        }
    )
    # Ingest by name only; no FeatureSet object is handed to the client here.
    client.ingest("cust_fs", frame, timeout=600)
    time.sleep(15)

    entity_rows = [{"cust_id": 0}, {"cust_id": 1}]
    feature_refs = ["cust_rating", "cust_cost"]

    def fetch_online_features():
        # Returns (response, True); the flag is presumably the "done" signal
        # expected by wait_retry_backoff -- confirm against that helper.
        return (
            client.get_online_features(
                entity_rows=entity_rows, feature_refs=feature_refs
            ),
            True,
        )

    actual = wait_retry_backoff(
        retry_fn=fetch_online_features,
        timeout_secs=90,
        timeout_msg="Timed out trying to get online feature values",
    )

    expected = {
        "cust_id": [0, 1],
        "cust_rating": [0, 1],
        "cust_cost": [0.5, 1.5],
    }
    assert actual.to_dict() == expected
676+
677+
572678
@pytest.fixture(scope="module")
573679
def all_types_dataframe():
574680
return pd.DataFrame(

0 commit comments

Comments
 (0)