Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add range query onlineserving helper methods #192

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
642 changes: 642 additions & 0 deletions go/internal/feast/onlineserving/serving.go

Large diffs are not rendered by default.

296 changes: 294 additions & 2 deletions go/internal/feast/onlineserving/serving_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
package onlineserving

import (
"testing"

"github.com/apache/arrow/go/v17/arrow/memory"
"github.com/feast-dev/feast/go/internal/feast/onlinestore"
"github.com/feast-dev/feast/go/protos/feast/serving"
"github.com/golang/protobuf/ptypes/timestamp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
"testing"
"time"

"github.com/feast-dev/feast/go/internal/feast/model"
"github.com/feast-dev/feast/go/protos/feast/core"
Expand Down Expand Up @@ -367,3 +372,290 @@ func TestUnpackFeatureViewsByReferences(t *testing.T) {

assertCorrectUnpacking(t, fvs, sortedFvs, odfvs, err)
}

func TestValidateSortKeyFilters(t *testing.T) {
sortKey1 := createSortKey("timestamp", core.SortOrder_DESC, types.ValueType_UNIX_TIMESTAMP)
sortKey2 := createSortKey("price", core.SortOrder_ASC, types.ValueType_DOUBLE)
sortKey3 := createSortKey("name", core.SortOrder_ASC, types.ValueType_STRING)

sfv1 := createSortedFeatureView("sfv1", []string{"driver"},
[]*core.SortKey{sortKey1, sortKey2},
createFeature("f1", types.ValueType_DOUBLE))

sfv2 := createSortedFeatureView("sfv2", []string{"customer"},
[]*core.SortKey{sortKey3},
createFeature("f2", types.ValueType_STRING))

sortedViews := []*SortedFeatureViewAndRefs{
{View: sfv1, FeatureRefs: []string{"f1"}},
{View: sfv2, FeatureRefs: []string{"f2"}},
}

validFilters := []*serving.SortKeyFilter{
{
SortKeyName: "timestamp",
RangeStart: &types.Value{Val: &types.Value_UnixTimestampVal{UnixTimestampVal: 1640995200}},
RangeEnd: &types.Value{Val: &types.Value_UnixTimestampVal{UnixTimestampVal: 1672531200}},
StartInclusive: true,
EndInclusive: false,
},
{
SortKeyName: "price",
RangeStart: &types.Value{Val: &types.Value_DoubleVal{DoubleVal: 10.5}},
RangeEnd: &types.Value{Val: &types.Value_DoubleVal{DoubleVal: 50.0}},
StartInclusive: true,
EndInclusive: true,
},
}

err := ValidateSortKeyFilters(validFilters, sortedViews)
assert.NoError(t, err, "Valid filters should not produce an error")

nonExistentKeyFilter := []*serving.SortKeyFilter{
{
SortKeyName: "non_existent_key",
RangeStart: &types.Value{Val: &types.Value_Int64Val{Int64Val: 123}},
},
}

err = ValidateSortKeyFilters(nonExistentKeyFilter, sortedViews)
assert.Error(t, err, "Non-existent sort key should produce an error")
assert.Contains(t, err.Error(), "not found in any of the requested sorted feature views")

typeMismatchFilter := []*serving.SortKeyFilter{
{
SortKeyName: "timestamp",
RangeStart: &types.Value{Val: &types.Value_StringVal{StringVal: "2022-01-01"}},
},
}

err = ValidateSortKeyFilters(typeMismatchFilter, sortedViews)
assert.Error(t, err, "Type mismatch should produce an error")
assert.Contains(t, err.Error(), "has incompatible type")
}

func TestGroupSortedFeatureRefs(t *testing.T) {
sortKey1 := createSortKey("timestamp", core.SortOrder_DESC, types.ValueType_UNIX_TIMESTAMP)
viewA := createSortedFeatureView("viewA", []string{"driver", "customer"},
[]*core.SortKey{sortKey1},
createFeature("featureA", types.ValueType_DOUBLE),
createFeature("featureB", types.ValueType_DOUBLE))

viewB := createSortedFeatureView("viewB", []string{"driver", "customer"},
[]*core.SortKey{sortKey1},
createFeature("featureC", types.ValueType_DOUBLE),
createFeature("featureD", types.ValueType_DOUBLE))

viewC := createSortedFeatureView("viewC", []string{"driver"},
[]*core.SortKey{sortKey1},
createFeature("featureE", types.ValueType_DOUBLE))

viewD := createSortedFeatureView("viewD", []string{"customer"},
[]*core.SortKey{sortKey1},
createFeature("featureF", types.ValueType_DOUBLE))

if viewA.Base != nil && viewA.Base.Projection == nil {
viewA.Base.Projection = &model.FeatureViewProjection{
NameAlias: "aliasViewA",
}
}

sortKeyFilters := []*serving.SortKeyFilter{
{
SortKeyName: "timestamp",
RangeStart: &types.Value{Val: &types.Value_UnixTimestampVal{UnixTimestampVal: 1640995200}},
RangeEnd: &types.Value{Val: &types.Value_UnixTimestampVal{UnixTimestampVal: 1672531200}},
StartInclusive: true,
EndInclusive: false,
},
}

refGroups, err := GroupSortedFeatureRefs(
[]*SortedFeatureViewAndRefs{
{View: viewA, FeatureRefs: []string{"featureA", "featureB"}},
{View: viewB, FeatureRefs: []string{"featureC", "featureD"}},
{View: viewC, FeatureRefs: []string{"featureE"}},
{View: viewD, FeatureRefs: []string{"featureF"}},
},
map[string]*types.RepeatedValue{
"driver_id": {Val: []*types.Value{
{Val: &types.Value_Int32Val{Int32Val: 0}},
{Val: &types.Value_Int32Val{Int32Val: 0}},
{Val: &types.Value_Int32Val{Int32Val: 1}},
{Val: &types.Value_Int32Val{Int32Val: 1}},
{Val: &types.Value_Int32Val{Int32Val: 1}},
}},
"customer_id": {Val: []*types.Value{
{Val: &types.Value_Int32Val{Int32Val: 1}},
{Val: &types.Value_Int32Val{Int32Val: 2}},
{Val: &types.Value_Int32Val{Int32Val: 3}},
{Val: &types.Value_Int32Val{Int32Val: 3}},
{Val: &types.Value_Int32Val{Int32Val: 4}},
}},
},
map[string]string{
"driver": "driver_id",
"customer": "customer_id",
},
sortKeyFilters,
false,
10,
true,
)

t.Logf("GroupSortedFeatureRefs returned %d groups", len(refGroups))
for i, group := range refGroups {
t.Logf("Group %d:", i)
t.Logf(" Features: %v", group.FeatureNames)
t.Logf(" AliasedNames: %v", group.AliasedFeatureNames)
}

assert.NoError(t, err)
assert.NotEmpty(t, refGroups, "Should return at least one group")

for _, group := range refGroups {
assert.Equal(t, sortKeyFilters, group.SortKeyFilters)
assert.Equal(t, false, group.ReverseSortOrder)
assert.Equal(t, int32(10), group.Limit)
}

featureAFound := false
featureCFound := false
featureEFound := false

for _, group := range refGroups {
for _, feature := range group.FeatureNames {
if feature == "featureA" {
featureAFound = true
}
if feature == "featureC" {
featureCFound = true
}
if feature == "featureE" {
featureEFound = true
}
}
}

assert.True(t, featureAFound, "Feature A should be present in results")
assert.True(t, featureCFound, "Feature C should be present in results")
assert.True(t, featureEFound, "Feature E should be present in results")
}

func TestEntitiesToRangeFeatureVectors(t *testing.T) {
entityColumns := map[string]*types.RepeatedValue{
"driver_id": {Val: []*types.Value{
{Val: &types.Value_Int32Val{Int32Val: 1}},
{Val: &types.Value_Int32Val{Int32Val: 2}},
{Val: &types.Value_Int32Val{Int32Val: 3}},
}},
"customer_id": {Val: []*types.Value{
{Val: &types.Value_StringVal{StringVal: "A"}},
{Val: &types.Value_StringVal{StringVal: "B"}},
{Val: &types.Value_StringVal{StringVal: "C"}},
}},
}

arrowAllocator := memory.NewGoAllocator()
numRows := 3

vectors, err := EntitiesToRangeFeatureVectors(entityColumns, arrowAllocator, numRows)

assert.NoError(t, err)
assert.Len(t, vectors, 2)

var driverVector, customerVector *RangeFeatureVector
for _, vector := range vectors {
if vector.Name == "driver_id" {
driverVector = vector
} else if vector.Name == "customer_id" {
customerVector = vector
}
}

require.NotNil(t, driverVector)
assert.Equal(t, "driver_id", driverVector.Name)
assert.Len(t, driverVector.RangeStatuses, numRows)
assert.Len(t, driverVector.RangeTimestamps, numRows)

for i := 0; i < numRows; i++ {
assert.Len(t, driverVector.RangeStatuses[i], 1)
assert.Equal(t, serving.FieldStatus_PRESENT, driverVector.RangeStatuses[i][0])
assert.Len(t, driverVector.RangeTimestamps[i], 1)
}

require.NotNil(t, customerVector)
assert.Equal(t, "customer_id", customerVector.Name)
assert.Len(t, customerVector.RangeStatuses, numRows)
assert.Len(t, customerVector.RangeTimestamps, numRows)

assert.NotNil(t, driverVector.RangeValues)
assert.NotNil(t, customerVector.RangeValues)

driverVector.RangeValues.Release()
customerVector.RangeValues.Release()
}

func TestTransposeRangeFeatureRowsIntoColumns(t *testing.T) {
arrowAllocator := memory.NewGoAllocator()
numRows := 2

sortKey1 := createSortKey("timestamp", core.SortOrder_DESC, types.ValueType_UNIX_TIMESTAMP)
sfv := createSortedFeatureView("testView", []string{"driver"}, []*core.SortKey{sortKey1},
createFeature("f1", types.ValueType_DOUBLE))

sortedViews := []*SortedFeatureViewAndRefs{
{View: sfv, FeatureRefs: []string{"f1"}},
}

groupRef := &GroupedRangeFeatureRefs{
FeatureNames: []string{"f1"},
FeatureViewNames: []string{"testView"},
AliasedFeatureNames: []string{"testView__f1"},
Indices: [][]int{{0}, {1}},
}

nowTime := time.Now()
yesterdayTime := nowTime.Add(-24 * time.Hour)

featureData := [][]onlinestore.RangeFeatureData{
{
{
FeatureView: "testView",
FeatureName: "f1",
Values: []interface{}{42.5, 43.2},
EventTimestamps: []timestamp.Timestamp{
{Seconds: nowTime.Unix()},
{Seconds: yesterdayTime.Unix()},
},
},
},
{
{
FeatureView: "testView",
FeatureName: "f1",
Values: []interface{}{99.9},
EventTimestamps: []timestamp.Timestamp{
{Seconds: nowTime.Unix()},
},
},
},
}

vectors, err := TransposeRangeFeatureRowsIntoColumns(featureData, groupRef, sortedViews, arrowAllocator, numRows)

assert.NoError(t, err)
assert.Len(t, vectors, 1)
vector := vectors[0]
assert.Equal(t, "testView__f1", vector.Name)
assert.Len(t, vector.RangeStatuses, numRows)
assert.Len(t, vector.RangeTimestamps, numRows)
assert.Len(t, vector.RangeStatuses[0], 2)
assert.Len(t, vector.RangeTimestamps[0], 2)
assert.Equal(t, serving.FieldStatus_PRESENT, vector.RangeStatuses[0][0])
assert.Len(t, vector.RangeStatuses[1], 1)
assert.Len(t, vector.RangeTimestamps[1], 1)
assert.Equal(t, serving.FieldStatus_PRESENT, vector.RangeStatuses[1][0])
assert.NotNil(t, vector.RangeValues)
vector.RangeValues.Release()
}
4 changes: 4 additions & 0 deletions go/internal/feast/onlinestore/cassandraonlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,6 +685,10 @@ func (c *CassandraOnlineStore) OnlineRead(ctx context.Context, entityKeys []*typ
}
}

func (c *CassandraOnlineStore) OnlineReadRange(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string, sortKeyFilters []*serving.SortKeyFilter, reverseSortOrder bool, limit int32) ([][]RangeFeatureData, error) {
return nil, errors.New("not implemented")
}

func (c *CassandraOnlineStore) Destruct() {
c.session.Close()
}
9 changes: 9 additions & 0 deletions go/internal/feast/onlinestore/onlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ type FeatureData struct {
Value types.Value
}

type RangeFeatureData struct {
FeatureView string
FeatureName string
Values []interface{}
Statuses []serving.FieldStatus
EventTimestamps []timestamp.Timestamp
}

type OnlineStore interface {
// OnlineRead reads multiple features (specified in featureReferences) for multiple
// entity keys (specified in entityKeys) and returns an array of array of features,
Expand All @@ -36,6 +44,7 @@ type OnlineStore interface {
// => allocate memory for each field once in OnlineRead
// and reuse them in GetOnlineFeaturesResponse?
OnlineRead(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string) ([][]FeatureData, error)
OnlineReadRange(ctx context.Context, entityRows []*types.EntityKey, featureViewNames []string, featureNames []string, sortKeyFilters []*serving.SortKeyFilter, reverseSortOrder bool, limit int32) ([][]RangeFeatureData, error)
// Destruct must be call once user is done using OnlineStore
// This is to comply with the Connector since we have to close the plugin
Destruct()
Expand Down
4 changes: 4 additions & 0 deletions go/internal/feast/onlinestore/redisonlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,10 @@ func (r *RedisOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.E
return results, nil
}

func (r *RedisOnlineStore) OnlineReadRange(ctx context.Context, entityKeys []*types.EntityKey, featureViewNames []string, featureNames []string, sortKeyFilters []*serving.SortKeyFilter, reverseSortOrder bool, limit int32) ([][]RangeFeatureData, error) {
return nil, errors.New("not implemented")
}

// Dummy destruct function to conform with plugin OnlineStore interface
func (r *RedisOnlineStore) Destruct() {

Expand Down
4 changes: 4 additions & 0 deletions go/internal/feast/onlinestore/sqliteonlinestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@ func (s *SqliteOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types.
return results, nil
}

func (s *SqliteOnlineStore) OnlineReadRange(ctx context.Context, entityRows []*types.EntityKey, featureViewNames []string, featureNames []string, sortKeyFilters []*serving.SortKeyFilter, reverseSortOrder bool, limit int32) ([][]RangeFeatureData, error) {
return nil, errors.New("not implemented")
}

// Gets a sqlite connection and sets it to the online store and also returns a pointer to the connection.
func (s *SqliteOnlineStore) getConnection() (*sql.DB, error) {
s.db_mu.Lock()
Expand Down
Loading
Loading