Skip to content

Commit

Permalink
data: count users, items and feedback (#907)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz authored Dec 28, 2024
1 parent dfc04c5 commit db0c194
Show file tree
Hide file tree
Showing 12 changed files with 877 additions and 168 deletions.
613 changes: 448 additions & 165 deletions protocol/data_store.pb.go

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions protocol/data_store.proto
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,24 @@ message GetFeedbackStreamResponse {
repeated Feedback feedback = 1;
}

message CountUsersRequest {}

message CountUsersResponse {
int32 count = 1;
}

message CountItemsRequest {}

message CountItemsResponse {
int32 count = 1;
}

message CountFeedbackRequest {}

message CountFeedbackResponse {
int32 count = 1;
}

service DataStore {
rpc Ping(PingRequest) returns (PingResponse) {}
rpc BatchInsertItems(BatchInsertItemsRequest) returns (BatchInsertItemsResponse) {}
Expand All @@ -226,4 +244,7 @@ service DataStore {
rpc GetUserStream(GetUserStreamRequest) returns (stream GetUserStreamResponse) {}
rpc GetItemStream(GetItemStreamRequest) returns (stream GetItemStreamResponse) {}
rpc GetFeedbackStream(GetFeedbackStreamRequest) returns (stream GetFeedbackStreamResponse) {}
rpc CountUsers(CountUsersRequest) returns (CountUsersResponse) {}
rpc CountItems(CountItemsRequest) returns (CountItemsResponse) {}
rpc CountFeedback(CountFeedbackRequest) returns (CountFeedbackResponse) {}
}
114 changes: 114 additions & 0 deletions protocol/data_store_grpc.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions storage/data/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,9 @@ type Database interface {
GetUserStream(ctx context.Context, batchSize int) (chan []User, chan error)
GetItemStream(ctx context.Context, batchSize int, timeLimit *time.Time) (chan []Item, chan error)
GetFeedbackStream(ctx context.Context, batchSize int, options ...ScanOption) (chan []Feedback, chan error)
CountUsers(ctx context.Context) (int, error)
CountItems(ctx context.Context) (int, error)
CountFeedback(ctx context.Context) (int, error)
}

// Open a connection to a database.
Expand Down
47 changes: 47 additions & 0 deletions storage/data/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/juju/errors"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"google.golang.org/protobuf/proto"
)
Expand Down Expand Up @@ -135,6 +136,19 @@ func (suite *baseTestSuite) isClickHouse() bool {
}
}

func (suite *baseTestSuite) analyzeTables() {
sqlDatabase, ok := suite.Database.(*SQLDatabase)
if ok && sqlDatabase.driver == Postgres {
sqlDatabase := suite.Database.(*SQLDatabase)
err := sqlDatabase.gormDB.Exec(fmt.Sprintf("ANALYZE %s", sqlDatabase.ItemsTable())).Error
suite.NoError(err)
err = sqlDatabase.gormDB.Exec(fmt.Sprintf("ANALYZE %s", sqlDatabase.UsersTable())).Error
suite.NoError(err)
err = sqlDatabase.gormDB.Exec(fmt.Sprintf("ANALYZE %s", sqlDatabase.FeedbackTable())).Error
suite.NoError(err)
}
}

func (suite *baseTestSuite) TearDownSuite() {
err := suite.Database.Close()
suite.NoError(err)
Expand Down Expand Up @@ -171,6 +185,11 @@ func (suite *baseTestSuite) TestUsers() {
}
err := suite.Database.BatchInsertUsers(ctx, insertedUsers)
suite.NoError(err)
// Count users
suite.analyzeTables()
count, err := suite.Database.CountUsers(ctx)
suite.NoError(err)
suite.Equal(10, count)
// Get users
users := suite.getUsers(ctx, 3)
suite.Equal(10, len(users))
Expand Down Expand Up @@ -255,6 +274,11 @@ func (suite *baseTestSuite) TestFeedback() {
}
err = suite.Database.BatchInsertFeedback(ctx, futureFeedback, true, true, true)
suite.NoError(err)
// Count feedback
suite.analyzeTables()
count, err := suite.Database.CountFeedback(ctx)
suite.NoError(err)
suite.Equal(12, count)
// Get feedback
ret := suite.getFeedback(ctx, 3, nil, lo.ToPtr(time.Now()), positiveFeedbackType)
suite.Equal(feedback, ret)
Expand Down Expand Up @@ -438,6 +462,11 @@ func (suite *baseTestSuite) TestItems() {
// Insert item
err := suite.Database.BatchInsertItems(ctx, items)
suite.NoError(err)
// Count items
suite.analyzeTables()
count, err := suite.Database.CountItems(ctx)
suite.NoError(err)
suite.Equal(5, count)
// Get items
totalItems := suite.getItems(ctx, 3)
suite.Equal(items, totalItems)
Expand Down Expand Up @@ -808,3 +837,21 @@ func TestValidateLabels(t *testing.T) {
assert.Error(t, ValidateLabels(map[string]any{"city": "wenzhou", "tags": []any{"1", json.Number("2"), "3"}}))
assert.Error(t, ValidateLabels(map[string]any{"city": "wenzhou", "tags": []any{"1", "2", json.Number("3")}}))
}

func benchmarkCountItems(b *testing.B, db Database) {
ctx := context.Background()
// Insert 10,000 items
items := make([]Item, 100000)
for i := range items {
items[i] = Item{ItemId: strconv.Itoa(i)}
}
err := db.BatchInsertItems(ctx, items)
require.NoError(b, err)
// Benchmark count items
b.ResetTimer()
for i := 0; i < b.N; i++ {
n, err := db.CountItems(ctx)
require.NoError(b, err)
require.Equal(b, 100000, n)
}
}
15 changes: 15 additions & 0 deletions storage/data/mongodb.go
Original file line number Diff line number Diff line change
Expand Up @@ -809,3 +809,18 @@ func (db *MongoDB) DeleteUserItemFeedback(ctx context.Context, userId, itemId st
}
return int(r.DeletedCount), nil
}

func (db *MongoDB) CountUsers(ctx context.Context) (int, error) {
n, err := db.client.Database(db.dbName).Collection(db.UsersTable()).EstimatedDocumentCount(ctx)
return int(n), err
}

func (db *MongoDB) CountItems(ctx context.Context) (int, error) {
n, err := db.client.Database(db.dbName).Collection(db.ItemsTable()).EstimatedDocumentCount(ctx)
return int(n), err
}

func (db *MongoDB) CountFeedback(ctx context.Context) (int, error) {
n, err := db.client.Database(db.dbName).Collection(db.FeedbackTable()).EstimatedDocumentCount(ctx)
return int(n), err
}
25 changes: 25 additions & 0 deletions storage/data/mongodb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"os"
"testing"

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
)

Expand Down Expand Up @@ -72,3 +73,27 @@ func (suite *MongoTestSuite) getMongoDB() *MongoDB {
func TestMongo(t *testing.T) {
suite.Run(t, new(MongoTestSuite))
}

func BenchmarkMongo_CountItems(b *testing.B) {
ctx := context.Background()
var err error

// create database
database, err := Open(mongoUri, "gorse_")
require.NoError(b, err)
dbName := "gorse_data_test"
databaseComm := database.(*MongoDB)
err = databaseComm.client.Database(dbName).Drop(ctx)
require.NoError(b, err)
database, err = Open(mongoUri+dbName+"?authSource=admin&connect=direct", "gorse_")
require.NoError(b, err)
err = database.Init()
require.NoError(b, err)

// benchmark
benchmarkCountItems(b, database)

// close database
err = database.Close()
require.NoError(b, err)
}
12 changes: 12 additions & 0 deletions storage/data/no_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,15 @@ func (d NoDatabase) ModifyItem(_ context.Context, _ string, _ ItemPatch) error {
func (d NoDatabase) ModifyUser(_ context.Context, _ string, _ UserPatch) error {
return ErrNoDatabase
}

func (d NoDatabase) CountUsers(_ context.Context) (int, error) {
return 0, ErrNoDatabase
}

func (d NoDatabase) CountItems(_ context.Context) (int, error) {
return 0, ErrNoDatabase
}

func (d NoDatabase) CountFeedback(_ context.Context) (int, error) {
return 0, ErrNoDatabase
}
12 changes: 10 additions & 2 deletions storage/data/no_database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@ package data

import (
"context"
"github.com/samber/lo"
"github.com/stretchr/testify/assert"
"testing"
"time"

"github.com/samber/lo"
"github.com/stretchr/testify/assert"
)

func TestNoDatabase(t *testing.T) {
Expand Down Expand Up @@ -81,4 +82,11 @@ func TestNoDatabase(t *testing.T) {
assert.ErrorIs(t, err, ErrNoDatabase)
_, c = database.GetFeedbackStream(ctx, 0)
assert.ErrorIs(t, <-c, ErrNoDatabase)

_, err = database.CountUsers(ctx)
assert.ErrorIs(t, err, ErrNoDatabase)
_, err = database.CountItems(ctx)
assert.ErrorIs(t, err, ErrNoDatabase)
_, err = database.CountFeedback(ctx)
assert.ErrorIs(t, err, ErrNoDatabase)
}
Loading

0 comments on commit db0c194

Please sign in to comment.