diff --git a/protocol/data_store.pb.go b/protocol/data_store.pb.go index 726e9aea5..906291f77 100644 --- a/protocol/data_store.pb.go +++ b/protocol/data_store.pb.go @@ -14,8 +14,8 @@ // Code generated by protoc-gen-go. DO NOT EDIT. // versions: -// protoc-gen-go v1.35.2 -// protoc v5.29.0 +// protoc-gen-go v1.35.1 +// protoc v5.28.3 // source: data_store.proto package protocol @@ -180,9 +180,11 @@ type ScanOptions struct { BeginUserId *string `protobuf:"bytes,1,opt,name=begin_user_id,json=beginUserId,proto3,oneof" json:"begin_user_id,omitempty"` EndUserId *string `protobuf:"bytes,2,opt,name=end_user_id,json=endUserId,proto3,oneof" json:"end_user_id,omitempty"` - BeginTime *timestamppb.Timestamp `protobuf:"bytes,3,opt,name=begin_time,json=beginTime,proto3,oneof" json:"begin_time,omitempty"` - EndTime *timestamppb.Timestamp `protobuf:"bytes,4,opt,name=end_time,json=endTime,proto3,oneof" json:"end_time,omitempty"` - FeedbackTypes []string `protobuf:"bytes,5,rep,name=feedback_types,json=feedbackTypes,proto3" json:"feedback_types,omitempty"` + BeginItemId *string `protobuf:"bytes,3,opt,name=begin_item_id,json=beginItemId,proto3,oneof" json:"begin_item_id,omitempty"` + EndItemId *string `protobuf:"bytes,4,opt,name=end_item_id,json=endItemId,proto3,oneof" json:"end_item_id,omitempty"` + BeginTime *timestamppb.Timestamp `protobuf:"bytes,5,opt,name=begin_time,json=beginTime,proto3,oneof" json:"begin_time,omitempty"` + EndTime *timestamppb.Timestamp `protobuf:"bytes,6,opt,name=end_time,json=endTime,proto3,oneof" json:"end_time,omitempty"` + FeedbackTypes []string `protobuf:"bytes,7,rep,name=feedback_types,json=feedbackTypes,proto3" json:"feedback_types,omitempty"` } func (x *ScanOptions) Reset() { @@ -229,6 +231,20 @@ func (x *ScanOptions) GetEndUserId() string { return "" } +func (x *ScanOptions) GetBeginItemId() string { + if x != nil && x.BeginItemId != nil { + return *x.BeginItemId + } + return "" +} + +func (x *ScanOptions) GetEndItemId() string { + if x != nil && x.EndItemId != nil { + return *x.EndItemId + } + return "" +} + func (x *ScanOptions) GetBeginTime() *timestamppb.Timestamp { if x != nil { return x.BeginTime @@ -2073,25 +2089,32 @@ var file_data_store_proto_rawDesc = []byte{ 0x01, 0x28, 0x09, 0x48, 0x02, 0x52, 0x07, 0x63, 0x6f, 0x6d, 0x6d, 0x65, 0x6e, 0x74, 0x88, 0x01, 0x01, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x69, 0x73, 0x5f, 0x68, 0x69, 0x64, 0x64, 0x65, 0x6e, 0x42, 0x0c, 0x0a, 0x0a, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x42, 0x0a, 0x0a, - 0x08, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0xbc, 0x02, 0x0a, 0x0b, 0x53, 0x63, + 0x08, 0x5f, 0x63, 0x6f, 0x6d, 0x6d, 0x65, 0x6e, 0x74, 0x22, 0xac, 0x03, 0x0a, 0x0b, 0x53, 0x63, 0x61, 0x6e, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x27, 0x0a, 0x0d, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x5f, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x48, 0x00, 0x52, 0x0b, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x55, 0x73, 0x65, 0x72, 0x49, 0x64, 0x88, 0x01, 0x01, 0x12, 0x23, 0x0a, 0x0b, 0x65, 0x6e, 0x64, 0x5f, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x48, 0x01, 0x52, 0x09, 0x65, 0x6e, 0x64, 0x55, 0x73, - 0x65, 0x72, 0x49, 0x64, 0x88, 0x01, 0x01, 0x12, 0x3e, 0x0a, 0x0a, 0x62, 0x65, 0x67, 0x69, 0x6e, - 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, - 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, - 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x48, 0x02, 0x52, 0x09, 0x62, 0x65, 0x67, 0x69, 0x6e, - 0x54, 0x69, 0x6d, 0x65, 0x88, 0x01, 0x01, 0x12, 0x3a, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x5f, 0x74, - 0x69, 0x6d, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x65, 0x72, 0x49, 0x64, 0x88, 0x01, 0x01, 0x12, 0x27, 0x0a, 0x0d, 0x62, 0x65, 0x67, 0x69, 0x6e, + 0x5f, 0x69, 0x74, 0x65, 0x6d, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x48, 0x02, + 0x52, 0x0b, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x49, 0x74, 0x65, 0x6d, 0x49, 0x64, 0x88, 0x01, 0x01, + 0x12, 0x23, 0x0a, 0x0b, 0x65, 0x6e, 0x64, 0x5f, 0x69, 0x74, 0x65, 0x6d, 0x5f, 0x69, 0x64, 0x18, + 0x04, 0x20, 0x01, 0x28, 0x09, 0x48, 0x03, 0x52, 0x09, 0x65, 0x6e, 0x64, 0x49, 0x74, 0x65, 0x6d, + 0x49, 0x64, 0x88, 0x01, 0x01, 0x12, 0x3e, 0x0a, 0x0a, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x5f, 0x74, + 0x69, 0x6d, 0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, - 0x73, 0x74, 0x61, 0x6d, 0x70, 0x48, 0x03, 0x52, 0x07, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, - 0x88, 0x01, 0x01, 0x12, 0x25, 0x0a, 0x0e, 0x66, 0x65, 0x65, 0x64, 0x62, 0x61, 0x63, 0x6b, 0x5f, - 0x74, 0x79, 0x70, 0x65, 0x73, 0x18, 0x05, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0d, 0x66, 0x65, 0x65, - 0x64, 0x62, 0x61, 0x63, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x73, 0x42, 0x10, 0x0a, 0x0e, 0x5f, 0x62, - 0x65, 0x67, 0x69, 0x6e, 0x5f, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x42, 0x0e, 0x0a, 0x0c, - 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x42, 0x0d, 0x0a, 0x0b, + 0x73, 0x74, 0x61, 0x6d, 0x70, 0x48, 0x04, 0x52, 0x09, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x54, 0x69, + 0x6d, 0x65, 0x88, 0x01, 0x01, 0x12, 0x3a, 0x0a, 0x08, 0x65, 0x6e, 0x64, 0x5f, 0x74, 0x69, 0x6d, + 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x1a, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x54, 0x69, 0x6d, 0x65, 0x73, 0x74, + 0x61, 0x6d, 0x70, 0x48, 0x05, 0x52, 0x07, 0x65, 0x6e, 0x64, 0x54, 0x69, 0x6d, 0x65, 0x88, 0x01, + 0x01, 0x12, 0x25, 0x0a, 0x0e, 0x66, 0x65, 0x65, 0x64, 0x62, 0x61, 0x63, 0x6b, 0x5f, 0x74, 0x79, + 0x70, 0x65, 0x73, 0x18, 0x07, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0d, 0x66, 0x65, 0x65, 0x64, 0x62, + 0x61, 0x63, 0x6b, 0x54, 0x79, 0x70, 0x65, 0x73, 0x42, 0x10, 0x0a, 0x0e, 0x5f, 0x62, 0x65, 0x67, + 0x69, 0x6e, 0x5f, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x42, 0x0e, 0x0a, 0x0c, 0x5f, 0x65, + 0x6e, 0x64, 0x5f, 0x75, 0x73, 0x65, 0x72, 0x5f, 0x69, 0x64, 0x42, 0x10, 0x0a, 0x0e, 0x5f, 0x62, + 0x65, 0x67, 0x69, 0x6e, 0x5f, 0x69, 0x74, 0x65, 0x6d, 0x5f, 0x69, 0x64, 0x42, 0x0e, 0x0a, 0x0c, + 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x69, 0x74, 0x65, 0x6d, 0x5f, 0x69, 0x64, 0x42, 0x0d, 0x0a, 0x0b, 0x5f, 0x62, 0x65, 0x67, 0x69, 0x6e, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x42, 0x0b, 0x0a, 0x09, 0x5f, 0x65, 0x6e, 0x64, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x22, 0x3f, 0x0a, 0x17, 0x42, 0x61, 0x74, 0x63, 0x68, 0x49, 0x6e, 0x73, 0x65, 0x72, 0x74, 0x49, 0x74, 0x65, 0x6d, 0x73, 0x52, 0x65, 0x71, 0x75, diff --git a/protocol/data_store.proto b/protocol/data_store.proto index 22a73d8af..19f8829dd 100644 --- a/protocol/data_store.proto +++ b/protocol/data_store.proto @@ -38,9 +38,11 @@ message ItemPatch { message ScanOptions { optional string begin_user_id = 1; optional string end_user_id = 2; - optional google.protobuf.Timestamp begin_time = 3; - optional google.protobuf.Timestamp end_time = 4; - repeated string feedback_types = 5; + optional string begin_item_id = 3; + optional string end_item_id = 4; + optional google.protobuf.Timestamp begin_time = 5; + optional google.protobuf.Timestamp end_time = 6; + repeated string feedback_types = 7; } message BatchInsertItemsRequest { diff --git a/storage/data/database.go b/storage/data/database.go index 911ef3bfd..544227eca 100644 --- a/storage/data/database.go +++ b/storage/data/database.go @@ -174,6 +174,8 @@ func (sorter feedbackSorter) Swap(i, j int) { type ScanOptions struct { BeginUserId *string EndUserId *string + BeginItemId *string + EndItemId *string BeginTime *time.Time EndTime *time.Time FeedbackTypes []string @@ -195,6 +197,20 @@ func WithEndUserId(userId string) ScanOption { } } +// WithBeginItemId sets the beginning item id. The beginning item id is included in the result. +func WithBeginItemId(itemId string) ScanOption { + return func(options *ScanOptions) { + options.BeginItemId = &itemId + } +} + +// WithEndItemId sets the end item id. The end item id is included in the result. +func WithEndItemId(itemId string) ScanOption { + return func(options *ScanOptions) { + options.EndItemId = &itemId + } +} + // WithBeginTime sets the begin time. The begin time is included in the result. func WithBeginTime(t time.Time) ScanOption { return func(options *ScanOptions) { diff --git a/storage/data/database_test.go b/storage/data/database_test.go index ac9fcad1a..5d2a8d5d1 100644 --- a/storage/data/database_test.go +++ b/storage/data/database_test.go @@ -271,6 +271,8 @@ func (suite *baseTestSuite) TestFeedback() { suite.Empty(feedbackFromStream) feedbackFromStream = suite.getFeedbackStream(ctx, 3, WithBeginUserId("1"), WithEndUserId("3"), WithEndTime(time.Now()), WithFeedbackTypes(positiveFeedbackType)) suite.Equal(feedback[1:4], feedbackFromStream) + feedbackFromStream = suite.getFeedbackStream(ctx, 3, WithBeginItemId("2"), WithEndItemId("6"), WithEndTime(time.Now()), WithFeedbackTypes(positiveFeedbackType)) + suite.Equal(feedback[1:4], feedbackFromStream) // Get items err = suite.Database.Optimize() suite.NoError(err) diff --git a/storage/data/mongodb.go b/storage/data/mongodb.go index 3848e6b0a..1f953cb73 100644 --- a/storage/data/mongodb.go +++ b/storage/data/mongodb.go @@ -726,6 +726,16 @@ func (db *MongoDB) GetFeedbackStream(ctx context.Context, batchSize int, scanOpt } filter["feedbackkey.userid"] = userIdConditions } + if scan.BeginItemId != nil || scan.EndItemId != nil { + itemIdConditions := bson.M{} + if scan.BeginItemId != nil { + itemIdConditions["$gte"] = *scan.BeginItemId + } + if scan.EndItemId != nil { + itemIdConditions["$lte"] = *scan.EndItemId + } + filter["feedbackkey.itemid"] = itemIdConditions + } r, err := c.Find(ctx, filter, opt) if err != nil { diff --git a/storage/data/proxy.go b/storage/data/proxy.go index 12eaf29c0..38989281c 100644 --- a/storage/data/proxy.go +++ b/storage/data/proxy.go @@ -432,6 +432,12 @@ func (p *ProxyServer) GetFeedbackStream(in *protocol.GetFeedbackStreamRequest, s if in.ScanOptions.EndUserId != nil { opts = append(opts, WithEndUserId(*in.ScanOptions.EndUserId)) } + if in.ScanOptions.BeginItemId != nil { + opts = append(opts, WithBeginItemId(*in.ScanOptions.BeginItemId)) + } + if in.ScanOptions.EndItemId != nil { + opts = append(opts, WithEndItemId(*in.ScanOptions.EndItemId)) + } feedbackChan, errChan := p.database.GetFeedbackStream(stream.Context(), int(in.BatchSize), opts...) for feedback := range feedbackChan { pbFeedback := make([]*protocol.Feedback, len(feedback)) @@ -930,6 +936,8 @@ func (p ProxyClient) GetFeedbackStream(ctx context.Context, batchSize int, optio pbOptions := &protocol.ScanOptions{ BeginUserId: o.BeginUserId, EndUserId: o.EndUserId, + BeginItemId: o.BeginItemId, + EndItemId: o.EndItemId, FeedbackTypes: o.FeedbackTypes, } if o.BeginTime != nil { diff --git a/storage/data/sql.go b/storage/data/sql.go index caf5a85fc..9b27a99fa 100644 --- a/storage/data/sql.go +++ b/storage/data/sql.go @@ -976,6 +976,12 @@ func (d *SQLDatabase) GetFeedbackStream(ctx context.Context, batchSize int, scan if scan.EndUserId != nil { tx.Where("user_id <= ?", scan.EndUserId) } + if scan.BeginItemId != nil { + tx.Where("item_id >= ?", scan.BeginItemId) + } + if scan.EndItemId != nil { + tx.Where("item_id <= ?", scan.EndItemId) + } result, err := tx.Rows() if err != nil { errChan <- errors.Trace(err) diff --git a/worker/worker.go b/worker/worker.go index 899336896..d5554242e 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -736,7 +736,7 @@ func (w *Worker) Recommend(users []data.User) { if w.Config.Recommend.Offline.EnableLatestRecommend { localStartTime := time.Now() for _, category := range append([]string{""}, itemCategories...) { - latestItems, err := w.CacheClient.SearchScores(ctx, cache.LatestItems, "", []string{category}, 0, w.Config.Recommend.CacheSize) + latestItems, err := w.CacheClient.SearchScores(ctx, cache.NonPersonalized, cache.Latest, []string{category}, 0, w.Config.Recommend.CacheSize) if err != nil { log.Logger().Error("failed to load latest items", zap.Error(err)) return errors.Trace(err) @@ -756,7 +756,7 @@ func (w *Worker) Recommend(users []data.User) { if w.Config.Recommend.Offline.EnablePopularRecommend { localStartTime := time.Now() for _, category := range append([]string{""}, itemCategories...) { - popularItems, err := w.CacheClient.SearchScores(ctx, cache.PopularItems, "", []string{category}, 0, w.Config.Recommend.CacheSize) + popularItems, err := w.CacheClient.SearchScores(ctx, cache.NonPersonalized, cache.Popular, []string{category}, 0, w.Config.Recommend.CacheSize) if err != nil { log.Logger().Error("failed to load popular items", zap.Error(err)) return errors.Trace(err) @@ -1045,12 +1045,12 @@ func (w *Worker) exploreRecommend(exploitRecommend []cache.Score, excludeSet map exploreLatestThreshold += threshold } // load popular items - popularItems, err := w.CacheClient.SearchScores(ctx, cache.PopularItems, "", []string{category}, 0, w.Config.Recommend.CacheSize) + popularItems, err := w.CacheClient.SearchScores(ctx, cache.NonPersonalized, cache.Popular, []string{category}, 0, w.Config.Recommend.CacheSize) if err != nil { return nil, errors.Trace(err) } // load the latest items - latestItems, err := w.CacheClient.SearchScores(ctx, cache.LatestItems, "", []string{category}, 0, w.Config.Recommend.CacheSize) + latestItems, err := w.CacheClient.SearchScores(ctx, cache.NonPersonalized, cache.Latest, []string{category}, 0, w.Config.Recommend.CacheSize) if err != nil { return nil, errors.Trace(err) } diff --git a/worker/worker_test.go b/worker/worker_test.go index 5a2babac0..0bcbc322f 100644 --- a/worker/worker_test.go +++ b/worker/worker_test.go @@ -442,7 +442,7 @@ func (suite *WorkerTestSuite) TestRecommendPopular() { suite.Config.Recommend.Offline.EnableColRecommend = false suite.Config.Recommend.Offline.EnablePopularRecommend = true // insert popular items - err := suite.CacheClient.AddScores(ctx, cache.PopularItems, "", []cache.Score{ + err := suite.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Popular, []cache.Score{ {Id: "11", Score: 11, Categories: []string{""}}, {Id: "10", Score: 10, Categories: []string{""}}, {Id: "9", Score: 9, Categories: []string{""}}, @@ -491,7 +491,7 @@ func (suite *WorkerTestSuite) TestRecommendLatest() { suite.Config.Recommend.Offline.EnableColRecommend = false suite.Config.Recommend.Offline.EnableLatestRecommend = true // insert latest items - err := suite.CacheClient.AddScores(ctx, cache.LatestItems, "", []cache.Score{ + err := suite.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Latest, []cache.Score{ {Id: "11", Score: 11, Categories: []string{""}}, {Id: "10", Score: 10, Categories: []string{""}}, {Id: "9", Score: 9, Categories: []string{""}}, @@ -539,7 +539,7 @@ func (suite *WorkerTestSuite) TestRecommendColdStart() { suite.Config.Recommend.Offline.EnableColRecommend = true suite.Config.Recommend.Offline.EnableLatestRecommend = true // insert latest items - err := suite.CacheClient.AddScores(ctx, cache.LatestItems, "", []cache.Score{ + err := suite.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Latest, []cache.Score{ {Id: "11", Score: 11, Categories: []string{""}}, {Id: "10", Score: 10, Categories: []string{""}}, {Id: "9", Score: 9, Categories: []string{""}}, @@ -591,10 +591,10 @@ func (suite *WorkerTestSuite) TestExploreRecommend() { ctx := context.Background() suite.Config.Recommend.Offline.ExploreRecommend = map[string]float64{"popular": 0.3, "latest": 0.3} // insert popular items - err := suite.CacheClient.AddScores(ctx, cache.PopularItems, "", []cache.Score{{Id: "popular", Score: 0, Categories: []string{""}, Timestamp: time.Now()}}) + err := suite.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Popular, []cache.Score{{Id: "popular", Score: 0, Categories: []string{""}, Timestamp: time.Now()}}) suite.NoError(err) // insert latest items - err = suite.CacheClient.AddScores(ctx, cache.LatestItems, "", []cache.Score{{Id: "latest", Score: 0, Categories: []string{""}, Timestamp: time.Now()}}) + err = suite.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Latest, []cache.Score{{Id: "latest", Score: 0, Categories: []string{""}, Timestamp: time.Now()}}) suite.NoError(err) recommend, err := suite.exploreRecommend([]cache.Score{ @@ -940,7 +940,7 @@ func (suite *WorkerTestSuite) TestReplacement_ClickThroughRate() { err = suite.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserRecommendTime, "0"), time.Now().AddDate(-1, 0, 0))) suite.NoError(err) // insert popular items - err = suite.CacheClient.AddScores(ctx, cache.PopularItems, "", []cache.Score{ + err = suite.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Popular, []cache.Score{ {Id: "7", Score: 10, Categories: []string{""}}, {Id: "6", Score: 9, Categories: []string{""}}, {Id: "5", Score: 8, Categories: []string{""}}, @@ -1005,7 +1005,7 @@ func (suite *WorkerTestSuite) TestReplacement_CollaborativeFiltering() { err = suite.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserRecommendTime, "0"), time.Now().AddDate(-1, 0, 0))) suite.NoError(err) // insert popular items - err = suite.CacheClient.AddScores(ctx, cache.PopularItems, "", []cache.Score{ + err = suite.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Popular, []cache.Score{ {Id: "7", Score: 10, Categories: []string{""}}, {Id: "6", Score: 9, Categories: []string{""}}, {Id: "5", Score: 8, Categories: []string{""}}})