From 2e1931f88544fc952272614d3d9026fc33f55549 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 22 Mar 2022 12:20:58 -0700 Subject: [PATCH 1/7] Fix registry Signed-off-by: Kevin Zhang --- go/internal/feast/featurestore.go | 49 +++++++++---- go/internal/feast/registry.go | 113 +++++++++++++++++++++--------- 2 files changed, 113 insertions(+), 49 deletions(-) diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index 3d6d3b1a4bc..ddb0830cf6b 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -4,14 +4,15 @@ import ( "context" "errors" "fmt" + "sort" + "strings" + "github.com/feast-dev/feast/go/protos/feast/serving" "github.com/feast-dev/feast/go/protos/feast/types" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" - "sort" - "strings" ) type FeatureStore struct { @@ -216,7 +217,7 @@ func (fs *FeatureStore) parseFeatures(kind interface{}) (*Features, error) { return &Features{features: featureList.Features.GetVal(), featureService: nil}, nil } if featureServiceRequest, ok := kind.(*serving.GetOnlineFeaturesRequest_FeatureService); ok { - featureService, err := fs.registry.getFeatureService(fs.config.Project, featureServiceRequest.FeatureService) + featureService, err := fs.registry.getFeatureService(fs.config.Project, featureServiceRequest.FeatureService, true) if err != nil { return nil, err } @@ -258,17 +259,26 @@ func (fs *FeatureStore) getFeatureViewsToUse(features *Features, allowCache, hid requestFvs := make(map[string]*RequestFeatureView) odFvs := make(map[string]*OnDemandFeatureView) - featureViews := fs.listFeatureViews(allowCache, hideDummyEntity) + featureViews, err := fs.listFeatureViews(allowCache, hideDummyEntity) + if err != nil { + return nil, nil, nil, nil, err + } for _, featureView := range featureViews { fvs[featureView.base.name] = featureView } - requestFeatureViews := fs.registry.listRequestFeatureViews(fs.config.Project) + requestFeatureViews, err := fs.registry.listRequestFeatureViews(fs.config.Project, allowCache) + if err != nil { + return nil, nil, nil, nil, err + } for _, requestFeatureView := range requestFeatureViews { requestFvs[requestFeatureView.base.name] = requestFeatureView } - onDemandFeatureViews := fs.registry.listOnDemandFeatureViews(fs.config.Project) + onDemandFeatureViews, err := fs.registry.listOnDemandFeatureViews(fs.config.Project, allowCache) + if err != nil { + return nil, nil, nil, nil, err + } for _, onDemandFeatureView := range onDemandFeatureViews { odFvs[onDemandFeatureView.base.name] = onDemandFeatureView } @@ -347,7 +357,10 @@ func (fs *FeatureStore) getEntityMaps(requestedFeatureViews map[*FeatureView][]s var joinKeyMap map[string]string var featureView *FeatureView - entities := fs.listEntities(true, false) + entities, err := fs.listEntities(true, false) + if err != nil { + return nil, err + } for _, entity := range entities { entityNameToJoinKeyMap[entity.name] = entity.joinKey @@ -359,7 +372,7 @@ func (fs *FeatureStore) getEntityMaps(requestedFeatureViews map[*FeatureView][]s joinKeyMap = featureView.base.projection.joinKeyMap for entityName = range entityNames { - entity, err := fs.registry.getEntity(fs.config.Project, entityName) + entity, err := fs.registry.getEntity(fs.config.Project, entityName, true) if err != nil { return nil, err } @@ -636,26 +649,32 @@ func (fs *FeatureStore) dropUnneededColumns(onlineFeaturesResponse *serving.GetO } } -func (fs *FeatureStore) listFeatureViews(allowCache, hideDummyEntity bool) []*FeatureView { - featureViews := fs.registry.listFeatureViews(fs.config.Project) +func (fs *FeatureStore) listFeatureViews(allowCache, hideDummyEntity bool) ([]*FeatureView, error) { + featureViews, err := fs.registry.listFeatureViews(fs.config.Project, allowCache) + if err != nil { + return featureViews, err + } for _, featureView := range featureViews { if _, ok := featureView.entities[DUMMY_ENTITY_NAME]; ok && hideDummyEntity { featureView.entities = make(map[string]struct{}) } } - return featureViews + return featureViews, nil } -func (fs *FeatureStore) listEntities(allowCache, hideDummyEntity bool) []*Entity { +func (fs *FeatureStore) listEntities(allowCache, hideDummyEntity bool) ([]*Entity, error) { - allEntities := fs.registry.listEntities(fs.config.Project) + allEntities, err := fs.registry.listEntities(fs.config.Project, allowCache) + if err != nil { + return allEntities, err + } entities := make([]*Entity, 0) for _, entity := range allEntities { if entity.name != DUMMY_ENTITY_NAME || !hideDummyEntity { entities = append(entities, entity) } } - return entities + return entities, nil } func (fs *FeatureStore) getFvEntityValues(fv *FeatureView, @@ -821,7 +840,7 @@ func (fs *FeatureStore) groupFeatureRefs(requestedFeatureViews map[*FeatureView] } func (fs *FeatureStore) getFeatureView(project, featureViewName string, allowCache, hideDummyEntity bool) (*FeatureView, error) { - fv, err := fs.registry.getFeatureView(fs.config.Project, featureViewName) + fv, err := fs.registry.getFeatureView(fs.config.Project, featureViewName, allowCache) if err != nil { return nil, err } diff --git a/go/internal/feast/registry.go b/go/internal/feast/registry.go index f8ba83c45d7..8f99655eb78 100644 --- a/go/internal/feast/registry.go +++ b/go/internal/feast/registry.go @@ -3,9 +3,10 @@ package feast import ( "errors" "fmt" - "github.com/feast-dev/feast/go/protos/feast/core" "net/url" "time" + + "github.com/feast-dev/feast/go/protos/feast/core" ) var REGISTRY_SCHEMA_VERSION string = "1" @@ -28,7 +29,7 @@ type Registry struct { cachedFeatureViews map[string]map[string]*core.FeatureView cachedOnDemandFeatureViews map[string]map[string]*core.OnDemandFeatureView cachedRequestFeatureViews map[string]map[string]*core.RequestFeatureView - + cachedRegistry *core.Registry cachedRegistryProtoCreated time.Time cachedRegistryProtoTtl time.Duration } @@ -41,7 +42,7 @@ func NewRegistry(registryConfig *RegistryConfig, repoPath string) (*Registry, er } if len(registryStoreType) == 0 { - registryStore, err := getRegistryStoreFromSheme(registryPath, registryConfig, repoPath) + registryStore, err := getRegistryStoreFromScheme(registryPath, registryConfig, repoPath) if err != nil { return nil, err } @@ -58,7 +59,7 @@ func NewRegistry(registryConfig *RegistryConfig, repoPath string) (*Registry, er } func (r *Registry) initializeRegistry() { - err := r.getRegistryProto(false) + _, err := r.getRegistryProto(false) if err != nil { registryProto := &core.Registry{RegistrySchemaVersion: REGISTRY_SCHEMA_VERSION} r.registryStore.UpdateRegistryProto(registryProto) @@ -68,23 +69,26 @@ func (r *Registry) initializeRegistry() { // TODO: Add a goroutine and automatically refresh every cachedRegistryProtoTtl func (r *Registry) refresh() error { - return r.getRegistryProto(false) + _, err := r.getRegistryProto(false) + return err } -func (r *Registry) getRegistryProto(allowCache bool) error { - expired := r.cachedFeatureServices == nil || (r.cachedRegistryProtoTtl > 0 && time.Now().After(r.cachedRegistryProtoCreated.Add(r.cachedRegistryProtoTtl))) +func (r *Registry) getRegistryProto(allowCache bool) (*core.Registry, error) { + expired := r.cachedRegistry == nil || (r.cachedRegistryProtoTtl > 0 && time.Now().After(r.cachedRegistryProtoCreated.Add(r.cachedRegistryProtoTtl))) if allowCache && !expired { - return nil + return r.cachedRegistry, nil } registryProto, err := r.registryStore.GetRegistryProto() if err != nil { - return err + return registryProto, err } r.load(registryProto) - return nil + r.cachedRegistryProtoCreated = time.Now() + return registryProto, nil } func (r *Registry) load(registry *core.Registry) { + r.cachedRegistry = registry r.cachedFeatureServices = make(map[string]map[string]*core.FeatureService) r.cachedEntities = make(map[string]map[string]*core.Entity) r.cachedFeatureViews = make(map[string]map[string]*core.FeatureView) @@ -152,9 +156,13 @@ func (r *Registry) loadRequestFeatureViews(registry *core.Registry) { Returns empty list if project not found */ -func (r *Registry) listEntities(project string) []*Entity { +func (r *Registry) listEntities(project string, allowCache bool) ([]*Entity, error) { + _, err := r.getRegistryProto(allowCache) + if err != nil { + return nil, err + } if entities, ok := r.cachedEntities[project]; !ok { - return []*Entity{} + return []*Entity{}, errors.New(fmt.Sprintf("project %s not found in listEntities", project)) } else { entityList := make([]*Entity, len(entities)) index := 0 @@ -162,8 +170,9 @@ func (r *Registry) listEntities(project string) []*Entity { entityList[index] = NewEntityFromProto(entity) index += 1 } - return entityList + return entityList, nil } + } /* @@ -171,17 +180,21 @@ func (r *Registry) listEntities(project string) []*Entity { Returns empty list if project not found */ -func (r *Registry) listFeatureViews(project string) []*FeatureView { - if featureViewProtos, ok := r.cachedFeatureViews[project]; !ok { - return []*FeatureView{} +func (r *Registry) listFeatureViews(project string, allowCache bool) ([]*FeatureView, error) { + _, err := r.getRegistryProto(allowCache) + if err != nil { + return nil, err + } + if cachedFeatureViews, ok := r.cachedFeatureViews[project]; !ok { + return []*FeatureView{}, errors.New(fmt.Sprintf("project %s not found in listFeatureViews", project)) } else { - featureViews := make([]*FeatureView, len(featureViewProtos)) + featureViews := make([]*FeatureView, len(cachedFeatureViews)) index := 0 - for _, featureViewProto := range featureViewProtos { + for _, featureViewProto := range cachedFeatureViews { featureViews[index] = NewFeatureViewFromProto(featureViewProto) index += 1 } - return featureViews + return featureViews, nil } } @@ -190,9 +203,13 @@ func (r *Registry) listFeatureViews(project string) []*FeatureView { Returns empty list if project not found */ -func (r *Registry) listFeatureServices(project string) []*FeatureService { +func (r *Registry) listFeatureServices(project string, allowCache bool) ([]*FeatureService, error) { + _, err := r.getRegistryProto(allowCache) + if err != nil { + return nil, err + } if featureServiceProtos, ok := r.cachedFeatureServices[project]; !ok { - return []*FeatureService{} + return []*FeatureService{}, errors.New(fmt.Sprintf("project %s not found in listFeatureServices", project)) } else { featureServices := make([]*FeatureService, len(featureServiceProtos)) index := 0 @@ -200,7 +217,7 @@ func (r *Registry) listFeatureServices(project string) []*FeatureService { featureServices[index] = NewFeatureServiceFromProto(featureServiceProto) index += 1 } - return featureServices + return featureServices, nil } } @@ -209,9 +226,13 @@ func (r *Registry) listFeatureServices(project string) []*FeatureService { Returns empty list if project not found */ -func (r *Registry) listOnDemandFeatureViews(project string) []*OnDemandFeatureView { +func (r *Registry) listOnDemandFeatureViews(project string, allowCache bool) ([]*OnDemandFeatureView, error) { + _, err := r.getRegistryProto(allowCache) + if err != nil { + return nil, err + } if onDemandFeatureViewProtos, ok := r.cachedOnDemandFeatureViews[project]; !ok { - return []*OnDemandFeatureView{} + return []*OnDemandFeatureView{}, errors.New(fmt.Sprintf("project %s not found in listOnDemandFeatureViews", project)) } else { onDemandFeatureViews := make([]*OnDemandFeatureView, len(onDemandFeatureViewProtos)) index := 0 @@ -219,7 +240,7 @@ func (r *Registry) listOnDemandFeatureViews(project string) []*OnDemandFeatureVi onDemandFeatureViews[index] = NewOnDemandFeatureViewFromProto(onDemandFeatureViewProto) index += 1 } - return onDemandFeatureViews + return onDemandFeatureViews, nil } } @@ -228,9 +249,13 @@ func (r *Registry) listOnDemandFeatureViews(project string) []*OnDemandFeatureVi Returns empty list if project not found */ -func (r *Registry) listRequestFeatureViews(project string) []*RequestFeatureView { +func (r *Registry) listRequestFeatureViews(project string, allowCache bool) ([]*RequestFeatureView, error) { + _, err := r.getRegistryProto(allowCache) + if err != nil { + return nil, err + } if requestFeatureViewProtos, ok := r.cachedRequestFeatureViews[project]; !ok { - return []*RequestFeatureView{} + return []*RequestFeatureView{}, errors.New(fmt.Sprintf("project %s not found in listRequestFeatureViews", project)) } else { requestFeatureViews := make([]*RequestFeatureView, len(requestFeatureViewProtos)) index := 0 @@ -238,11 +263,15 @@ func (r *Registry) listRequestFeatureViews(project string) []*RequestFeatureView requestFeatureViews[index] = NewRequestFeatureViewFromProto(requestFeatureViewProto) index += 1 } - return requestFeatureViews + return requestFeatureViews, nil } } -func (r *Registry) getEntity(project, entityName string) (*Entity, error) { +func (r *Registry) getEntity(project, entityName string, allowCache bool) (*Entity, error) { + _, err := r.getRegistryProto(allowCache) + if err != nil { + return nil, err + } if entities, ok := r.cachedEntities[project]; !ok { return nil, errors.New(fmt.Sprintf("project %s not found in getEntity", project)) } else { @@ -254,7 +283,11 @@ func (r *Registry) getEntity(project, entityName string) (*Entity, error) { } } -func (r *Registry) getFeatureView(project, featureViewName string) (*FeatureView, error) { +func (r *Registry) getFeatureView(project, featureViewName string, allowCache bool) (*FeatureView, error) { + _, err := r.getRegistryProto(allowCache) + if err != nil { + return nil, err + } if featureViews, ok := r.cachedFeatureViews[project]; !ok { return nil, errors.New(fmt.Sprintf("project %s not found in getFeatureView", project)) } else { @@ -266,7 +299,11 @@ func (r *Registry) getFeatureView(project, featureViewName string) (*FeatureView } } -func (r *Registry) getFeatureService(project, featureServiceName string) (*FeatureService, error) { +func (r *Registry) getFeatureService(project, featureServiceName string, allowCache bool) (*FeatureService, error) { + _, err := r.getRegistryProto(allowCache) + if err != nil { + return nil, err + } if featureServices, ok := r.cachedFeatureServices[project]; !ok { return nil, errors.New(fmt.Sprintf("project %s not found in getFeatureService", project)) } else { @@ -278,7 +315,11 @@ func (r *Registry) getFeatureService(project, featureServiceName string) (*Featu } } -func (r *Registry) getOnDemandFeatureView(project, onDemandFeatureViewName string) (*OnDemandFeatureView, error) { +func (r *Registry) getOnDemandFeatureView(project, onDemandFeatureViewName string, allowCache bool) (*OnDemandFeatureView, error) { + _, err := r.getRegistryProto(allowCache) + if err != nil { + return nil, err + } if onDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok { return nil, errors.New(fmt.Sprintf("project %s not found in getOnDemandFeatureView", project)) } else { @@ -290,7 +331,11 @@ func (r *Registry) getOnDemandFeatureView(project, onDemandFeatureViewName strin } } -func (r *Registry) getRequestFeatureView(project, requestFeatureViewName string) (*RequestFeatureView, error) { +func (r *Registry) getRequestFeatureView(project, requestFeatureViewName string, allowCache bool) (*RequestFeatureView, error) { + _, err := r.getRegistryProto(allowCache) + if err != nil { + return nil, err + } if requestFeatureViews, ok := r.cachedRequestFeatureViews[project]; !ok { return nil, errors.New(fmt.Sprintf("project %s not found in getRequestFeatureView", project)) } else { @@ -302,7 +347,7 @@ func (r *Registry) getRequestFeatureView(project, requestFeatureViewName string) } } -func getRegistryStoreFromSheme(registryPath string, registryConfig *RegistryConfig, repoPath string) (RegistryStore, error) { +func getRegistryStoreFromScheme(registryPath string, registryConfig *RegistryConfig, repoPath string) (RegistryStore, error) { uri, err := url.Parse(registryPath) if err != nil { return nil, err From c13e9bcb76de0e1c5aa60d637cd5e7d9f54f5270 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 22 Mar 2022 12:32:22 -0700 Subject: [PATCH 2/7] Clean up Signed-off-by: Kevin Zhang --- go/internal/feast/registry.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/go/internal/feast/registry.go b/go/internal/feast/registry.go index 8f99655eb78..d671d080625 100644 --- a/go/internal/feast/registry.go +++ b/go/internal/feast/registry.go @@ -162,7 +162,7 @@ func (r *Registry) listEntities(project string, allowCache bool) ([]*Entity, err return nil, err } if entities, ok := r.cachedEntities[project]; !ok { - return []*Entity{}, errors.New(fmt.Sprintf("project %s not found in listEntities", project)) + return []*Entity{}, nil } else { entityList := make([]*Entity, len(entities)) index := 0 @@ -186,7 +186,7 @@ func (r *Registry) listFeatureViews(project string, allowCache bool) ([]*Feature return nil, err } if cachedFeatureViews, ok := r.cachedFeatureViews[project]; !ok { - return []*FeatureView{}, errors.New(fmt.Sprintf("project %s not found in listFeatureViews", project)) + return []*FeatureView{}, nil } else { featureViews := make([]*FeatureView, len(cachedFeatureViews)) index := 0 @@ -209,7 +209,7 @@ func (r *Registry) listFeatureServices(project string, allowCache bool) ([]*Feat return nil, err } if featureServiceProtos, ok := r.cachedFeatureServices[project]; !ok { - return []*FeatureService{}, errors.New(fmt.Sprintf("project %s not found in listFeatureServices", project)) + return []*FeatureService{}, nil } else { featureServices := make([]*FeatureService, len(featureServiceProtos)) index := 0 @@ -232,7 +232,7 @@ func (r *Registry) listOnDemandFeatureViews(project string, allowCache bool) ([] return nil, err } if onDemandFeatureViewProtos, ok := r.cachedOnDemandFeatureViews[project]; !ok { - return []*OnDemandFeatureView{}, errors.New(fmt.Sprintf("project %s not found in listOnDemandFeatureViews", project)) + return []*OnDemandFeatureView{}, nil } else { onDemandFeatureViews := make([]*OnDemandFeatureView, len(onDemandFeatureViewProtos)) index := 0 @@ -255,7 +255,7 @@ func (r *Registry) listRequestFeatureViews(project string, allowCache bool) ([]* return nil, err } if requestFeatureViewProtos, ok := r.cachedRequestFeatureViews[project]; !ok { - return []*RequestFeatureView{}, errors.New(fmt.Sprintf("project %s not found in listRequestFeatureViews", project)) + return []*RequestFeatureView{}, nil } else { requestFeatureViews := make([]*RequestFeatureView, len(requestFeatureViewProtos)) index := 0 @@ -273,10 +273,10 @@ func (r *Registry) getEntity(project, entityName string, allowCache bool) (*Enti return nil, err } if entities, ok := r.cachedEntities[project]; !ok { - return nil, errors.New(fmt.Sprintf("project %s not found in getEntity", project)) + return nil, fmt.Errorf("project %s not found in getEntity", project) } else { if entity, ok := entities[entityName]; !ok { - return nil, errors.New(fmt.Sprintf("entity %s not found inside project %s", entityName, project)) + return nil, fmt.Errorf("entity %s not found inside project %s", entityName, project) } else { return NewEntityFromProto(entity), nil } @@ -289,10 +289,10 @@ func (r *Registry) getFeatureView(project, featureViewName string, allowCache bo return nil, err } if featureViews, ok := r.cachedFeatureViews[project]; !ok { - return nil, errors.New(fmt.Sprintf("project %s not found in getFeatureView", project)) + return nil, fmt.Errorf("project %s not found in getFeatureView", project) } else { if featureViewProto, ok := featureViews[featureViewName]; !ok { - return nil, errors.New(fmt.Sprintf("featureView %s not found inside project %s", featureViewName, project)) + return nil, fmt.Errorf("featureView %s not found inside project %s", featureViewName, project) } else { return NewFeatureViewFromProto(featureViewProto), nil } @@ -305,10 +305,10 @@ func (r *Registry) getFeatureService(project, featureServiceName string, allowCa return nil, err } if featureServices, ok := r.cachedFeatureServices[project]; !ok { - return nil, errors.New(fmt.Sprintf("project %s not found in getFeatureService", project)) + return nil, fmt.Errorf("project %s not found in getFeatureService", project) } else { if featureServiceProto, ok := featureServices[featureServiceName]; !ok { - return nil, errors.New(fmt.Sprintf("featureService %s not found inside project %s", featureServiceName, project)) + return nil, fmt.Errorf("featureService %s not found inside project %s", featureServiceName, project) } else { return NewFeatureServiceFromProto(featureServiceProto), nil } @@ -321,10 +321,10 @@ func (r *Registry) getOnDemandFeatureView(project, onDemandFeatureViewName strin return nil, err } if onDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok { - return nil, errors.New(fmt.Sprintf("project %s not found in getOnDemandFeatureView", project)) + return nil, fmt.Errorf("project %s not found in getOnDemandFeatureView", project) } else { if onDemandFeatureViewProto, ok := onDemandFeatureViews[onDemandFeatureViewName]; !ok { - return nil, errors.New(fmt.Sprintf("onDemandFeatureView %s not found inside project %s", onDemandFeatureViewName, project)) + return nil, fmt.Errorf("onDemandFeatureView %s not found inside project %s", onDemandFeatureViewName, project) } else { return NewOnDemandFeatureViewFromProto(onDemandFeatureViewProto), nil } @@ -337,10 +337,10 @@ func (r *Registry) getRequestFeatureView(project, requestFeatureViewName string, return nil, err } if requestFeatureViews, ok := r.cachedRequestFeatureViews[project]; !ok { - return nil, errors.New(fmt.Sprintf("project %s not found in getRequestFeatureView", project)) + return nil, fmt.Errorf("project %s not found in getRequestFeatureView", project) } else { if requestFeatureViewProto, ok := requestFeatureViews[requestFeatureViewName]; !ok { - return nil, errors.New(fmt.Sprintf("requestFeatureView %s not found inside project %s", requestFeatureViewName, project)) + return nil, fmt.Errorf("requestFeatureView %s not found inside project %s", requestFeatureViewName, project) } else { return NewRequestFeatureViewFromProto(requestFeatureViewProto), nil } @@ -355,7 +355,7 @@ func getRegistryStoreFromScheme(registryPath string, registryConfig *RegistryCon if registryStoreType, ok := REGISTRY_STORE_CLASS_FOR_SCHEME[uri.Scheme]; ok { return getRegistryStoreFromType(registryStoreType, registryConfig, repoPath) } - return nil, errors.New(fmt.Sprintf("registry path %s has unsupported scheme %s. Supported schemes are file, s3 and gs.", registryPath, uri.Scheme)) + return nil, fmt.Errorf("registry path %s has unsupported scheme %s. Supported schemes are file, s3 and gs", registryPath, uri.Scheme) } func getRegistryStoreFromType(registryStoreType string, registryConfig *RegistryConfig, repoPath string) (RegistryStore, error) { From 3e137b40ac32bc08a72a399598125a95343c5e71 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 22 Mar 2022 12:36:10 -0700 Subject: [PATCH 3/7] Clean up Signed-off-by: Kevin Zhang --- go/internal/feast/registry.go | 1 - 1 file changed, 1 deletion(-) diff --git a/go/internal/feast/registry.go b/go/internal/feast/registry.go index d671d080625..95b8e1e8da9 100644 --- a/go/internal/feast/registry.go +++ b/go/internal/feast/registry.go @@ -172,7 +172,6 @@ func (r *Registry) listEntities(project string, allowCache bool) ([]*Entity, err } return entityList, nil } - } /* From 9b6a2f659744ca9012669c6550239f8d89207184 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 22 Mar 2022 13:44:11 -0700 Subject: [PATCH 4/7] Fix issues Signed-off-by: Kevin Zhang --- go/internal/feast/registry.go | 68 +++++++++++++++++------------------ 1 file changed, 34 insertions(+), 34 deletions(-) diff --git a/go/internal/feast/registry.go b/go/internal/feast/registry.go index 95b8e1e8da9..17c44ffc662 100644 --- a/go/internal/feast/registry.go +++ b/go/internal/feast/registry.go @@ -161,16 +161,16 @@ func (r *Registry) listEntities(project string, allowCache bool) ([]*Entity, err if err != nil { return nil, err } - if entities, ok := r.cachedEntities[project]; !ok { + if cachedEntities, ok := r.cachedEntities[project]; !ok { return []*Entity{}, nil } else { - entityList := make([]*Entity, len(entities)) + entities := make([]*Entity, len(cachedEntities)) index := 0 - for _, entity := range entities { - entityList[index] = NewEntityFromProto(entity) + for _, entityProto := range cachedEntities { + entities[index] = NewEntityFromProto(entityProto) index += 1 } - return entityList, nil + return entities, nil } } @@ -207,12 +207,12 @@ func (r *Registry) listFeatureServices(project string, allowCache bool) ([]*Feat if err != nil { return nil, err } - if featureServiceProtos, ok := r.cachedFeatureServices[project]; !ok { + if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok { return []*FeatureService{}, nil } else { - featureServices := make([]*FeatureService, len(featureServiceProtos)) + featureServices := make([]*FeatureService, len(cachedFeatureServices)) index := 0 - for _, featureServiceProto := range featureServiceProtos { + for _, featureServiceProto := range cachedFeatureServices { featureServices[index] = NewFeatureServiceFromProto(featureServiceProto) index += 1 } @@ -230,12 +230,12 @@ func (r *Registry) listOnDemandFeatureViews(project string, allowCache bool) ([] if err != nil { return nil, err } - if onDemandFeatureViewProtos, ok := r.cachedOnDemandFeatureViews[project]; !ok { + if cachedOnDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok { return []*OnDemandFeatureView{}, nil } else { - onDemandFeatureViews := make([]*OnDemandFeatureView, len(onDemandFeatureViewProtos)) + onDemandFeatureViews := make([]*OnDemandFeatureView, len(cachedOnDemandFeatureViews)) index := 0 - for _, onDemandFeatureViewProto := range onDemandFeatureViewProtos { + for _, onDemandFeatureViewProto := range cachedOnDemandFeatureViews { onDemandFeatureViews[index] = NewOnDemandFeatureViewFromProto(onDemandFeatureViewProto) index += 1 } @@ -253,12 +253,12 @@ func (r *Registry) listRequestFeatureViews(project string, allowCache bool) ([]* if err != nil { return nil, err } - if requestFeatureViewProtos, ok := r.cachedRequestFeatureViews[project]; !ok { + if cachedRequestFeatureViews, ok := r.cachedRequestFeatureViews[project]; !ok { return []*RequestFeatureView{}, nil } else { - requestFeatureViews := make([]*RequestFeatureView, len(requestFeatureViewProtos)) + requestFeatureViews := make([]*RequestFeatureView, len(cachedRequestFeatureViews)) index := 0 - for _, requestFeatureViewProto := range requestFeatureViewProtos { + for _, requestFeatureViewProto := range cachedRequestFeatureViews { requestFeatureViews[index] = NewRequestFeatureViewFromProto(requestFeatureViewProto) index += 1 } @@ -271,11 +271,11 @@ func (r *Registry) getEntity(project, entityName string, allowCache bool) (*Enti if err != nil { return nil, err } - if entities, ok := r.cachedEntities[project]; !ok { - return nil, fmt.Errorf("project %s not found in getEntity", project) + if cachedEntities, ok := r.cachedEntities[project]; !ok { + return nil, fmt.Errorf("no cached entities found for project %s", project) } else { - if entity, ok := entities[entityName]; !ok { - return nil, fmt.Errorf("entity %s not found inside project %s", entityName, project) + if entity, ok := cachedEntities[entityName]; !ok { + return nil, fmt.Errorf("no cached entity %s found for project %s", entityName, project) } else { return NewEntityFromProto(entity), nil } @@ -287,11 +287,11 @@ func (r *Registry) getFeatureView(project, featureViewName string, allowCache bo if err != nil { return nil, err } - if featureViews, ok := r.cachedFeatureViews[project]; !ok { - return nil, fmt.Errorf("project %s not found in getFeatureView", project) + if cachedFeatureViews, ok := r.cachedFeatureViews[project]; !ok { + return nil, fmt.Errorf("no cached feature views found for project %s", project) } else { - if featureViewProto, ok := featureViews[featureViewName]; !ok { - return nil, fmt.Errorf("featureView %s not found inside project %s", featureViewName, project) + if featureViewProto, ok := cachedFeatureViews[featureViewName]; !ok { + return nil, fmt.Errorf("no cached feature view %s found for project %s", featureViewName, project) } else { return NewFeatureViewFromProto(featureViewProto), nil } @@ -303,11 +303,11 @@ func (r *Registry) getFeatureService(project, featureServiceName string, allowCa if err != nil { return nil, err } - if featureServices, ok := r.cachedFeatureServices[project]; !ok { - return nil, fmt.Errorf("project %s not found in getFeatureService", project) + if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok { + return nil, fmt.Errorf("no cached feature services found for project %s", project) } else { - if featureServiceProto, ok := featureServices[featureServiceName]; !ok { - return nil, fmt.Errorf("featureService %s not found inside project %s", featureServiceName, project) + if featureServiceProto, ok := cachedFeatureServices[featureServiceName]; !ok { + return nil, fmt.Errorf("no cached feature service %s found for project %s", featureServiceName, project) } else { return NewFeatureServiceFromProto(featureServiceProto), nil } @@ -319,11 +319,11 @@ func (r *Registry) getOnDemandFeatureView(project, onDemandFeatureViewName strin if err != nil { return nil, err } - if onDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok { - return nil, fmt.Errorf("project %s not found in getOnDemandFeatureView", project) + if cachedOnDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok { + return nil, fmt.Errorf("no cached on demand feature views found for project %s", project) } else { - if onDemandFeatureViewProto, ok := onDemandFeatureViews[onDemandFeatureViewName]; !ok { - return nil, fmt.Errorf("onDemandFeatureView %s not found inside project %s", onDemandFeatureViewName, project) + if onDemandFeatureViewProto, ok := cachedOnDemandFeatureViews[onDemandFeatureViewName]; !ok { + return nil, fmt.Errorf("no cached on demand feature view %s found for project %s", onDemandFeatureViewName, project) } else { return NewOnDemandFeatureViewFromProto(onDemandFeatureViewProto), nil } @@ -335,11 +335,11 @@ func (r *Registry) getRequestFeatureView(project, requestFeatureViewName string, if err != nil { return nil, err } - if requestFeatureViews, ok := r.cachedRequestFeatureViews[project]; !ok { - return nil, fmt.Errorf("project %s not found in getRequestFeatureView", project) + if cachedRequestFeatureViews, ok := r.cachedRequestFeatureViews[project]; !ok { + return nil, fmt.Errorf("no cached on request feature views found for project %s", project) } else { - if requestFeatureViewProto, ok := requestFeatureViews[requestFeatureViewName]; !ok { - return nil, fmt.Errorf("requestFeatureView %s not found inside project %s", requestFeatureViewName, project) + if requestFeatureViewProto, ok := cachedRequestFeatureViews[requestFeatureViewName]; !ok { + return nil, fmt.Errorf("no cached request feature view %s found for project %s", requestFeatureViewName, project) } else { return NewRequestFeatureViewFromProto(requestFeatureViewProto), nil } From c9ad601d6a61f03dc7b1ae8da2472ca5ba21de4e Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Tue, 22 Mar 2022 14:31:25 -0700 Subject: [PATCH 5/7] Remove the allowCache flag Signed-off-by: Kevin Zhang --- go/internal/feast/featurestore.go | 28 +++++----- go/internal/feast/registry.go | 86 +++++++++++-------------------- 2 files changed, 44 insertions(+), 70 deletions(-) diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index ddb0830cf6b..2ad15d295d2 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -93,7 +93,7 @@ func (fs *FeatureStore) GetOnlineFeatures(ctx context.Context, request *serving. return nil, err } - fvs, requestedFeatureViews, requestedRequestFeatureViews, requestedOnDemandFeatureViews, err := fs.getFeatureViewsToUse(features, true, false) + fvs, requestedFeatureViews, requestedRequestFeatureViews, requestedOnDemandFeatureViews, err := fs.getFeatureViewsToUse(features, false) if len(requestedRequestFeatureViews)+len(requestedOnDemandFeatureViews) > 0 { return nil, status.Errorf(codes.InvalidArgument, "on demand feature views are currently not supported") @@ -217,7 +217,7 @@ func (fs *FeatureStore) parseFeatures(kind interface{}) (*Features, error) { return &Features{features: featureList.Features.GetVal(), featureService: nil}, nil } if featureServiceRequest, ok := kind.(*serving.GetOnlineFeaturesRequest_FeatureService); ok { - featureService, err := fs.registry.getFeatureService(fs.config.Project, featureServiceRequest.FeatureService, true) + featureService, err := fs.registry.getFeatureService(fs.config.Project, featureServiceRequest.FeatureService) if err != nil { return nil, err } @@ -254,12 +254,12 @@ func (fs *FeatureStore) getFeatureRefs(features *Features) ([]string, error) { retrieving all feature views. Similar argument to FeatureService applies. */ -func (fs *FeatureStore) getFeatureViewsToUse(features *Features, allowCache, hideDummyEntity bool) (map[string]*FeatureView, map[*FeatureView][]string, []*RequestFeatureView, []*OnDemandFeatureView, error) { +func (fs *FeatureStore) getFeatureViewsToUse(features *Features, hideDummyEntity bool) (map[string]*FeatureView, map[*FeatureView][]string, []*RequestFeatureView, []*OnDemandFeatureView, error) { fvs := make(map[string]*FeatureView) requestFvs := make(map[string]*RequestFeatureView) odFvs := make(map[string]*OnDemandFeatureView) - featureViews, err := fs.listFeatureViews(allowCache, hideDummyEntity) + featureViews, err := fs.listFeatureViews(hideDummyEntity) if err != nil { return nil, nil, nil, nil, err } @@ -267,7 +267,7 @@ func (fs *FeatureStore) getFeatureViewsToUse(features *Features, allowCache, hid fvs[featureView.base.name] = featureView } - requestFeatureViews, err := fs.registry.listRequestFeatureViews(fs.config.Project, allowCache) + requestFeatureViews, err := fs.registry.listRequestFeatureViews(fs.config.Project) if err != nil { return nil, nil, nil, nil, err } @@ -275,7 +275,7 @@ func (fs *FeatureStore) getFeatureViewsToUse(features *Features, allowCache, hid requestFvs[requestFeatureView.base.name] = requestFeatureView } - onDemandFeatureViews, err := fs.registry.listOnDemandFeatureViews(fs.config.Project, allowCache) + onDemandFeatureViews, err := fs.registry.listOnDemandFeatureViews(fs.config.Project) if err != nil { return nil, nil, nil, nil, err } @@ -357,7 +357,7 @@ func (fs *FeatureStore) getEntityMaps(requestedFeatureViews map[*FeatureView][]s var joinKeyMap map[string]string var featureView *FeatureView - entities, err := fs.listEntities(true, false) + entities, err := fs.listEntities(false) if err != nil { return nil, err } @@ -372,7 +372,7 @@ func (fs *FeatureStore) getEntityMaps(requestedFeatureViews map[*FeatureView][]s joinKeyMap = featureView.base.projection.joinKeyMap for entityName = range entityNames { - entity, err := fs.registry.getEntity(fs.config.Project, entityName, true) + entity, err := fs.registry.getEntity(fs.config.Project, entityName) if err != nil { return nil, err } @@ -649,8 +649,8 @@ func (fs *FeatureStore) dropUnneededColumns(onlineFeaturesResponse *serving.GetO } } -func (fs *FeatureStore) listFeatureViews(allowCache, hideDummyEntity bool) ([]*FeatureView, error) { - featureViews, err := fs.registry.listFeatureViews(fs.config.Project, allowCache) +func (fs *FeatureStore) listFeatureViews(hideDummyEntity bool) ([]*FeatureView, error) { + featureViews, err := fs.registry.listFeatureViews(fs.config.Project) if err != nil { return featureViews, err } @@ -662,9 +662,9 @@ func (fs *FeatureStore) listFeatureViews(allowCache, hideDummyEntity bool) ([]*F return featureViews, nil } -func (fs *FeatureStore) listEntities(allowCache, hideDummyEntity bool) ([]*Entity, error) { +func (fs *FeatureStore) listEntities(hideDummyEntity bool) ([]*Entity, error) { - allEntities, err := fs.registry.listEntities(fs.config.Project, allowCache) + allEntities, err := fs.registry.listEntities(fs.config.Project) if err != nil { return allEntities, err } @@ -839,8 +839,8 @@ func (fs *FeatureStore) groupFeatureRefs(requestedFeatureViews map[*FeatureView] return fvFeatures, nil } -func (fs *FeatureStore) getFeatureView(project, featureViewName string, allowCache, hideDummyEntity bool) (*FeatureView, error) { - fv, err := fs.registry.getFeatureView(fs.config.Project, featureViewName, allowCache) +func (fs *FeatureStore) getFeatureView(project, featureViewName string, hideDummyEntity bool) (*FeatureView, error) { + fv, err := fs.registry.getFeatureView(fs.config.Project, featureViewName) if err != nil { return nil, err } diff --git a/go/internal/feast/registry.go b/go/internal/feast/registry.go index 17c44ffc662..41148d9d049 100644 --- a/go/internal/feast/registry.go +++ b/go/internal/feast/registry.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "net/url" + "sync" "time" "github.com/feast-dev/feast/go/protos/feast/core" @@ -32,6 +33,7 @@ type Registry struct { cachedRegistry *core.Registry cachedRegistryProtoCreated time.Time cachedRegistryProtoTtl time.Duration + mu sync.Mutex } func NewRegistry(registryConfig *RegistryConfig, repoPath string) (*Registry, error) { @@ -59,23 +61,33 @@ func NewRegistry(registryConfig *RegistryConfig, repoPath string) (*Registry, er } func (r *Registry) initializeRegistry() { - _, err := r.getRegistryProto(false) + _, err := r.getRegistryProto() if err != nil { registryProto := &core.Registry{RegistrySchemaVersion: REGISTRY_SCHEMA_VERSION} r.registryStore.UpdateRegistryProto(registryProto) - r.load(registryProto) + go r.refreshRegistryOnInterval() + } +} + +func (r *Registry) refreshRegistryOnInterval() { + ticker := time.NewTicker(r.cachedRegistryProtoTtl) + for ; true; <-ticker.C { + err := r.refresh() + if err != nil { + return + } } } // TODO: Add a goroutine and automatically refresh every cachedRegistryProtoTtl func (r *Registry) refresh() error { - _, err := r.getRegistryProto(false) + _, err := r.getRegistryProto() return err } -func (r *Registry) getRegistryProto(allowCache bool) (*core.Registry, error) { +func (r *Registry) getRegistryProto() (*core.Registry, error) { expired := r.cachedRegistry == nil || (r.cachedRegistryProtoTtl > 0 && time.Now().After(r.cachedRegistryProtoCreated.Add(r.cachedRegistryProtoTtl))) - if allowCache && !expired { + if !expired { return r.cachedRegistry, nil } registryProto, err := r.registryStore.GetRegistryProto() @@ -83,11 +95,12 @@ func (r *Registry) getRegistryProto(allowCache bool) (*core.Registry, error) { return registryProto, err } r.load(registryProto) - r.cachedRegistryProtoCreated = time.Now() return registryProto, nil } func (r *Registry) load(registry *core.Registry) { + r.mu.Lock() + defer r.mu.Unlock() r.cachedRegistry = registry r.cachedFeatureServices = make(map[string]map[string]*core.FeatureService) r.cachedEntities = make(map[string]map[string]*core.Entity) @@ -99,6 +112,7 @@ func (r *Registry) load(registry *core.Registry) { r.loadFeatureViews(registry) r.loadOnDemandFeatureViews(registry) r.loadRequestFeatureViews(registry) + r.cachedRegistryProtoCreated = time.Now() } func (r *Registry) loadEntities(registry *core.Registry) { @@ -156,11 +170,7 @@ func (r *Registry) loadRequestFeatureViews(registry *core.Registry) { Returns empty list if project not found */ -func (r *Registry) listEntities(project string, allowCache bool) ([]*Entity, error) { - _, err := r.getRegistryProto(allowCache) - if err != nil { - return nil, err - } +func (r *Registry) listEntities(project string) ([]*Entity, error) { if cachedEntities, ok := r.cachedEntities[project]; !ok { return []*Entity{}, nil } else { @@ -179,11 +189,7 @@ func (r *Registry) listEntities(project string, allowCache bool) ([]*Entity, err Returns empty list if project not found */ -func (r *Registry) listFeatureViews(project string, allowCache bool) ([]*FeatureView, error) { - _, err := r.getRegistryProto(allowCache) - if err != nil { - return nil, err - } +func (r *Registry) listFeatureViews(project string) ([]*FeatureView, error) { if cachedFeatureViews, ok := r.cachedFeatureViews[project]; !ok { return []*FeatureView{}, nil } else { @@ -202,11 +208,7 @@ func (r *Registry) listFeatureViews(project string, allowCache bool) ([]*Feature Returns empty list if project not found */ -func (r *Registry) listFeatureServices(project string, allowCache bool) ([]*FeatureService, error) { - _, err := r.getRegistryProto(allowCache) - if err != nil { - return nil, err - } +func (r *Registry) listFeatureServices(project string) ([]*FeatureService, error) { if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok { return []*FeatureService{}, nil } else { @@ -225,11 +227,7 @@ func (r *Registry) listFeatureServices(project string, allowCache bool) ([]*Feat Returns empty list if project not found */ -func (r *Registry) listOnDemandFeatureViews(project string, allowCache bool) ([]*OnDemandFeatureView, error) { - _, err := r.getRegistryProto(allowCache) - if err != nil { - return nil, err - } +func (r *Registry) listOnDemandFeatureViews(project string) ([]*OnDemandFeatureView, error) { if cachedOnDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok { return []*OnDemandFeatureView{}, nil } else { @@ -248,11 +246,7 @@ func (r *Registry) listOnDemandFeatureViews(project string, allowCache bool) ([] Returns empty list if project not found */ -func (r *Registry) listRequestFeatureViews(project string, allowCache bool) ([]*RequestFeatureView, error) { - _, err := r.getRegistryProto(allowCache) - if err != nil { - return nil, err - } +func (r *Registry) listRequestFeatureViews(project string) ([]*RequestFeatureView, error) { if cachedRequestFeatureViews, ok := r.cachedRequestFeatureViews[project]; !ok { return []*RequestFeatureView{}, nil } else { @@ -266,11 +260,7 @@ func (r *Registry) listRequestFeatureViews(project string, allowCache bool) ([]* } } -func (r *Registry) getEntity(project, entityName string, allowCache bool) (*Entity, error) { - _, err := r.getRegistryProto(allowCache) - if err != nil { - return nil, err - } +func (r *Registry) getEntity(project, entityName string) (*Entity, error) { if cachedEntities, ok := r.cachedEntities[project]; !ok { return nil, fmt.Errorf("no cached entities found for project %s", project) } else { @@ -282,11 +272,7 @@ func (r *Registry) getEntity(project, entityName string, allowCache bool) (*Enti } } -func (r *Registry) getFeatureView(project, featureViewName string, allowCache bool) (*FeatureView, error) { - _, err := r.getRegistryProto(allowCache) - if err != nil { - return nil, err - } +func (r *Registry) getFeatureView(project, featureViewName string) (*FeatureView, error) { if cachedFeatureViews, ok := r.cachedFeatureViews[project]; !ok { return nil, fmt.Errorf("no cached feature views found for project %s", project) } else { @@ -298,11 +284,7 @@ func (r *Registry) getFeatureView(project, featureViewName string, allowCache bo } } -func (r *Registry) getFeatureService(project, featureServiceName string, allowCache bool) (*FeatureService, error) { - _, err := r.getRegistryProto(allowCache) - if err != nil { - return nil, err - } +func (r *Registry) getFeatureService(project, featureServiceName string) (*FeatureService, error) { if cachedFeatureServices, ok := r.cachedFeatureServices[project]; !ok { return nil, fmt.Errorf("no cached feature services found for project %s", project) } else { @@ -314,11 +296,7 @@ func (r *Registry) getFeatureService(project, featureServiceName string, allowCa } } -func (r *Registry) getOnDemandFeatureView(project, onDemandFeatureViewName string, allowCache bool) (*OnDemandFeatureView, error) { - _, err := r.getRegistryProto(allowCache) - if err != nil { - return nil, err - } +func (r *Registry) getOnDemandFeatureView(project, onDemandFeatureViewName string) (*OnDemandFeatureView, error) { if cachedOnDemandFeatureViews, ok := r.cachedOnDemandFeatureViews[project]; !ok { return nil, fmt.Errorf("no cached on demand feature views found for project %s", project) } else { @@ -330,11 +308,7 @@ func (r *Registry) getOnDemandFeatureView(project, onDemandFeatureViewName strin } } -func (r *Registry) getRequestFeatureView(project, requestFeatureViewName string, allowCache bool) (*RequestFeatureView, error) { - _, err := r.getRegistryProto(allowCache) - if err != nil { - return nil, err - } +func (r *Registry) getRequestFeatureView(project, requestFeatureViewName string) (*RequestFeatureView, error) { if cachedRequestFeatureViews, ok := r.cachedRequestFeatureViews[project]; !ok { return nil, fmt.Errorf("no cached on request feature views found for project %s", project) } else { From 110a1c3071601182a55a6008aabf64a30e9dabc0 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 23 Mar 2022 10:12:10 -0700 Subject: [PATCH 6/7] Address review issues Signed-off-by: Kevin Zhang --- go/internal/feast/featurestore.go | 13 ++++++++----- go/internal/feast/registry.go | 24 ++++++++++++------------ 2 files changed, 20 insertions(+), 17 deletions(-) diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index 2ad15d295d2..4465d344147 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -654,14 +654,17 @@ func (fs *FeatureStore) listFeatureViews(hideDummyEntity bool) ([]*FeatureView, if err != nil { return featureViews, err } - for _, featureView := range featureViews { - if _, ok := featureView.entities[DUMMY_ENTITY_NAME]; ok && hideDummyEntity { - featureView.entities = make(map[string]struct{}) - } - } return featureViews, nil } +func (fs *FeatureStore) listRequestFeatureViews() ([]*RequestFeatureView, error) { + requestFeatureViews, err := fs.registry.listRequestFeatureViews(fs.config.Project) + if err != nil { + return requestFeatureViews, err + } + return requestFeatureViews, nil +} + func (fs *FeatureStore) listEntities(hideDummyEntity bool) ([]*Entity, error) { allEntities, err := fs.registry.listEntities(fs.config.Project) diff --git a/go/internal/feast/registry.go b/go/internal/feast/registry.go index 41148d9d049..8963703f514 100644 --- a/go/internal/feast/registry.go +++ b/go/internal/feast/registry.go @@ -24,16 +24,16 @@ var REGISTRY_STORE_CLASS_FOR_SCHEME map[string]string = map[string]string{ */ type Registry struct { - registryStore RegistryStore - cachedFeatureServices map[string]map[string]*core.FeatureService - cachedEntities map[string]map[string]*core.Entity - cachedFeatureViews map[string]map[string]*core.FeatureView - cachedOnDemandFeatureViews map[string]map[string]*core.OnDemandFeatureView - cachedRequestFeatureViews map[string]map[string]*core.RequestFeatureView - cachedRegistry *core.Registry - cachedRegistryProtoCreated time.Time - cachedRegistryProtoTtl time.Duration - mu sync.Mutex + registryStore RegistryStore + cachedFeatureServices map[string]map[string]*core.FeatureService + cachedEntities map[string]map[string]*core.Entity + cachedFeatureViews map[string]map[string]*core.FeatureView + cachedOnDemandFeatureViews map[string]map[string]*core.OnDemandFeatureView + cachedRequestFeatureViews map[string]map[string]*core.RequestFeatureView + cachedRegistry *core.Registry + cachedRegistryProtoLastUpdated time.Time + cachedRegistryProtoTtl time.Duration + mu sync.Mutex } func NewRegistry(registryConfig *RegistryConfig, repoPath string) (*Registry, error) { @@ -86,7 +86,7 @@ func (r *Registry) refresh() error { } func (r *Registry) getRegistryProto() (*core.Registry, error) { - expired := r.cachedRegistry == nil || (r.cachedRegistryProtoTtl > 0 && time.Now().After(r.cachedRegistryProtoCreated.Add(r.cachedRegistryProtoTtl))) + expired := r.cachedRegistry == nil || (r.cachedRegistryProtoTtl > 0 && time.Now().After(r.cachedRegistryProtoLastUpdated.Add(r.cachedRegistryProtoTtl))) if !expired { return r.cachedRegistry, nil } @@ -112,7 +112,7 @@ func (r *Registry) load(registry *core.Registry) { r.loadFeatureViews(registry) r.loadOnDemandFeatureViews(registry) r.loadRequestFeatureViews(registry) - r.cachedRegistryProtoCreated = time.Now() + r.cachedRegistryProtoLastUpdated = time.Now() } func (r *Registry) loadEntities(registry *core.Registry) { From a3b8214bac0fc70b19b869ca2f2e9e0bdc2a6f39 Mon Sep 17 00:00:00 2001 From: Kevin Zhang Date: Wed, 23 Mar 2022 11:45:03 -0700 Subject: [PATCH 7/7] Fix dumb mistake Signed-off-by: Kevin Zhang --- go/internal/feast/featurestore.go | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index 4465d344147..11be8ee94ce 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -658,11 +658,7 @@ func (fs *FeatureStore) listFeatureViews(hideDummyEntity bool) ([]*FeatureView, } func (fs *FeatureStore) listRequestFeatureViews() ([]*RequestFeatureView, error) { - requestFeatureViews, err := fs.registry.listRequestFeatureViews(fs.config.Project) - if err != nil { - return requestFeatureViews, err - } - return requestFeatureViews, nil + return fs.registry.listRequestFeatureViews(fs.config.Project) } func (fs *FeatureStore) listEntities(hideDummyEntity bool) ([]*Entity, error) {