Skip to content

Commit 8028ae0

Browse files
authored
fix: Update Feast object metadata in the registry (feast-dev#4257)
1 parent 89bc551 commit 8028ae0

11 files changed

+570
-9
lines changed

sdk/python/feast/feature_view.py

+10
Original file line numberDiff line numberDiff line change
@@ -311,6 +311,16 @@ def with_join_key_map(self, join_key_map: Dict[str, str]):
311311

312312
return cp
313313

314+
def update_materialization_intervals(
315+
self, existing_materialization_intervals: List[Tuple[datetime, datetime]]
316+
):
317+
if (
318+
len(existing_materialization_intervals) > 0
319+
and len(self.materialization_intervals) == 0
320+
):
321+
for interval in existing_materialization_intervals:
322+
self.materialization_intervals.append((interval[0], interval[1]))
323+
314324
def to_proto(self) -> FeatureViewProto:
315325
"""
316326
Converts a feature view object to its protobuf representation.

sdk/python/feast/infra/registry/base_registry.py

+28
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,19 @@
2929
from feast.infra.infra_object import Infra
3030
from feast.on_demand_feature_view import OnDemandFeatureView
3131
from feast.project_metadata import ProjectMetadata
32+
from feast.protos.feast.core.Entity_pb2 import Entity as EntityProto
33+
from feast.protos.feast.core.FeatureService_pb2 import (
34+
FeatureService as FeatureServiceProto,
35+
)
36+
from feast.protos.feast.core.FeatureView_pb2 import FeatureView as FeatureViewProto
37+
from feast.protos.feast.core.OnDemandFeatureView_pb2 import (
38+
OnDemandFeatureView as OnDemandFeatureViewProto,
39+
)
3240
from feast.protos.feast.core.Registry_pb2 import Registry as RegistryProto
41+
from feast.protos.feast.core.SavedDataset_pb2 import SavedDataset as SavedDatasetProto
42+
from feast.protos.feast.core.StreamFeatureView_pb2 import (
43+
StreamFeatureView as StreamFeatureViewProto,
44+
)
3345
from feast.saved_dataset import SavedDataset, ValidationReference
3446
from feast.stream_feature_view import StreamFeatureView
3547
from feast.transformation.pandas_transformation import PandasTransformation
@@ -705,3 +717,19 @@ def to_dict(self, project: str) -> Dict[str, List[Any]]:
705717
self._message_to_sorted_dict(infra_object.to_proto())
706718
)
707719
return registry_dict
720+
721+
@staticmethod
722+
def deserialize_registry_values(serialized_proto, feast_obj_type) -> Any:
723+
if feast_obj_type == Entity:
724+
return EntityProto.FromString(serialized_proto)
725+
if feast_obj_type == SavedDataset:
726+
return SavedDatasetProto.FromString(serialized_proto)
727+
if feast_obj_type == FeatureView:
728+
return FeatureViewProto.FromString(serialized_proto)
729+
if feast_obj_type == StreamFeatureView:
730+
return StreamFeatureViewProto.FromString(serialized_proto)
731+
if feast_obj_type == OnDemandFeatureView:
732+
return OnDemandFeatureViewProto.FromString(serialized_proto)
733+
if feast_obj_type == FeatureService:
734+
return FeatureServiceProto.FromString(serialized_proto)
735+
return None

sdk/python/feast/infra/registry/registry.py

+33-1
Original file line numberDiff line numberDiff line change
@@ -265,9 +265,13 @@ def apply_entity(self, entity: Entity, project: str, commit: bool = True):
265265
existing_entity_proto.spec.name == entity_proto.spec.name
266266
and existing_entity_proto.spec.project == project
267267
):
268+
entity.created_timestamp = (
269+
existing_entity_proto.meta.created_timestamp.ToDatetime()
270+
)
271+
entity_proto = entity.to_proto()
272+
entity_proto.spec.project = project
268273
del self.cached_registry_proto.entities[idx]
269274
break
270-
271275
self.cached_registry_proto.entities.append(entity_proto)
272276
if commit:
273277
self.commit()
@@ -346,6 +350,11 @@ def apply_feature_service(
346350
== feature_service_proto.spec.name
347351
and existing_feature_service_proto.spec.project == project
348352
):
353+
feature_service.created_timestamp = (
354+
existing_feature_service_proto.meta.created_timestamp.ToDatetime()
355+
)
356+
feature_service_proto = feature_service.to_proto()
357+
feature_service_proto.spec.project = project
349358
del registry.feature_services[idx]
350359
registry.feature_services.append(feature_service_proto)
351360
if commit:
@@ -421,6 +430,18 @@ def apply_feature_view(
421430
):
422431
return
423432
else:
433+
existing_feature_view = type(feature_view).from_proto(
434+
existing_feature_view_proto
435+
)
436+
feature_view.created_timestamp = (
437+
existing_feature_view.created_timestamp
438+
)
439+
if isinstance(feature_view, (FeatureView, StreamFeatureView)):
440+
feature_view.update_materialization_intervals(
441+
existing_feature_view.materialization_intervals
442+
)
443+
feature_view_proto = feature_view.to_proto()
444+
feature_view_proto.spec.project = project
424445
del existing_feature_views_of_same_type[idx]
425446
break
426447

@@ -660,6 +681,17 @@ def apply_saved_dataset(
660681
existing_saved_dataset_proto.spec.name == saved_dataset_proto.spec.name
661682
and existing_saved_dataset_proto.spec.project == project
662683
):
684+
saved_dataset.created_timestamp = (
685+
existing_saved_dataset_proto.meta.created_timestamp.ToDatetime()
686+
)
687+
saved_dataset.min_event_timestamp = (
688+
existing_saved_dataset_proto.meta.min_event_timestamp.ToDatetime()
689+
)
690+
saved_dataset.max_event_timestamp = (
691+
existing_saved_dataset_proto.meta.max_event_timestamp.ToDatetime()
692+
)
693+
saved_dataset_proto = saved_dataset.to_proto()
694+
saved_dataset_proto.spec.project = project
663695
del self.cached_registry_proto.saved_datasets[idx]
664696
break
665697

sdk/python/feast/infra/registry/remote.py

+1
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ def apply_materialization(
296296
start_date_timestamp.FromDatetime(start_date)
297297
end_date_timestamp.FromDatetime(end_date)
298298

299+
# TODO: for this to work for stream feature views, ApplyMaterializationRequest needs to be updated
299300
request = RegistryServer_pb2.ApplyMaterializationRequest(
300301
feature_view=feature_view.to_proto(),
301302
project=project,

sdk/python/feast/infra/registry/sql.py

+18
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,24 @@ def _apply_object(
713713
obj.last_updated_timestamp = update_datetime
714714

715715
if row:
716+
if proto_field_name in [
717+
"entity_proto",
718+
"saved_dataset_proto",
719+
"feature_view_proto",
720+
"feature_service_proto",
721+
]:
722+
deserialized_proto = self.deserialize_registry_values(
723+
row._mapping[proto_field_name], type(obj)
724+
)
725+
obj.created_timestamp = (
726+
deserialized_proto.meta.created_timestamp.ToDatetime()
727+
)
728+
if isinstance(obj, (FeatureView, StreamFeatureView)):
729+
obj.update_materialization_intervals(
730+
type(obj)
731+
.from_proto(deserialized_proto)
732+
.materialization_intervals
733+
)
716734
values = {
717735
proto_field_name: obj.to_proto().SerializeToString(),
718736
"last_updated_timestamp": update_time,

sdk/python/feast/registry_server.py

+3-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33

44
import grpc
55
from google.protobuf.empty_pb2 import Empty
6+
from pytz import utc
67

78
from feast import FeatureStore
89
from feast.data_source import DataSource
@@ -313,10 +314,10 @@ def ApplyMaterialization(
313314
feature_view=FeatureView.from_proto(request.feature_view),
314315
project=request.project,
315316
start_date=datetime.fromtimestamp(
316-
request.start_date.seconds + request.start_date.nanos / 1e9
317+
request.start_date.seconds + request.start_date.nanos / 1e9, tz=utc
317318
),
318319
end_date=datetime.fromtimestamp(
319-
request.end_date.seconds + request.end_date.nanos / 1e9
320+
request.end_date.seconds + request.end_date.nanos / 1e9, tz=utc
320321
),
321322
commit=request.commit,
322323
)

0 commit comments

Comments
 (0)