Skip to content

Commit

Permalink
support SQLite in cluster mode (#896)
Browse files Browse the repository at this point in the history
  • Loading branch information
zhenghaoz authored Dec 5, 2024
1 parent 057ee55 commit 4665b55
Show file tree
Hide file tree
Showing 9 changed files with 100 additions and 79 deletions.
8 changes: 2 additions & 6 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,9 +643,7 @@ func (config *Config) Validate(oneModel bool) error {
storage.ClickhousePrefix,
storage.CHHTTPPrefix,
storage.CHHTTPSPrefix,
}
if oneModel {
prefixes = append(prefixes, storage.SQLitePrefix)
storage.SQLitePrefix,
}
for _, prefix := range prefixes {
if strings.HasPrefix(fl.Field().String(), prefix) {
Expand All @@ -665,9 +663,7 @@ func (config *Config) Validate(oneModel bool) error {
storage.MySQLPrefix,
storage.PostgresPrefix,
storage.PostgreSQLPrefix,
}
if oneModel {
prefixes = append(prefixes, storage.SQLitePrefix)
storage.SQLitePrefix,
}
for _, prefix := range prefixes {
if strings.HasPrefix(fl.Field().String(), prefix) {
Expand Down
3 changes: 3 additions & 0 deletions config/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
# The database for caching, support Redis, MySQL, Postgres and MongoDB:
# redis://<user>:<password>@<host>:<port>/<db_number>
# rediss://<user>:<password>@<host>:<port>/<db_number>
# mysql://[username[:password]@][protocol[(address)]]/dbname[?param1=value1&...&paramN=valueN]
# postgres://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full
# postgresql://bob:secret@1.2.3.4:5432/mydb?sslmode=verify-full
# mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
# mongodb+srv://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
# sqlite://<path>
cache_store = "redis://localhost:6379/0"

# The database for persist data, support MySQL, Postgres, ClickHouse and MongoDB:
Expand All @@ -18,6 +20,7 @@ cache_store = "redis://localhost:6379/0"
# chhttps://user:password@host[:port]/database?param1=value1&...&paramN=valueN
# mongodb://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
# mongodb+srv://[username:password@]host1[:port1][,...hostN[:portN]][/[defaultauthdb][?options]]
# sqlite://<path>
data_store = "mysql://gorse:gorse_pass@tcp(localhost:3306)/gorse"

# The naming prefix for tables (collections, keys) in databases. The default value is empty.
Expand Down
2 changes: 2 additions & 0 deletions master/master.go
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ func (m *Master) Serve() {
}
m.grpcServer = grpc.NewServer(opts...)
protocol.RegisterMasterServer(m.grpcServer, m)
protocol.RegisterCacheStoreServer(m.grpcServer, cache.NewProxyServer(m.CacheClient))
protocol.RegisterDataStoreServer(m.grpcServer, data.NewProxyServer(m.DataClient))
if err = m.grpcServer.Serve(lis); err != nil {
log.Logger().Fatal("failed to start rpc server", zap.Error(err))
}
Expand Down
37 changes: 25 additions & 12 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"encoding/json"
"fmt"
"math/rand"
"strings"
"time"

"github.com/emicklei/go-restful/v3"
Expand All @@ -28,6 +29,7 @@ import (
"github.com/zhenghaoz/gorse/cmd/version"
"github.com/zhenghaoz/gorse/config"
"github.com/zhenghaoz/gorse/protocol"
"github.com/zhenghaoz/gorse/storage"
"github.com/zhenghaoz/gorse/storage/cache"
"github.com/zhenghaoz/gorse/storage/data"
"go.opentelemetry.io/otel"
Expand All @@ -45,6 +47,7 @@ type Server struct {
cachePrefix string
dataPath string
dataPrefix string
conn *grpc.ClientConn
masterClient protocol.MasterClient
serverName string
masterHost string
Expand Down Expand Up @@ -117,11 +120,11 @@ func (s *Server) Serve() {
} else {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
conn, err := grpc.Dial(fmt.Sprintf("%v:%v", s.masterHost, s.masterPort), opts...)
s.conn, err = grpc.Dial(fmt.Sprintf("%v:%v", s.masterHost, s.masterPort), opts...)
if err != nil {
log.Logger().Fatal("failed to connect master", zap.Error(err))
}
s.masterClient = protocol.NewMasterClient(conn)
s.masterClient = protocol.NewMasterClient(s.conn)

go s.Sync()
container := restful.NewContainer()
Expand Down Expand Up @@ -162,23 +165,33 @@ func (s *Server) Sync() {

// connect to data store
if s.dataPath != s.Config.Database.DataStore || s.dataPrefix != s.Config.Database.DataTablePrefix {
log.Logger().Info("connect data store",
zap.String("database", log.RedactDBURL(s.Config.Database.DataStore)))
if s.DataClient, err = data.Open(s.Config.Database.DataStore, s.Config.Database.DataTablePrefix); err != nil {
log.Logger().Error("failed to connect data store", zap.Error(err))
goto sleep
if strings.HasPrefix(s.Config.Database.DataStore, storage.SQLitePrefix) {
log.Logger().Info("connect cache store via master")
s.DataClient = data.NewProxyClient(s.conn)
} else {
log.Logger().Info("connect data store",
zap.String("database", log.RedactDBURL(s.Config.Database.DataStore)))
if s.DataClient, err = data.Open(s.Config.Database.DataStore, s.Config.Database.DataTablePrefix); err != nil {
log.Logger().Error("failed to connect data store", zap.Error(err))
goto sleep
}
}
s.dataPath = s.Config.Database.DataStore
s.dataPrefix = s.Config.Database.DataTablePrefix
}

// connect to cache store
if s.cachePath != s.Config.Database.CacheStore || s.cachePrefix != s.Config.Database.CacheTablePrefix {
log.Logger().Info("connect cache store",
zap.String("database", log.RedactDBURL(s.Config.Database.CacheStore)))
if s.CacheClient, err = cache.Open(s.Config.Database.CacheStore, s.Config.Database.CacheTablePrefix); err != nil {
log.Logger().Error("failed to connect cache store", zap.Error(err))
goto sleep
if strings.HasPrefix(s.Config.Database.CacheStore, storage.SQLitePrefix) {
log.Logger().Info("connect cache store via master")
s.CacheClient = cache.NewProxyClient(s.conn)
} else {
log.Logger().Info("connect cache store",
zap.String("database", log.RedactDBURL(s.Config.Database.CacheStore)))
if s.CacheClient, err = cache.Open(s.Config.Database.CacheStore, s.Config.Database.CacheTablePrefix); err != nil {
log.Logger().Error("failed to connect cache store", zap.Error(err))
goto sleep
}
}
s.cachePath = s.Config.Database.CacheStore
s.cachePrefix = s.Config.Database.CacheTablePrefix
Expand Down
16 changes: 6 additions & 10 deletions storage/cache/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,6 @@ func (p *ProxyServer) GetTimeSeriesPoints(ctx context.Context, request *protocol
}

type ProxyClient struct {
*grpc.ClientConn
protocol.CacheStoreClient
}

Expand All @@ -207,6 +206,10 @@ func (p ProxyClient) Ping() error {
return err
}

func (p ProxyClient) Close() error {
return nil
}

func (p ProxyClient) Init() error {
return errors.MethodNotAllowedf("init is not allowed in proxy client")
}
Expand Down Expand Up @@ -428,15 +431,8 @@ func (p ProxyClient) GetTimeSeriesPoints(ctx context.Context, name string, begin
return points, nil
}

func OpenProxyClient(address string) (*ProxyClient, error) {
// Create gRPC connection
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
return nil, err
}
// Create client
func NewProxyClient(conn *grpc.ClientConn) *ProxyClient {
return &ProxyClient{
ClientConn: conn,
CacheStoreClient: protocol.NewCacheStoreClient(conn),
}, nil
}
}
31 changes: 17 additions & 14 deletions storage/cache/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,53 +17,56 @@ package cache
import (
"fmt"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"net"
"testing"
)

type ProxyTestSuite struct {
baseTestSuite
SQLite Database
Server *ProxyServer
sqlite Database
server *ProxyServer
clientConn *grpc.ClientConn
}

func (suite *ProxyTestSuite) SetupSuite() {
// create database
var err error
path := fmt.Sprintf("sqlite://%s/sqlite.db", suite.T().TempDir())
suite.SQLite, err = Open(path, "gorse_")
suite.sqlite, err = Open(path, "gorse_")
suite.NoError(err)
// create schema
err = suite.SQLite.Init()
err = suite.sqlite.Init()
suite.NoError(err)
// start server
lis, err := net.Listen("tcp", "localhost:0")
suite.NoError(err)
suite.Server = NewProxyServer(suite.SQLite)
suite.server = NewProxyServer(suite.sqlite)
go func() {
err = suite.Server.Serve(lis)
err = suite.server.Serve(lis)
suite.NoError(err)
}()
// create proxy
suite.Database, err = OpenProxyClient(lis.Addr().String())
// create proxy client
suite.clientConn, err = grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
suite.NoError(err)
suite.Database = NewProxyClient(suite.clientConn)
}

func (suite *ProxyTestSuite) TearDownSuite() {
suite.Server.Stop()
suite.NoError(suite.Database.Close())
suite.NoError(suite.SQLite.Close())
suite.server.Stop()
suite.NoError(suite.clientConn.Close())
suite.NoError(suite.sqlite.Close())
}

func (suite *ProxyTestSuite) SetupTest() {
err := suite.SQLite.Ping()
err := suite.sqlite.Ping()
suite.NoError(err)
err = suite.SQLite.Purge()
err = suite.sqlite.Purge()
suite.NoError(err)
}

func (suite *ProxyTestSuite) TearDownTest() {
err := suite.SQLite.Purge()
err := suite.sqlite.Purge()
suite.NoError(err)
}

Expand Down
14 changes: 3 additions & 11 deletions storage/data/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -453,21 +453,13 @@ func (p *ProxyServer) GetFeedbackStream(in *protocol.GetFeedbackStreamRequest, s
}

type ProxyClient struct {
*grpc.ClientConn
protocol.DataStoreClient
}

func OpenProxyClient(address string) (*ProxyClient, error) {
// Create gRPC connection
conn, err := grpc.Dial(address, grpc.WithInsecure())
if err != nil {
return nil, err
}
// Create client
func NewProxyClient(conn *grpc.ClientConn) *ProxyClient {
return &ProxyClient{
ClientConn: conn,
DataStoreClient: protocol.NewDataStoreClient(conn),
}, nil
}
}

func (p ProxyClient) Init() error {
Expand All @@ -480,7 +472,7 @@ func (p ProxyClient) Ping() error {
}

func (p ProxyClient) Close() error {
return p.ClientConn.Close()
return nil
}

func (p ProxyClient) Optimize() error {
Expand Down
31 changes: 17 additions & 14 deletions storage/data/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,53 +17,56 @@ package data
import (
"fmt"
"github.com/stretchr/testify/suite"
"google.golang.org/grpc"
"net"
"testing"
)

type ProxyTestSuite struct {
baseTestSuite
SQLite Database
Server *ProxyServer
sqlite Database
server *ProxyServer
clientConn *grpc.ClientConn
}

func (suite *ProxyTestSuite) SetupSuite() {
// create database
var err error
path := fmt.Sprintf("sqlite://%s/sqlite.db", suite.T().TempDir())
suite.SQLite, err = Open(path, "gorse_")
suite.sqlite, err = Open(path, "gorse_")
suite.NoError(err)
// create schema
err = suite.SQLite.Init()
err = suite.sqlite.Init()
suite.NoError(err)
// start server
lis, err := net.Listen("tcp", "localhost:0")
suite.NoError(err)
suite.Server = NewProxyServer(suite.SQLite)
suite.server = NewProxyServer(suite.sqlite)
go func() {
err = suite.Server.Serve(lis)
err = suite.server.Serve(lis)
suite.NoError(err)
}()
// create proxy
suite.Database, err = OpenProxyClient(lis.Addr().String())
// create proxy client
suite.clientConn, err = grpc.Dial(lis.Addr().String(), grpc.WithInsecure())
suite.NoError(err)
suite.Database = NewProxyClient(suite.clientConn)
}

func (suite *ProxyTestSuite) TearDownSuite() {
suite.Server.Stop()
suite.NoError(suite.Database.Close())
suite.NoError(suite.SQLite.Close())
suite.server.Stop()
suite.NoError(suite.clientConn.Close())
suite.NoError(suite.sqlite.Close())
}

func (suite *ProxyTestSuite) SetupTest() {
err := suite.SQLite.Ping()
err := suite.sqlite.Ping()
suite.NoError(err)
err = suite.SQLite.Purge()
err = suite.sqlite.Purge()
suite.NoError(err)
}

func (suite *ProxyTestSuite) TearDownTest() {
err := suite.SQLite.Purge()
err := suite.sqlite.Purge()
suite.NoError(err)
}

Expand Down
Loading

0 comments on commit 4665b55

Please sign in to comment.