diff --git a/sdk/python/tests/integration/online_store/test_universal_online.py b/sdk/python/tests/integration/online_store/test_universal_online.py index 9beba4d72b..4822a8d4f7 100644 --- a/sdk/python/tests/integration/online_store/test_universal_online.py +++ b/sdk/python/tests/integration/online_store/test_universal_online.py @@ -1,3 +1,4 @@ +import asyncio import datetime import os import time @@ -12,6 +13,7 @@ import requests from botocore.exceptions import BotoCoreError +from feast import FeatureStore from feast.entity import Entity from feast.errors import FeatureNameCollisionError from feast.feature_service import FeatureService @@ -400,19 +402,15 @@ def test_online_retrieval_with_shared_batch_source(environment, universal_data_s ) -@pytest.mark.integration -@pytest.mark.universal_online_stores -@pytest.mark.parametrize("full_feature_names", [True, False], ids=lambda v: str(v)) -def test_online_retrieval_with_event_timestamps( - environment, universal_data_sources, full_feature_names -): - fs = environment.feature_store +def setup_feature_store_universal_feature_views( + environment, universal_data_sources +) -> FeatureStore: + fs: FeatureStore = environment.feature_store entities, datasets, data_sources = universal_data_sources feature_views = construct_universal_feature_views(data_sources) fs.apply([driver(), feature_views.driver, feature_views.global_fv]) - # fake data to ingest into Online Store data = { "driver_id": [1, 2], "conv_rate": [0.5, 0.3], @@ -429,18 +427,11 @@ def test_online_retrieval_with_event_timestamps( } df_ingest = pd.DataFrame(data) - # directly ingest data into the Online Store fs.write_to_online_store("driver_stats", df_ingest) + return fs - response = fs.get_online_features( - features=[ - "driver_stats:avg_daily_trips", - "driver_stats:acc_rate", - "driver_stats:conv_rate", - ], - entity_rows=[{"driver_id": 1}, {"driver_id": 2}], - ) - df = response.to_df(True) + +def assert_feature_store_universal_feature_views_response(df: pd.DataFrame): assertpy.assert_that(len(df)).is_equal_to(2) assertpy.assert_that(df["driver_id"].iloc[0]).is_equal_to(1) assertpy.assert_that(df["driver_id"].iloc[1]).is_equal_to(2) @@ -464,6 +455,50 @@ def test_online_retrieval_with_event_timestamps( ) +@pytest.mark.integration +@pytest.mark.universal_online_stores +def test_online_retrieval_with_event_timestamps(environment, universal_data_sources): + fs = setup_feature_store_universal_feature_views( + environment, universal_data_sources + ) + + response = fs.get_online_features( + features=[ + "driver_stats:avg_daily_trips", + "driver_stats:acc_rate", + "driver_stats:conv_rate", + ], + entity_rows=[{"driver_id": 1}, {"driver_id": 2}], + ) + df = response.to_df(True) + + assert_feature_store_universal_feature_views_response(df) + + +@pytest.mark.integration +@pytest.mark.universal_online_stores(only=["redis"]) +def test_async_online_retrieval_with_event_timestamps( + environment, universal_data_sources +): + fs = setup_feature_store_universal_feature_views( + environment, universal_data_sources + ) + + response = asyncio.run( + fs.get_online_features_async( + features=[ + "driver_stats:avg_daily_trips", + "driver_stats:acc_rate", + "driver_stats:conv_rate", + ], + entity_rows=[{"driver_id": 1}, {"driver_id": 2}], + ) + ) + df = response.to_df(True) + + assert_feature_store_universal_feature_views_response(df) + + @pytest.mark.integration @pytest.mark.universal_online_stores(only=["redis"]) def test_online_store_cleanup(environment, universal_data_sources):