Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Range query materialization #185

Open
wants to merge 13 commits into
base: master
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union
from feast.entity import Entity
from feast.feature_view import FeatureView
from feast.batch_feature_view import BatchFeatureView
from feast.sorted_feature_view import SortedFeatureView
from feast.stream_feature_view import StreamFeatureView
from feast.infra.materialization import LocalMaterializationEngine, LocalMaterializationJob, MaterializationTask
from feast.infra.offline_stores.offline_store import OfflineStore
Expand Down Expand Up @@ -53,10 +54,10 @@ class MyCustomEngine(LocalMaterializationEngine):
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
Expand Down
8 changes: 4 additions & 4 deletions examples/rbac-remote/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,10 @@ The application works with Kubernetes or OpenShift and the instructions assume t
- As an example, we created 3 different users: 1. [admin_user](client/k8s/admin_user_resources.yaml), 2. [readonly_user](client/k8s/readonly_user_resources.yaml) and 3. [unauthorized_user](client/k8s/unauthorized_user_resources.yaml).
- Each user is assigned their own service account and roles, as shown in the table below.
##### Roles and Permissions for Examples (Admin and User)
| **User** | **Service Account** | **Roles** | **Permission** | **Feast Resources** | **Actions** |
|-----------------|----------------------------|------------------|--------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------|
| admin | feast-admin-sa | feast-admin-role | feast_admin_permission | FeatureView, OnDemandFeatureView, BatchFeatureView, StreamFeatureView, Entity, FeatureService, DataSource, ValidationReference, SavedDataset, Permission | CREATE, DESCRIBE, UPDATE, DELETE, READ_ONLINE, READY_OFFLINE, WRITE_ONLINE, WRITE_OFFLINE |
| user | feast-user-sa | feast-user-role | feast_user_permission | FeatureView, OnDemandFeatureView, BatchFeatureView, StreamFeatureView, Entity, FeatureService, DataSource, ValidationReference, SavedDataset, Permission | READ, READ_OFFLINE, READ_ONLINE |
| **User** | **Service Account** | **Roles** | **Permission** | **Feast Resources** | **Actions** |
|-----------------|----------------------------|------------------|--------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-------------------------------------------------------------------------------------------|
| admin | feast-admin-sa | feast-admin-role | feast_admin_permission | FeatureView, OnDemandFeatureView, BatchFeatureView, StreamFeatureView, SortedFeatureView, Entity, FeatureService, DataSource, ValidationReference, SavedDataset, Permission | CREATE, DESCRIBE, UPDATE, DELETE, READ_ONLINE, READ_OFFLINE, WRITE_ONLINE, WRITE_OFFLINE |
| user | feast-user-sa | feast-user-role | feast_user_permission | FeatureView, OnDemandFeatureView, BatchFeatureView, StreamFeatureView, SortedFeatureView, Entity, FeatureService, DataSource, ValidationReference, SavedDataset, Permission | READ, READ_OFFLINE, READ_ONLINE |
|unauthorized-user| feast-unauthorized-user-sa | |
- To deploy the client, confirm `Apply client creation examples` with `Y`.
- The deployment of the overall setup looks like:
Expand Down
1 change: 1 addition & 0 deletions sdk/python/feast/diff/registry_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,7 @@ def apply_diff_to_registry(
FeastObjectType.FEATURE_VIEW,
FeastObjectType.ON_DEMAND_FEATURE_VIEW,
FeastObjectType.STREAM_FEATURE_VIEW,
FeastObjectType.SORTED_FEATURE_VIEW,
]:
registry.apply_feature_view(
cast(BaseFeatureView, feast_object_diff.new_feast_object),
Expand Down
13 changes: 8 additions & 5 deletions sdk/python/feast/infra/contrib/spark_kafka_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
_SparkSerializedArtifacts,
)
from feast.infra.provider import get_provider
from feast.sorted_feature_view import SortedFeatureView
from feast.stream_feature_view import StreamFeatureView


Expand Down Expand Up @@ -258,11 +259,13 @@ def batch_write(row: DataFrame, batch_id: int):
ts_field = self.sfv.timestamp_field
else:
ts_field = self.sfv.stream_source.timestamp_field # type: ignore
rows = (
rows.sort_values(by=[*self.join_keys, ts_field], ascending=False)
.groupby(self.join_keys)
.nth(0)
)

if not isinstance(self.sfv, SortedFeatureView):
rows = (
rows.sort_values(by=[*self.join_keys, ts_field], ascending=False)
.groupby(self.join_keys)
.nth(0)
)
# Created column is not used anywhere in the code, but it is added to the dataframe.
# Commenting this out as it is not used anywhere in the code
# rows["created"] = pd.to_datetime("now", utc=True)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.registry.base_registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.sorted_feature_view import SortedFeatureView
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _get_column_names
from feast.version import get_version
Expand Down Expand Up @@ -80,10 +81,10 @@ def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
Expand Down Expand Up @@ -114,7 +115,7 @@ def update(
def teardown_infra(
self,
project: str,
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]],
entities: Sequence[Entity],
):
# This should be tearing down the lambda function.
Expand Down Expand Up @@ -166,7 +167,7 @@ def materialize(
def _materialize_one(
self,
registry: BaseRegistry,
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView],
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView],
start_date: datetime,
end_date: datetime,
project: str,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.registry.base_registry import BaseRegistry
from feast.repo_config import RepoConfig
from feast.sorted_feature_view import SortedFeatureView
from feast.stream_feature_view import StreamFeatureView


Expand All @@ -24,7 +25,7 @@ class MaterializationTask:
"""

project: str
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView]
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]
start_time: datetime
end_time: datetime
tqdm_builder: Callable[[int], tqdm]
Expand Down Expand Up @@ -86,10 +87,10 @@ def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
Expand Down Expand Up @@ -128,7 +129,7 @@ def materialize(
def teardown_infra(
self,
project: str,
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]],
entities: Sequence[Entity],
):
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from feast.infra.registry.base_registry import BaseRegistry
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.sorted_feature_view import SortedFeatureView
from feast.stream_feature_view import StreamFeatureView
from feast.utils import (
_convert_arrow_to_proto,
Expand Down Expand Up @@ -78,10 +79,10 @@ def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
Expand All @@ -92,7 +93,7 @@ def update(
def teardown_infra(
self,
project: str,
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]],
entities: Sequence[Entity],
):
# Nothing to tear down.
Expand Down Expand Up @@ -135,7 +136,7 @@ def materialize(
def _materialize_one(
self,
registry: BaseRegistry,
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView],
feature_view: Union[BatchFeatureView, SortedFeatureView, StreamFeatureView, FeatureView],
start_date: datetime,
end_date: datetime,
project: str,
Expand All @@ -155,19 +156,33 @@ def _materialize_one(
job_id = f"{feature_view.name}-{start_date}-{end_date}"

try:
offline_job = cast(
SparkRetrievalJob,
self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
),
)
if isinstance(feature_view, SortedFeatureView):
offline_job = cast(
SparkRetrievalJob,
self.offline_store.pull_all_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
start_date=start_date,
end_date=end_date,
),
)
else:
offline_job = cast(
SparkRetrievalJob,
self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
),
)

spark_serialized_artifacts = _SparkSerializedArtifacts.serialize(
feature_view=feature_view, repo_config=self.repo_config
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.registry.base_registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel
from feast.sorted_feature_view import SortedFeatureView
from feast.stream_feature_view import StreamFeatureView
from feast.utils import _get_column_names

Expand Down Expand Up @@ -122,10 +123,10 @@ def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
Expand All @@ -137,7 +138,7 @@ def update(
def teardown_infra(
self,
project: str,
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]],
entities: Sequence[Entity],
):
"""This method ensures that any infrastructure or resources set up by ``update()``are torn down."""
Expand All @@ -163,7 +164,7 @@ def materialize(
def _materialize_one(
self,
registry: BaseRegistry,
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView],
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView],
start_date: datetime,
end_date: datetime,
project: str,
Expand All @@ -179,17 +180,27 @@ def _materialize_one(
timestamp_field,
created_timestamp_column,
) = _get_column_names(feature_view, entities)

offline_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
)
if isinstance(feature_view, SortedFeatureView):
offline_job = self.offline_store.pull_all_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
start_date=start_date,
end_date=end_date,
)
else:
offline_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
)

paths = offline_job.to_remote_storage()
if self.batch_engine_config.synchronous:
Expand Down
40 changes: 26 additions & 14 deletions sdk/python/feast/infra/materialization/local_engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from feast.infra.online_stores.online_store import OnlineStore
from feast.infra.registry.base_registry import BaseRegistry
from feast.repo_config import FeastConfigBaseModel, RepoConfig
from feast.sorted_feature_view import SortedFeatureView
from feast.stream_feature_view import StreamFeatureView
from feast.utils import (
_convert_arrow_to_proto,
Expand Down Expand Up @@ -69,10 +70,10 @@ def update(
self,
project: str,
views_to_delete: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]
],
views_to_keep: Sequence[
Union[BatchFeatureView, StreamFeatureView, FeatureView]
Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]
],
entities_to_delete: Sequence[Entity],
entities_to_keep: Sequence[Entity],
Expand All @@ -83,7 +84,7 @@ def update(
def teardown_infra(
self,
project: str,
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView]],
fvs: Sequence[Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView]],
entities: Sequence[Entity],
):
# Nothing to tear down.
Expand Down Expand Up @@ -122,7 +123,7 @@ def materialize(
def _materialize_one(
self,
registry: BaseRegistry,
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView],
feature_view: Union[BatchFeatureView, StreamFeatureView, FeatureView, SortedFeatureView],
start_date: datetime,
end_date: datetime,
project: str,
Expand All @@ -142,16 +143,27 @@ def _materialize_one(
job_id = f"{feature_view.name}-{start_date}-{end_date}"

try:
offline_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
)
if isinstance(feature_view, SortedFeatureView):
offline_job = self.offline_store.pull_all_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
start_date=start_date,
end_date=end_date,
)
else:
offline_job = self.offline_store.pull_latest_from_table_or_query(
config=self.repo_config,
data_source=feature_view.batch_source,
join_key_columns=join_key_columns,
feature_name_columns=feature_name_columns,
timestamp_field=timestamp_field,
created_timestamp_column=created_timestamp_column,
start_date=start_date,
end_date=end_date,
)

table = offline_job.to_arrow()

Expand Down
Loading
Loading