Skip to content

Commit

Permalink
support with begin/end item id
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz committed Dec 16, 2024
1 parent b48e7b8 commit 4bbb6c0
Show file tree
Hide file tree
Showing 9 changed files with 99 additions and 32 deletions.
59 changes: 41 additions & 18 deletions protocol/data_store.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 5 additions & 3 deletions protocol/data_store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@ message ItemPatch {
message ScanOptions {
optional string begin_user_id = 1;
optional string end_user_id = 2;
optional google.protobuf.Timestamp begin_time = 3;
optional google.protobuf.Timestamp end_time = 4;
repeated string feedback_types = 5;
optional string begin_item_id = 3;
optional string end_item_id = 4;
optional google.protobuf.Timestamp begin_time = 5;
optional google.protobuf.Timestamp end_time = 6;
repeated string feedback_types = 7;
}

message BatchInsertItemsRequest {
Expand Down
16 changes: 16 additions & 0 deletions storage/data/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ func (sorter feedbackSorter) Swap(i, j int) {
type ScanOptions struct {
BeginUserId *string
EndUserId *string
BeginItemId *string
EndItemId *string
BeginTime *time.Time
EndTime *time.Time
FeedbackTypes []string
Expand All @@ -195,6 +197,20 @@ func WithEndUserId(userId string) ScanOption {
}
}

// WithBeginItemId sets the beginning item id. The beginning item id is included in the result.
func WithBeginItemId(itemId string) ScanOption {
return func(options *ScanOptions) {
options.BeginItemId = &itemId
}
}

// WithEndItemId sets the end item id. The end item id is included in the result.
func WithEndItemId(itemId string) ScanOption {
return func(options *ScanOptions) {
options.EndItemId = &itemId
}
}

// WithBeginTime sets the begin time. The begin time is included in the result.
func WithBeginTime(t time.Time) ScanOption {
return func(options *ScanOptions) {
Expand Down
2 changes: 2 additions & 0 deletions storage/data/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,8 @@ func (suite *baseTestSuite) TestFeedback() {
suite.Empty(feedbackFromStream)
feedbackFromStream = suite.getFeedbackStream(ctx, 3, WithBeginUserId("1"), WithEndUserId("3"), WithEndTime(time.Now()), WithFeedbackTypes(positiveFeedbackType))
suite.Equal(feedback[1:4], feedbackFromStream)
feedbackFromStream = suite.getFeedbackStream(ctx, 3, WithBeginItemId("2"), WithEndItemId("6"), WithEndTime(time.Now()), WithFeedbackTypes(positiveFeedbackType))
suite.Equal(feedback[1:4], feedbackFromStream)
// Get items
err = suite.Database.Optimize()
suite.NoError(err)
Expand Down
10 changes: 10 additions & 0 deletions storage/data/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -726,6 +726,16 @@ func (db *MongoDB) GetFeedbackStream(ctx context.Context, batchSize int, scanOpt
}
filter["feedbackkey.userid"] = userIdConditions
}
if scan.BeginItemId != nil || scan.EndItemId != nil {
itemIdConditions := bson.M{}
if scan.BeginItemId != nil {
itemIdConditions["$gte"] = *scan.BeginItemId
}
if scan.EndItemId != nil {
itemIdConditions["$lte"] = *scan.EndItemId
}
filter["feedbackkey.itemid"] = itemIdConditions
}

r, err := c.Find(ctx, filter, opt)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions storage/data/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,12 @@ func (p *ProxyServer) GetFeedbackStream(in *protocol.GetFeedbackStreamRequest, s
if in.ScanOptions.EndUserId != nil {
opts = append(opts, WithEndUserId(*in.ScanOptions.EndUserId))
}
if in.ScanOptions.BeginItemId != nil {
opts = append(opts, WithBeginItemId(*in.ScanOptions.BeginItemId))
}
if in.ScanOptions.EndItemId != nil {
opts = append(opts, WithEndItemId(*in.ScanOptions.EndItemId))
}
feedbackChan, errChan := p.database.GetFeedbackStream(stream.Context(), int(in.BatchSize), opts...)
for feedback := range feedbackChan {
pbFeedback := make([]*protocol.Feedback, len(feedback))
Expand Down Expand Up @@ -930,6 +936,8 @@ func (p ProxyClient) GetFeedbackStream(ctx context.Context, batchSize int, optio
pbOptions := &protocol.ScanOptions{
BeginUserId: o.BeginUserId,
EndUserId: o.EndUserId,
BeginItemId: o.BeginItemId,
EndItemId: o.EndItemId,
FeedbackTypes: o.FeedbackTypes,
}
if o.BeginTime != nil {
Expand Down
6 changes: 6 additions & 0 deletions storage/data/sql.go
Original file line number Diff line number Diff line change
Expand Up @@ -976,6 +976,12 @@ func (d *SQLDatabase) GetFeedbackStream(ctx context.Context, batchSize int, scan
if scan.EndUserId != nil {
tx.Where("user_id <= ?", scan.EndUserId)
}
if scan.BeginItemId != nil {
tx.Where("item_id >= ?", scan.BeginItemId)
}
if scan.EndItemId != nil {
tx.Where("item_id <= ?", scan.EndItemId)
}
result, err := tx.Rows()
if err != nil {
errChan <- errors.Trace(err)
Expand Down
8 changes: 4 additions & 4 deletions worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,7 +736,7 @@ func (w *Worker) Recommend(users []data.User) {
if w.Config.Recommend.Offline.EnableLatestRecommend {
localStartTime := time.Now()
for _, category := range append([]string{""}, itemCategories...) {
latestItems, err := w.CacheClient.SearchScores(ctx, cache.LatestItems, "", []string{category}, 0, w.Config.Recommend.CacheSize)
latestItems, err := w.CacheClient.SearchScores(ctx, cache.NonPersonalized, cache.Latest, []string{category}, 0, w.Config.Recommend.CacheSize)
if err != nil {
log.Logger().Error("failed to load latest items", zap.Error(err))
return errors.Trace(err)
Expand All @@ -756,7 +756,7 @@ func (w *Worker) Recommend(users []data.User) {
if w.Config.Recommend.Offline.EnablePopularRecommend {
localStartTime := time.Now()
for _, category := range append([]string{""}, itemCategories...) {
popularItems, err := w.CacheClient.SearchScores(ctx, cache.PopularItems, "", []string{category}, 0, w.Config.Recommend.CacheSize)
popularItems, err := w.CacheClient.SearchScores(ctx, cache.NonPersonalized, cache.Popular, []string{category}, 0, w.Config.Recommend.CacheSize)
if err != nil {
log.Logger().Error("failed to load popular items", zap.Error(err))
return errors.Trace(err)
Expand Down Expand Up @@ -1045,12 +1045,12 @@ func (w *Worker) exploreRecommend(exploitRecommend []cache.Score, excludeSet map
exploreLatestThreshold += threshold
}
// load popular items
popularItems, err := w.CacheClient.SearchScores(ctx, cache.PopularItems, "", []string{category}, 0, w.Config.Recommend.CacheSize)
popularItems, err := w.CacheClient.SearchScores(ctx, cache.NonPersonalized, cache.Popular, []string{category}, 0, w.Config.Recommend.CacheSize)
if err != nil {
return nil, errors.Trace(err)
}
// load the latest items
latestItems, err := w.CacheClient.SearchScores(ctx, cache.LatestItems, "", []string{category}, 0, w.Config.Recommend.CacheSize)
latestItems, err := w.CacheClient.SearchScores(ctx, cache.NonPersonalized, cache.Latest, []string{category}, 0, w.Config.Recommend.CacheSize)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
14 changes: 7 additions & 7 deletions worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,7 @@ func (suite *WorkerTestSuite) TestRecommendPopular() {
suite.Config.Recommend.Offline.EnableColRecommend = false
suite.Config.Recommend.Offline.EnablePopularRecommend = true
// insert popular items
err := suite.CacheClient.AddScores(ctx, cache.PopularItems, "", []cache.Score{
err := suite.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Popular, []cache.Score{
{Id: "11", Score: 11, Categories: []string{""}},
{Id: "10", Score: 10, Categories: []string{""}},
{Id: "9", Score: 9, Categories: []string{""}},
Expand Down Expand Up @@ -491,7 +491,7 @@ func (suite *WorkerTestSuite) TestRecommendLatest() {
suite.Config.Recommend.Offline.EnableColRecommend = false
suite.Config.Recommend.Offline.EnableLatestRecommend = true
// insert latest items
err := suite.CacheClient.AddScores(ctx, cache.LatestItems, "", []cache.Score{
err := suite.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Latest, []cache.Score{
{Id: "11", Score: 11, Categories: []string{""}},
{Id: "10", Score: 10, Categories: []string{""}},
{Id: "9", Score: 9, Categories: []string{""}},
Expand Down Expand Up @@ -539,7 +539,7 @@ func (suite *WorkerTestSuite) TestRecommendColdStart() {
suite.Config.Recommend.Offline.EnableColRecommend = true
suite.Config.Recommend.Offline.EnableLatestRecommend = true
// insert latest items
err := suite.CacheClient.AddScores(ctx, cache.LatestItems, "", []cache.Score{
err := suite.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Latest, []cache.Score{
{Id: "11", Score: 11, Categories: []string{""}},
{Id: "10", Score: 10, Categories: []string{""}},
{Id: "9", Score: 9, Categories: []string{""}},
Expand Down Expand Up @@ -591,10 +591,10 @@ func (suite *WorkerTestSuite) TestExploreRecommend() {
ctx := context.Background()
suite.Config.Recommend.Offline.ExploreRecommend = map[string]float64{"popular": 0.3, "latest": 0.3}
// insert popular items
err := suite.CacheClient.AddScores(ctx, cache.PopularItems, "", []cache.Score{{Id: "popular", Score: 0, Categories: []string{""}, Timestamp: time.Now()}})
err := suite.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Popular, []cache.Score{{Id: "popular", Score: 0, Categories: []string{""}, Timestamp: time.Now()}})
suite.NoError(err)
// insert latest items
err = suite.CacheClient.AddScores(ctx, cache.LatestItems, "", []cache.Score{{Id: "latest", Score: 0, Categories: []string{""}, Timestamp: time.Now()}})
err = suite.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Latest, []cache.Score{{Id: "latest", Score: 0, Categories: []string{""}, Timestamp: time.Now()}})
suite.NoError(err)

recommend, err := suite.exploreRecommend([]cache.Score{
Expand Down Expand Up @@ -940,7 +940,7 @@ func (suite *WorkerTestSuite) TestReplacement_ClickThroughRate() {
err = suite.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserRecommendTime, "0"), time.Now().AddDate(-1, 0, 0)))
suite.NoError(err)
// insert popular items
err = suite.CacheClient.AddScores(ctx, cache.PopularItems, "", []cache.Score{
err = suite.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Popular, []cache.Score{
{Id: "7", Score: 10, Categories: []string{""}},
{Id: "6", Score: 9, Categories: []string{""}},
{Id: "5", Score: 8, Categories: []string{""}},
Expand Down Expand Up @@ -1005,7 +1005,7 @@ func (suite *WorkerTestSuite) TestReplacement_CollaborativeFiltering() {
err = suite.CacheClient.Set(ctx, cache.Time(cache.Key(cache.LastUpdateUserRecommendTime, "0"), time.Now().AddDate(-1, 0, 0)))
suite.NoError(err)
// insert popular items
err = suite.CacheClient.AddScores(ctx, cache.PopularItems, "", []cache.Score{
err = suite.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Popular, []cache.Score{
{Id: "7", Score: 10, Categories: []string{""}},
{Id: "6", Score: 9, Categories: []string{""}},
{Id: "5", Score: 8, Categories: []string{""}}})
Expand Down

0 comments on commit 4bbb6c0

Please sign in to comment.