remove leaderboard to non-personalized
zhenghaoz committed Dec 8, 2024
1 parent e5ea0e2 commit b48e7b8
Showing 3 changed files with 57 additions and 81 deletions.
6 changes: 3 additions & 3 deletions master/rest.go
@@ -1360,11 +1360,11 @@ func writeError(response http.ResponseWriter, httpStatus int, message string) {
     }
 }
 
-func (s *Master) checkAdmin(request *http.Request) bool {
-    if s.Config.Master.AdminAPIKey == "" {
+func (m *Master) checkAdmin(request *http.Request) bool {
+    if m.Config.Master.AdminAPIKey == "" {
         return true
     }
-    if request.FormValue("X-API-Key") == s.Config.Master.AdminAPIKey {
+    if request.FormValue("X-API-Key") == m.Config.Master.AdminAPIKey {
         return true
     }
     return false
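Note: the master/rest.go change only renames the receiver from s to m; the admin check itself is unchanged. Because checkAdmin reads the key with request.FormValue("X-API-Key"), the key is expected as a query or form parameter rather than an HTTP header, despite the header-style name. A minimal, self-contained sketch of the same pattern (the handler, port, and key below are illustrative, not gorse's actual endpoint):

package main

import (
    "fmt"
    "net/http"
)

// adminAPIKey stands in for m.Config.Master.AdminAPIKey; an empty key disables the check.
const adminAPIKey = "secret"

// checkAdmin mirrors the pattern in master/rest.go: the key is read with
// FormValue, so it must arrive as a query or form parameter named X-API-Key.
func checkAdmin(r *http.Request) bool {
    if adminAPIKey == "" {
        return true
    }
    return r.FormValue("X-API-Key") == adminAPIKey
}

func main() {
    http.HandleFunc("/admin", func(w http.ResponseWriter, r *http.Request) {
        if !checkAdmin(r) {
            http.Error(w, "unauthorized", http.StatusUnauthorized)
            return
        }
        fmt.Fprintln(w, "ok")
    })
    // e.g. GET /admin?X-API-Key=secret passes the check; a bare GET /admin is rejected.
    _ = http.ListenAndServe(":8088", nil)
}
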
114 changes: 42 additions & 72 deletions master/tasks.go
@@ -17,6 +17,7 @@ package master
 import (
     "context"
     "fmt"
+    "github.com/zhenghaoz/gorse/logics"
     "sort"
     "strings"
     "sync"
@@ -49,7 +50,6 @@ import (
 const (
     PositiveFeedbackRate = "PositiveFeedbackRate"
 
-    TaskLoadDataset       = "Load dataset"
     TaskFindItemNeighbors = "Find neighbors of items"
     TaskFindUserNeighbors = "Find neighbors of users"
     TaskFitRankingModel   = "Fit collaborative filtering model"
@@ -73,45 +73,49 @@ func (m *Master) runLoadDatasetTask() error {
     ctx, span := m.tracer.Start(context.Background(), "Load Dataset", 1)
     defer span.End()
 
+    // Build non-personalized recommenders
+    initialStartTime := time.Now()
+    nonPersonalizedRecommenders := []*logics.NonPersonalized{
+        logics.NewLatest(m.Config.Recommend.CacheSize, initialStartTime),
+        logics.NewPopular(m.Config.Recommend.Popular.PopularWindow, m.Config.Recommend.CacheSize, initialStartTime),
+    }
+    for _, cfg := range m.Config.Recommend.NonPersonalized {
+        recommender, err := logics.NewNonPersonalized(cfg, m.Config.Recommend.CacheSize, initialStartTime)
+        if err != nil {
+            return errors.Trace(err)
+        }
+        nonPersonalizedRecommenders = append(nonPersonalizedRecommenders, recommender)
+    }
+
     log.Logger().Info("load dataset",
         zap.Strings("positive_feedback_types", m.Config.Recommend.DataSource.PositiveFeedbackTypes),
         zap.Strings("read_feedback_types", m.Config.Recommend.DataSource.ReadFeedbackTypes),
         zap.Uint("item_ttl", m.Config.Recommend.DataSource.ItemTTL),
         zap.Uint("feedback_ttl", m.Config.Recommend.DataSource.PositiveFeedbackTTL))
     evaluator := NewOnlineEvaluator()
-    rankingDataset, clickDataset, latestItems, popularItems, err := m.LoadDataFromDatabase(ctx, m.DataClient,
+    rankingDataset, clickDataset, err := m.LoadDataFromDatabase(ctx, m.DataClient,
         m.Config.Recommend.DataSource.PositiveFeedbackTypes,
         m.Config.Recommend.DataSource.ReadFeedbackTypes,
         m.Config.Recommend.DataSource.ItemTTL,
         m.Config.Recommend.DataSource.PositiveFeedbackTTL,
-        evaluator)
+        evaluator, nonPersonalizedRecommenders)
     if err != nil {
         return errors.Trace(err)
     }
 
-    // save popular items to cache
-    if err = m.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Popular, popularItems.ToSlice()); err != nil {
-        log.Logger().Error("failed to cache popular items", zap.Error(err))
-    }
-    if err = m.CacheClient.DeleteScores(ctx, []string{cache.NonPersonalized},
-        cache.ScoreCondition{Subset: proto.String(cache.Popular), Before: &popularItems.Timestamp}); err != nil {
-        log.Logger().Error("failed to reclaim outdated items", zap.Error(err))
-    }
-    if err = m.CacheClient.Set(ctx, cache.Time(cache.Key(cache.GlobalMeta, cache.LastUpdatePopularItemsTime), time.Now())); err != nil {
-        log.Logger().Error("failed to write latest update popular items time", zap.Error(err))
-    }
-
-    // save the latest items to cache
-    if err = m.CacheClient.AddScores(ctx, cache.NonPersonalized, cache.Latest, latestItems.ToSlice()); err != nil {
-        log.Logger().Error("failed to cache latest items", zap.Error(err))
-    }
-    if err = m.CacheClient.DeleteScores(ctx, []string{cache.NonPersonalized},
-        cache.ScoreCondition{Subset: proto.String(cache.Latest), Before: &latestItems.Timestamp}); err != nil {
-        log.Logger().Error("failed to reclaim outdated items", zap.Error(err))
-    }
-    if err = m.CacheClient.Set(ctx, cache.Time(cache.Key(cache.GlobalMeta, cache.LastUpdateLatestItemsTime), time.Now())); err != nil {
-        log.Logger().Error("failed to write latest update latest items time", zap.Error(err))
+    // save non-personalized recommenders to cache
+    for _, recommender := range nonPersonalizedRecommenders {
+        scores := recommender.PopAll()
+        if err = m.CacheClient.AddScores(ctx, cache.NonPersonalized, recommender.Name(), scores); err != nil {
+            log.Logger().Error("failed to cache non-personalized recommenders", zap.Error(err))
+        }
+        if err = m.CacheClient.DeleteScores(ctx, []string{cache.NonPersonalized},
+            cache.ScoreCondition{
+                Subset: proto.String(recommender.Name()),
+                Before: lo.ToPtr(recommender.Timestamp()),
+            }); err != nil {
+            log.Logger().Error("failed to reclaim outdated items", zap.Error(err))
+        }
     }
 
     // write statistics to database
@@ -1396,12 +1400,17 @@ func (t *CacheGarbageCollectionTask) run(ctx context.Context, j *task.JobsAlloca
     }
 
 // LoadDataFromDatabase loads dataset from data store.
-func (m *Master) LoadDataFromDatabase(ctx context.Context, database data.Database, posFeedbackTypes, readTypes []string, itemTTL, positiveFeedbackTTL uint, evaluator *OnlineEvaluator) (
-    rankingDataset *ranking.DataSet, clickDataset *click.Dataset, latestItems *cache.DocumentAggregator, popularItems *cache.DocumentAggregator, err error) {
+func (m *Master) LoadDataFromDatabase(
+    ctx context.Context,
+    database data.Database,
+    posFeedbackTypes, readTypes []string,
+    itemTTL, positiveFeedbackTTL uint,
+    evaluator *OnlineEvaluator,
+    nonPersonalizedRecommenders []*logics.NonPersonalized,
+) (rankingDataset *ranking.DataSet, clickDataset *click.Dataset, err error) {
     newCtx, span := progress.Start(ctx, "LoadDataFromDatabase", 4)
     defer span.End()
 
-    startLoadTime := time.Now()
     // setup time limit
     var feedbackTimeLimit data.ScanOption
     var itemTimeLimit *time.Time
@@ -1419,10 +1428,6 @@ func (m *Master) LoadDataFromDatabase(ctx context.Context, database data.Databas
     }
     rankingDataset = ranking.NewMapIndexDataset()
 
-    // create filers for latest items
-    latestItemsFilters := make(map[string]*heap.TopKFilter[string, float64])
-    latestItemsFilters[""] = heap.NewTopKFilter[string, float64](m.Config.Recommend.CacheSize)
-
     // STEP 1: pull users
     userLabelCount := make(map[string]int)
     userLabelFirst := make(map[string]int32)
@@ -1465,7 +1470,7 @@ func (m *Master) LoadDataFromDatabase(ctx context.Context, database data.Databas
         }
     }
     if err = <-errChan; err != nil {
-        return nil, nil, nil, nil, errors.Trace(err)
+        return nil, nil, errors.Trace(err)
     }
     rankingDataset.NumUserLabels = userLabelIndex.Len()
     log.Logger().Debug("pulled users from database",
@@ -1519,19 +1524,11 @@ func (m *Master) LoadDataFromDatabase(ctx context.Context, database data.Databas
             }
             if item.IsHidden { // set hidden flag
                 rankingDataset.HiddenItems[itemIndex] = true
-            } else if !item.Timestamp.IsZero() { // add items to the latest items filter
-                latestItemsFilters[""].Push(item.ItemId, float64(item.Timestamp.Unix()))
-                for _, category := range item.Categories {
-                    if _, exist := latestItemsFilters[category]; !exist {
-                        latestItemsFilters[category] = heap.NewTopKFilter[string, float64](m.Config.Recommend.CacheSize)
-                    }
-                    latestItemsFilters[category].Push(item.ItemId, float64(item.Timestamp.Unix()))
-                }
             }
         }
     }
     if err = <-errChan; err != nil {
-        return nil, nil, nil, nil, errors.Trace(err)
+        return nil, nil, errors.Trace(err)
     }
     rankingDataset.NumItemLabels = itemLabelIndex.Len()
     log.Logger().Debug("pulled items from database",
@@ -1597,7 +1594,7 @@ func (m *Master) LoadDataFromDatabase(ctx context.Context, database data.Databas
         return nil
     })
     if err != nil {
-        return nil, nil, nil, nil, errors.Trace(err)
+        return nil, nil, errors.Trace(err)
     }
     log.Logger().Debug("pulled positive feedback from database",
         zap.Int("n_positive_feedback", posFeedbackCount),
@@ -1647,7 +1644,7 @@ func (m *Master) LoadDataFromDatabase(ctx context.Context, database data.Databas
         return nil
     })
     if err != nil {
-        return nil, nil, nil, nil, errors.Trace(err)
+        return nil, nil, errors.Trace(err)
     }
     log.Logger().Debug("pulled negative feedback from database",
         zap.Int("n_negative_feedback", int(negativeFeedbackCount)),
@@ -1697,32 +1694,5 @@ func (m *Master) LoadDataFromDatabase(ctx context.Context, database data.Databas
         zap.Int("n_valid_negative", clickDataset.NegativeCount),
         zap.Duration("used_time", time.Since(start)))
     LoadDatasetStepSecondsVec.WithLabelValues("create_ranking_dataset").Set(time.Since(start).Seconds())
-
-    // collect latest items
-    latestItems = cache.NewDocumentAggregator(startLoadTime)
-    for category, latestItemsFilter := range latestItemsFilters {
-        items, scores := latestItemsFilter.PopAll()
-        latestItems.Add(category, items, scores)
-    }
-
-    // collect popular items
-    popularItemFilters := make(map[string]*heap.TopKFilter[string, float64])
-    popularItemFilters[""] = heap.NewTopKFilter[string, float64](m.Config.Recommend.CacheSize)
-    for itemIndex, val := range popularCount {
-        itemId := rankingDataset.ItemIndex.ToName(int32(itemIndex))
-        popularItemFilters[""].Push(itemId, float64(val))
-        for _, category := range rankingDataset.ItemCategories[itemIndex] {
-            if _, exist := popularItemFilters[category]; !exist {
-                popularItemFilters[category] = heap.NewTopKFilter[string, float64](m.Config.Recommend.CacheSize)
-            }
-            popularItemFilters[category].Push(itemId, float64(val))
-        }
-    }
-    popularItems = cache.NewDocumentAggregator(startLoadTime)
-    for category, popularItemFilter := range popularItemFilters {
-        items, scores := popularItemFilter.PopAll()
-        popularItems.Add(category, items, scores)
-    }
-
-    return rankingDataset, clickDataset, latestItems, popularItems, nil
+    return rankingDataset, clickDataset, nil
 }
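In short, runLoadDatasetTask no longer maintains ad-hoc per-category TopK filters for latest and popular items inside LoadDataFromDatabase. It builds logics.NonPersonalized recommenders up front (NewLatest, NewPopular, plus anything configured under Recommend.NonPersonalized), passes them into the load, and afterwards drains each one into the cache: AddScores under the recommender's Name, then DeleteScores for entries older than its Timestamp. The sketch below reproduces that write-back loop with stand-in types so it compiles on its own; Score, recommender, and popularStub are illustrative placeholders, not gorse's cache or logics APIs.

package main

import (
    "fmt"
    "time"
)

// Score is a stand-in for one cached document score (item id plus value).
type Score struct {
    ID    string
    Value float64
}

// recommender captures the minimal surface the new load task relies on:
// a cache subset name, a snapshot of scores, and the snapshot timestamp.
type recommender interface {
    Name() string
    PopAll() []Score
    Timestamp() time.Time
}

// popularStub is an illustrative recommender that snapshots per-item feedback counts.
type popularStub struct {
    counts map[string]float64
    stamp  time.Time
}

func (p *popularStub) Name() string         { return "popular" }
func (p *popularStub) Timestamp() time.Time { return p.stamp }

func (p *popularStub) PopAll() []Score {
    scores := make([]Score, 0, len(p.counts))
    for id, count := range p.counts {
        scores = append(scores, Score{ID: id, Value: count})
    }
    return scores
}

// flush mirrors the new "save non-personalized recommenders to cache" loop:
// write the fresh snapshot under the recommender's name, then reclaim any
// cached scores older than the snapshot timestamp. The Printf calls stand in
// for the cache client's AddScores and DeleteScores.
func flush(recommenders []recommender) {
    for _, r := range recommenders {
        scores := r.PopAll()
        fmt.Printf("AddScores(subset=%q, n=%d)\n", r.Name(), len(scores))
        fmt.Printf("DeleteScores(subset=%q, before=%s)\n", r.Name(), r.Timestamp().Format(time.RFC3339))
    }
}

func main() {
    start := time.Now()
    recs := []recommender{
        &popularStub{counts: map[string]float64{"item-1": 3, "item-2": 1}, stamp: start},
    }
    flush(recs)
}
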
18 changes: 12 additions & 6 deletions master/tasks_test.go
@@ -81,7 +81,8 @@ func (s *MasterTestSuite) TestFindItemNeighborsBruteForce() {
     }
 
     // load mock dataset
-    dataset, _, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, nil, 0, 0, NewOnlineEvaluator())
+    dataset, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"},
+        nil, 0, 0, NewOnlineEvaluator(), nil)
     s.NoError(err)
     s.rankingTrainSet = dataset
 
@@ -186,7 +187,8 @@ func (s *MasterTestSuite) TestFindItemNeighborsIVF() {
     }
 
     // load mock dataset
-    dataset, _, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, nil, 0, 0, NewOnlineEvaluator())
+    dataset, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"},
+        nil, 0, 0, NewOnlineEvaluator(), nil)
     s.NoError(err)
     s.rankingTrainSet = dataset
 
@@ -253,7 +255,8 @@ func (s *MasterTestSuite) TestFindItemNeighborsIVF_ZeroIDF() {
         {FeedbackKey: data.FeedbackKey{FeedbackType: "FeedbackType", UserId: "0", ItemId: "1"}},
     }, true, true, true)
     s.NoError(err)
-    dataset, _, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, nil, 0, 0, NewOnlineEvaluator())
+    dataset, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"},
+        nil, 0, 0, NewOnlineEvaluator(), nil)
     s.NoError(err)
     s.rankingTrainSet = dataset
 
@@ -313,7 +316,8 @@ func (s *MasterTestSuite) TestFindUserNeighborsBruteForce() {
     s.NoError(err)
     err = s.DataClient.BatchInsertFeedback(ctx, feedbacks, true, true, true)
     s.NoError(err)
-    dataset, _, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, nil, 0, 0, NewOnlineEvaluator())
+    dataset, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"},
+        nil, 0, 0, NewOnlineEvaluator(), nil)
     s.NoError(err)
     s.rankingTrainSet = dataset
 
@@ -393,7 +397,8 @@ func (s *MasterTestSuite) TestFindUserNeighborsIVF() {
     s.NoError(err)
     err = s.DataClient.BatchInsertFeedback(ctx, feedbacks, true, true, true)
     s.NoError(err)
-    dataset, _, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, nil, 0, 0, NewOnlineEvaluator())
+    dataset, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"},
+        nil, 0, 0, NewOnlineEvaluator(), nil)
     s.NoError(err)
     s.rankingTrainSet = dataset
 
@@ -452,7 +457,8 @@ func (s *MasterTestSuite) TestFindUserNeighborsIVF_ZeroIDF() {
         {FeedbackKey: data.FeedbackKey{FeedbackType: "FeedbackType", UserId: "1", ItemId: "0"}},
     }, true, true, true)
     s.NoError(err)
-    dataset, _, _, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"}, nil, 0, 0, NewOnlineEvaluator())
+    dataset, _, err := s.LoadDataFromDatabase(context.Background(), s.DataClient, []string{"FeedbackType"},
+        nil, 0, 0, NewOnlineEvaluator(), nil)
     s.NoError(err)
     s.rankingTrainSet = dataset
 
