Skip to content

Commit

Permalink
store meta data in SQLite (#898)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz authored Dec 7, 2024
1 parent 4665b55 commit beb120c
Show file tree
Hide file tree
Showing 17 changed files with 472 additions and 1,087 deletions.
847 changes: 0 additions & 847 deletions base/names.go

This file was deleted.

27 changes: 0 additions & 27 deletions base/names_test.go

This file was deleted.

2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ require (
github.com/go-sql-driver/mysql v1.6.0
github.com/google/uuid v1.6.0
github.com/gorilla/securecookie v1.1.1
github.com/gorse-io/dashboard v0.0.0-20241115145254-4def1c814899
github.com/gorse-io/dashboard v0.0.0-20241207032532-3b75acd211c4
github.com/haxii/go-swagger-ui v0.0.0-20210203093335-a63a6bbde946
github.com/jaswdr/faker v1.16.0
github.com/jellydator/ttlcache/v3 v3.3.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,8 @@ github.com/gorse-io/clickhouse v0.3.3-0.20220715124633-688011a495bb h1:z/oOWE+Vy
github.com/gorse-io/clickhouse v0.3.3-0.20220715124633-688011a495bb/go.mod h1:iILWzbul8U+gsf4kqbheF2QzBmdvVp63mloGGK8emDI=
github.com/gorse-io/dashboard v0.0.0-20241115145254-4def1c814899 h1:1BQ8+NLDKMYp7BcBhjJgEska+Gt8t2JTj6Rj0afYwG8=
github.com/gorse-io/dashboard v0.0.0-20241115145254-4def1c814899/go.mod h1:LBLzsMv3XVLmpaM/1q8/sGvv2Avj1YxmHBZfXcdqRjU=
github.com/gorse-io/dashboard v0.0.0-20241207032532-3b75acd211c4 h1:FOUvD2HvTY/8j1/I4j/FlX3LEqKGLWPWQLl6jPtUqQ0=
github.com/gorse-io/dashboard v0.0.0-20241207032532-3b75acd211c4/go.mod h1:LBLzsMv3XVLmpaM/1q8/sGvv2Avj1YxmHBZfXcdqRjU=
github.com/gorse-io/gorgonia v0.0.0-20230817132253-6dd1dbf95849 h1:Hwywr6NxzYeZYn35KwOsw7j8ZiMT60TBzpbn1MbEido=
github.com/gorse-io/gorgonia v0.0.0-20230817132253-6dd1dbf95849/go.mod h1:TtVGAt7ENNmgBnC0JA68CAjIDCEtcqaRHvnkAWJ/Fu0=
github.com/gorse-io/sqlite v1.3.3-0.20220713123255-c322aec4e59e h1:uPQtYQzG1QcC3Qbv+tuEe8Q2l++V4KEcqYSSwB9qobg=
Expand Down
20 changes: 11 additions & 9 deletions master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"math/rand"
"net"
"net/http"
"os"
"sync"
"time"

Expand All @@ -44,6 +45,7 @@ import (
"github.com/zhenghaoz/gorse/storage"
"github.com/zhenghaoz/gorse/storage/cache"
"github.com/zhenghaoz/gorse/storage/data"
"github.com/zhenghaoz/gorse/storage/meta"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/propagation"
"go.uber.org/zap"
Expand All @@ -70,9 +72,7 @@ type Master struct {
managedMode bool

// cluster meta cache
ttlCache *ttlcache.Cache[string, *Node]
nodesInfo map[string]*Node
nodesInfoMutex sync.RWMutex
metaStore meta.Database

// ranking dataset
rankingTrainSet *ranking.DataSet
Expand Down Expand Up @@ -124,7 +124,6 @@ func NewMaster(cfg *config.Config, cacheFile string, managedMode bool) *Master {
otel.SetErrorHandler(log.GetErrorHandler())
otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.TraceContext{}, propagation.Baggage{}))
m := &Master{
nodesInfo: make(map[string]*Node),
// create task monitor
cacheFile: cacheFile,
managedMode: managedMode,
Expand Down Expand Up @@ -217,11 +216,14 @@ func (m *Master) Serve() {
MemoryInUseBytesVec.WithLabelValues("ranking_model").Set(float64(sizeof.DeepSize(m.ClickModel)))
}

// create cluster meta cache
m.ttlCache = ttlcache.New[string, *Node](
ttlcache.WithTTL[string, *Node](m.Config.Master.MetaTimeout + 10*time.Second))
m.ttlCache.OnEviction(m.nodeDown)
go m.ttlCache.Start()
// connect meta database
m.metaStore, err = meta.Open(fmt.Sprintf("sqlite://%s/gorse_meta.db", os.TempDir()), m.Config.Master.MetaTimeout)
if err != nil {
log.Logger().Fatal("failed to connect meta database", zap.Error(err))
}
if err = m.metaStore.Init(); err != nil {
log.Logger().Fatal("failed to init meta database", zap.Error(err))
}

// connect data database
m.DataClient, err = data.Open(m.Config.Database.DataStore, m.Config.Database.DataTablePrefix,
Expand Down
55 changes: 27 additions & 28 deletions master/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"net/http"
"os"
"reflect"
"sort"
"strconv"
"strings"
"time"
Expand All @@ -49,6 +50,7 @@ import (
"github.com/zhenghaoz/gorse/server"
"github.com/zhenghaoz/gorse/storage/cache"
"github.com/zhenghaoz/gorse/storage/data"
"github.com/zhenghaoz/gorse/storage/meta"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/timestamppb"
Expand Down Expand Up @@ -81,8 +83,8 @@ func (m *Master) CreateWebService() {
ws.Route(ws.GET("/dashboard/cluster").To(m.getCluster).
Doc("Get nodes in the cluster.").
Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
Returns(http.StatusOK, "OK", []Node{}).
Writes([]Node{}))
Returns(http.StatusOK, "OK", []meta.Node{}).
Writes([]meta.Node{}))
ws.Route(ws.GET("/dashboard/categories").To(m.getCategories).
Doc("Get categories of items.").
Metadata(restfulspec.KeyOpenAPITags, []string{"dashboard"}).
Expand Down Expand Up @@ -473,23 +475,14 @@ func (m *Master) getCategories(request *restful.Request, response *restful.Respo
}

// getCluster writes the list of cluster nodes to the HTTP response.
//
// NOTE(review): this span comes from a diff rendering whose +/- markers were
// lost, so statements from two revisions appear interleaved and the text does
// not compile as-is (for example, `nodes` is declared twice). Presumably the
// lines touching m.nodesInfo / m.nodesInfoMutex are the removed revision and
// the lines touching m.metaStore are the added revision — confirm against the
// repository before relying on either reading.
//
// Visible behavior of the m.metaStore lines: call ListNodes, answer with
// server.InternalServerError and return if it fails, otherwise sort the nodes
// by their Type field in ascending order and send them with server.Ok.
//
// Visible behavior of the m.nodesInfo lines: under a read lock, split the
// entries of m.nodesInfo into workers and servers by Type, then concatenate
// them workers-first into the slice that is returned.
func (m *Master) getCluster(_ *restful.Request, response *restful.Response) {
// collect nodes
workers := make([]*Node, 0)
servers := make([]*Node, 0)
m.nodesInfoMutex.RLock()
for _, info := range m.nodesInfo {
switch info.Type {
case WorkerNode:
workers = append(workers, info)
case ServerNode:
servers = append(servers, info)
}
// Fetch the node list from the meta store; a failure is reported to the
// client as an internal server error and ends the handler.
nodes, err := m.metaStore.ListNodes()
if err != nil {
server.InternalServerError(response, err)
return
}
m.nodesInfoMutex.RUnlock()
// return nodes
nodes := make([]*Node, 0)
nodes = append(nodes, workers...)
nodes = append(nodes, servers...)
// Order nodes by Type. sort.Slice is not a stable sort, so the relative
// order of nodes that share a Type is not guaranteed by this call.
sort.Slice(nodes, func(i, j int) bool {
return nodes[i].Type < nodes[j].Type
})
server.Ok(response, nodes)
}

Expand Down Expand Up @@ -584,16 +577,19 @@ func (m *Master) getStats(request *restful.Request, response *restful.Response)
log.ResponseLogger(response).Warn("failed to get number of valid negative feedbacks", zap.Error(err))
}
// count the number of workers and servers
m.nodesInfoMutex.Lock()
for _, node := range m.nodesInfo {
nodes, err := m.metaStore.ListNodes()
if err != nil {
server.InternalServerError(response, err)
return
}
for _, node := range nodes {
switch node.Type {
case ServerNode:
case protocol.NodeType_Server.String():
status.NumServers++
case WorkerNode:
case protocol.NodeType_Worker.String():
status.NumWorkers++
}
}
m.nodesInfoMutex.Unlock()
// read popular items update time
if status.PopularItemsUpdateTime, err = m.CacheClient.Get(ctx, cache.Key(cache.GlobalMeta, cache.LastUpdatePopularItemsTime)).Time(); err != nil {
log.ResponseLogger(response).Warn("failed to get popular items update time", zap.Error(err))
Expand Down Expand Up @@ -643,13 +639,16 @@ func (m *Master) getStats(request *restful.Request, response *restful.Response)
func (m *Master) getTasks(_ *restful.Request, response *restful.Response) {
// List workers
workers := mapset.NewSet[string]()
m.nodesInfoMutex.RLock()
for _, info := range m.nodesInfo {
if info.Type == WorkerNode {
workers.Add(info.Name)
nodes, err := m.metaStore.ListNodes()
if err != nil {
server.InternalServerError(response, err)
return
}
for _, node := range nodes {
if node.Type == protocol.NodeType_Worker.String() {
workers.Add(node.UUID)
}
}
m.nodesInfoMutex.RUnlock()
// List local progress
progressList := m.tracer.List()
// list remote progress
Expand Down
35 changes: 28 additions & 7 deletions master/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ import (
"github.com/zhenghaoz/gorse/config"
"github.com/zhenghaoz/gorse/model/click"
"github.com/zhenghaoz/gorse/model/ranking"
"github.com/zhenghaoz/gorse/protocol"
"github.com/zhenghaoz/gorse/server"
"github.com/zhenghaoz/gorse/storage/cache"
"github.com/zhenghaoz/gorse/storage/data"
"github.com/zhenghaoz/gorse/storage/meta"
"google.golang.org/protobuf/proto"
)

Expand All @@ -57,11 +59,15 @@ func newMockServer(t *testing.T) (*mockServer, string) {
// open database
var err error
s.Settings = config.NewSettings()
s.metaStore, err = meta.Open(fmt.Sprintf("sqlite://%s/meta.db", t.TempDir()), s.Config.Master.MetaTimeout)
assert.NoError(t, err)
s.DataClient, err = data.Open(fmt.Sprintf("sqlite://%s/data.db", t.TempDir()), "")
assert.NoError(t, err)
s.CacheClient, err = cache.Open(fmt.Sprintf("sqlite://%s/cache.db", t.TempDir()), "")
assert.NoError(t, err)
// init database
err = s.metaStore.Init()
assert.NoError(t, err)
err = s.DataClient.Init()
assert.NoError(t, err)
err = s.CacheClient.Init()
Expand All @@ -88,7 +94,9 @@ func newMockServer(t *testing.T) (*mockServer, string) {
}

func (s *mockServer) Close(t *testing.T) {
err := s.DataClient.Close()
err := s.metaStore.Close()
assert.NoError(t, err)
err = s.DataClient.Close()
assert.NoError(t, err)
err = s.CacheClient.Close()
assert.NoError(t, err)
Expand Down Expand Up @@ -328,19 +336,32 @@ func TestMaster_GetCluster(t *testing.T) {
s, cookie := newMockServer(t)
defer s.Close(t)
// add nodes
serverNode := &Node{"alan turnin", ServerNode, "192.168.1.100", 1080, "server_version"}
workerNode := &Node{"dennis ritchie", WorkerNode, "192.168.1.101", 1081, "worker_version"}
s.nodesInfo = make(map[string]*Node)
s.nodesInfo["alan turning"] = serverNode
s.nodesInfo["dennis ritchie"] = workerNode
serverNode := &meta.Node{
UUID: "alan turnin",
Hostname: "192.168.1.100",
Type: protocol.NodeType_Server.String(),
Version: "server_version",
UpdateTime: time.Now().UTC(),
}
workerNode := &meta.Node{
UUID: "dennis ritchie",
Hostname: "192.168.1.101",
Type: protocol.NodeType_Worker.String(),
Version: "worker_version",
UpdateTime: time.Now().UTC(),
}
err := s.metaStore.UpdateNode(serverNode)
assert.NoError(t, err)
err = s.metaStore.UpdateNode(workerNode)
assert.NoError(t, err)
// get nodes
apitest.New().
Handler(s.handler).
Get("/api/dashboard/cluster").
Header("Cookie", cookie).
Expect(t).
Status(http.StatusOK).
Body(marshal(t, []*Node{workerNode, serverNode})).
Body(marshal(t, []*meta.Node{serverNode, workerNode})).
End()
}

Expand Down
Loading

0 comments on commit beb120c

Please sign in to comment.