mcs: use memory storage #7061

Merged · 2 commits · Sep 11, 2023
2 changes: 1 addition & 1 deletion pkg/basicserver/basic_server.go
@@ -44,5 +44,5 @@ type Server interface {
 	// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise.
 	IsServing() bool
 	// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
-	AddServiceReadyCallback(callbacks ...func(context.Context))
+	AddServiceReadyCallback(callbacks ...func(context.Context) error)
 }
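This signature change is the backbone of the PR: a ready callback can now fail, and the caller can refuse to serve. A minimal sketch of the new contract (hypothetical server type, not code from this PR):

package main

import (
	"context"
	"errors"
	"fmt"
)

// server mirrors the updated basicserver interface: ready callbacks
// now return an error so a failed startup can abort the term.
type server struct {
	callbacks []func(context.Context) error
}

func (s *server) AddServiceReadyCallback(cbs ...func(context.Context) error) {
	s.callbacks = append(s.callbacks, cbs...)
}

func (s *server) onPrimary(ctx context.Context) error {
	for _, cb := range s.callbacks {
		if err := cb(ctx); err != nil {
			return err // stop early instead of serving half-initialized
		}
	}
	return nil
}

func main() {
	s := &server{}
	s.AddServiceReadyCallback(func(ctx context.Context) error {
		return errors.New("cluster failed to start")
	})
	fmt.Println(s.onPrimary(context.Background())) // cluster failed to start
}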
3 changes: 2 additions & 1 deletion pkg/mcs/resourcemanager/server/manager.go
@@ -101,7 +101,7 @@ func (m *Manager) GetBasicServer() bs.Server {
 }
 
 // Init initializes the resource group manager.
-func (m *Manager) Init(ctx context.Context) {
+func (m *Manager) Init(ctx context.Context) error {
 	// Todo: If we can modify following configs in the future, we should reload these configs.
 	// Store the controller config into the storage.
 	m.storage.SaveControllerConfig(m.controllerConfig)
@@ -156,6 +156,7 @@ func (m *Manager) Init(ctx context.Context) {
 		m.persistLoop(ctx)
 	}()
 	log.Info("resource group manager finishes initialization")
+	return nil
 }
 
 // AddResourceGroup puts a resource group.
4 changes: 2 additions & 2 deletions pkg/mcs/resourcemanager/server/server.go
@@ -71,7 +71,7 @@ type Server struct {
 	service *Service
 
 	// primaryCallbacks will be called after the server becomes leader.
-	primaryCallbacks []func(context.Context)
+	primaryCallbacks []func(context.Context) error
 
 	serviceRegister *discovery.ServiceRegister
 }
@@ -232,7 +232,7 @@ func (s *Server) IsClosed() bool {
 }
 
 // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
-func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
+func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
 	s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
 }
18 changes: 13 additions & 5 deletions pkg/mcs/scheduling/server/cluster.go
@@ -143,19 +143,27 @@ func (c *Cluster) GetStoreConfig() sc.StoreConfigProvider { return c.persistConf
 
 // AllocID allocates a new ID.
 func (c *Cluster) AllocID() (uint64, error) {
-	cli := c.apiServerLeader.Load().(pdpb.PDClient)
-	if cli == nil {
-		c.checkMembershipCh <- struct{}{}
-		return 0, errors.New("API server leader is not found")
+	client, err := c.getAPIServerLeaderClient()
+	if err != nil {
+		return 0, err
 	}
-	resp, err := cli.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}})
+	resp, err := client.AllocID(c.ctx, &pdpb.AllocIDRequest{Header: &pdpb.RequestHeader{ClusterId: c.clusterID}})
 	if err != nil {
 		c.checkMembershipCh <- struct{}{}
 		return 0, err
 	}
	return resp.GetId(), nil
 }
 
+func (c *Cluster) getAPIServerLeaderClient() (pdpb.PDClient, error) {
+	cli := c.apiServerLeader.Load()
+	if cli == nil {
+		c.checkMembershipCh <- struct{}{}
+		return nil, errors.New("API server leader is not found")
+	}
+	return cli.(pdpb.PDClient), nil
+}
+
 // SwitchAPIServerLeader switches the API server leader.
 func (c *Cluster) SwitchAPIServerLeader(new pdpb.PDClient) bool {
 	old := c.apiServerLeader.Load()
13 changes: 13 additions & 0 deletions pkg/mcs/scheduling/server/config/config.go
@@ -34,6 +34,7 @@ import (
 	"github.com/tikv/pd/pkg/core/storelimit"
 	"github.com/tikv/pd/pkg/mcs/utils"
 	sc "github.com/tikv/pd/pkg/schedule/config"
+	"github.com/tikv/pd/pkg/slice"
 	"github.com/tikv/pd/pkg/storage/endpoint"
 	"github.com/tikv/pd/pkg/utils/configutil"
 	"github.com/tikv/pd/pkg/utils/grpcutil"
@@ -239,6 +240,18 @@ func (o *PersistConfig) SetScheduleConfig(cfg *sc.ScheduleConfig) {
 	o.schedule.Store(cfg)
 }
 
+// AdjustScheduleCfg adjusts the schedule config.
+func (o *PersistConfig) AdjustScheduleCfg(scheduleCfg *sc.ScheduleConfig) {
+	// In case we add new default schedulers.
+	for _, ps := range sc.DefaultSchedulers {
+		if slice.NoneOf(scheduleCfg.Schedulers, func(i int) bool {
+			return scheduleCfg.Schedulers[i].Type == ps.Type
+		}) {
+			scheduleCfg.Schedulers = append(scheduleCfg.Schedulers, ps)
+		}
+	}
+}
+
 // GetReplicationConfig returns replication configurations.
 func (o *PersistConfig) GetReplicationConfig() *sc.ReplicationConfig {
 	return o.replication.Load().(*sc.ReplicationConfig)
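AdjustScheduleCfg backfills defaults: if the persisted schedule config was written before a default scheduler existed, that scheduler would otherwise never appear in the loaded config. A self-contained sketch of the same merge with plain slices (the noneOf helper mirrors the assumed semantics of pkg/slice.NoneOf; scheduler names are illustrative):

package main

import "fmt"

type schedulerConfig struct{ Type string }

// noneOf reports whether no index in [0, n) satisfies the predicate,
// matching the assumed behavior of pkg/slice.NoneOf.
func noneOf(n int, pred func(i int) bool) bool {
	for i := 0; i < n; i++ {
		if pred(i) {
			return false
		}
	}
	return true
}

func main() {
	defaults := []schedulerConfig{{"balance-region"}, {"balance-leader"}, {"evict-slow-store"}}
	loaded := []schedulerConfig{{"balance-region"}, {"balance-leader"}} // persisted by an older version

	for _, d := range defaults {
		if noneOf(len(loaded), func(i int) bool { return loaded[i].Type == d.Type }) {
			loaded = append(loaded, d) // backfill the missing default
		}
	}
	fmt.Println(loaded) // [{balance-region} {balance-leader} {evict-slow-store}]
}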
1 change: 1 addition & 0 deletions pkg/mcs/scheduling/server/config/watcher.go
@@ -94,6 +94,7 @@ func (cw *Watcher) initializeConfigWatcher() error {
 				zap.String("event-kv-key", string(kv.Key)), zap.Error(err))
 			return err
 		}
+		cw.AdjustScheduleCfg(&cfg.Schedule)
 		cw.SetClusterVersion(&cfg.ClusterVersion)
 		cw.SetScheduleConfig(&cfg.Schedule)
 		cw.SetReplicationConfig(&cfg.Replication)
66 changes: 45 additions & 21 deletions pkg/mcs/scheduling/server/server.go
@@ -87,7 +87,8 @@ type Server struct {
 	checkMembershipCh chan struct{}
 
 	// primaryCallbacks will be called after the server becomes leader.
-	primaryCallbacks []func(context.Context)
+	primaryCallbacks     []func(context.Context) error
+	primaryExitCallbacks []func()
 
 	// for service registry
 	serviceID *discovery.ServiceRegistryEntry
@@ -164,6 +165,9 @@ func (s *Server) updateAPIServerMemberLoop() {
 		case <-ticker.C:
 		case <-s.checkMembershipCh:
 		}
+		if !s.IsServing() {
+			continue
+		}
 		members, err := s.GetClient().MemberList(ctx)
 		if err != nil {
 			log.Warn("failed to list members", errs.ZapError(err))
@@ -247,9 +251,16 @@ func (s *Server) campaignLeader() {
 
 	log.Info("triggering the primary callback functions")
 	for _, cb := range s.primaryCallbacks {
-		cb(ctx)
+		if err := cb(ctx); err != nil {
+			log.Error("failed to trigger the primary callback functions", errs.ZapError(err))
+			return
+		}
 	}
-
+	defer func() {
+		for _, cb := range s.primaryExitCallbacks {
+			cb()
+		}
+	}()
 	s.participant.EnableLeader()
 	log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name()))
 
@@ -283,10 +294,6 @@ func (s *Server) Close() {
 	utils.StopHTTPServer(s)
 	utils.StopGRPCServer(s)
 	s.GetListener().Close()
-	s.GetCoordinator().Stop()
-	s.ruleWatcher.Close()
-	s.configWatcher.Close()
-	s.metaWatcher.Close()
 	s.serverLoopCancel()
 	s.serverLoopWg.Wait()
 
@@ -313,10 +320,15 @@ func (s *Server) IsClosed() bool {
 }
 
 // AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise.
-func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
+func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
 	s.primaryCallbacks = append(s.primaryCallbacks, callbacks...)
 }
 
+// AddServiceExitCallback adds callbacks when the server loses the leadership, if there is embedded etcd, or the primary otherwise.
+func (s *Server) AddServiceExitCallback(callbacks ...func()) {
+	s.primaryExitCallbacks = append(s.primaryExitCallbacks, callbacks...)
+}
+
 // GetTLSConfig gets the security config.
 func (s *Server) GetTLSConfig() *grpcutil.TLSConfig {
 	return &s.cfg.Security.TLSConfig
@@ -381,20 +393,10 @@ func (s *Server) startServer() (err error) {
 		ListenUrls: []string{s.cfg.AdvertiseListenAddr},
 	}
 	s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election")
-	s.basicCluster = core.NewBasicCluster()
-	err = s.startWatcher()
-	if err != nil {
-		return err
-	}
-	s.storage = endpoint.NewStorageEndpoint(
-		kv.NewEtcdKVBase(s.GetClient(), endpoint.PDRootPath(s.clusterID)), nil)
-	s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster)
-	s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
-	if err != nil {
-		return err
-	}
 
 	s.service = &Service{Server: s}
+	s.AddServiceReadyCallback(s.startCluster)
+	s.AddServiceExitCallback(s.stopCluster)
 	if err := s.InitListener(s.GetTLSConfig(), s.cfg.ListenAddr); err != nil {
 		return err
 	}
@@ -406,7 +408,6 @@ func (s *Server) startServer() (err error) {
 	go utils.StartGRPCAndHTTPServers(s, serverReadyChan, s.GetListener())
 	s.checkMembershipCh <- struct{}{}
 	<-serverReadyChan
-	go s.GetCoordinator().RunUntilStop()
 
 	// Run callbacks
 	log.Info("triggering the start callback functions")
@@ -429,6 +430,29 @@ func (s *Server) startServer() (err error) {
 	return nil
 }
 
+func (s *Server) startCluster(context.Context) error {
+	s.basicCluster = core.NewBasicCluster()
+	err := s.startWatcher()
+	if err != nil {
+		return err
+	}
+	s.storage = endpoint.NewStorageEndpoint(kv.NewMemoryKV(), nil)
+	s.hbStreams = hbstream.NewHeartbeatStreams(s.Context(), s.clusterID, s.basicCluster)
+	s.cluster, err = NewCluster(s.Context(), s.persistConfig, s.storage, s.basicCluster, s.hbStreams, s.clusterID, s.checkMembershipCh)
+	if err != nil {
+		return err
+	}
+	go s.GetCoordinator().RunUntilStop()
+	return nil
+}
+
+func (s *Server) stopCluster() {
+	s.GetCoordinator().Stop()
+	s.ruleWatcher.Close()
+	s.configWatcher.Close()
+	s.metaWatcher.Close()
+}
+
 func (s *Server) startWatcher() (err error) {
 	s.metaWatcher, err = meta.NewWatcher(s.Context(), s.GetClient(), s.clusterID, s.basicCluster)
 	if err != nil {
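Taken together, startCluster and stopCluster tie the cluster's lifetime to the leadership term rather than to the process, and the storage endpoint now wraps kv.NewMemoryKV() instead of an etcd-backed KV, which is the memory storage the PR title refers to. A stripped-down model (not PD code) of how the ready/exit callbacks bracket a term in campaignLeader:

package main

import (
	"context"
	"fmt"
	"time"
)

// server models the callback flow above: ready callbacks build per-term
// state (cluster, watchers, coordinator) and exit callbacks tear it down.
// The deferred exit loop is registered only after every ready callback has
// succeeded, so exit callbacks do not run for a failed startup, mirroring
// the order in the diff.
type server struct {
	ready []func(context.Context) error
	exit  []func()
}

func (s *server) campaign(ctx context.Context) {
	for _, cb := range s.ready {
		if err := cb(ctx); err != nil {
			fmt.Println("abort term:", err)
			return
		}
	}
	defer func() {
		for _, cb := range s.exit {
			cb()
		}
	}()
	<-ctx.Done() // keep leadership until the context is canceled
}

func main() {
	s := &server{
		ready: []func(context.Context) error{func(context.Context) error {
			fmt.Println("startCluster")
			return nil
		}},
		exit: []func(){func() { fmt.Println("stopCluster") }},
	}
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
	defer cancel()
	s.campaign(ctx) // prints startCluster, then stopCluster
}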
2 changes: 1 addition & 1 deletion pkg/mcs/tso/server/server.go
@@ -236,7 +236,7 @@ func (s *Server) ResignPrimary(keyspaceID, keyspaceGroupID uint32) error {
 
 // AddServiceReadyCallback implements basicserver.
 // It adds callbacks when it's ready for providing tso service.
-func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
+func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
 	// Do nothing here. The primary of each keyspace group assigned to this host
 	// will respond to the requests accordingly.
 }
4 changes: 2 additions & 2 deletions server/server.go
@@ -190,7 +190,7 @@ type Server struct {
 	// startCallbacks will be called after the server is started.
 	startCallbacks []func()
 	// leaderCallbacks will be called after the server becomes leader.
-	leaderCallbacks []func(context.Context)
+	leaderCallbacks []func(context.Context) error
 	// closeCallbacks will be called before the server is closed.
 	closeCallbacks []func()
 
@@ -1547,7 +1547,7 @@ func (s *Server) IsServing() bool {
 }
 
 // AddServiceReadyCallback adds callbacks when the server becomes the leader if there is embedded etcd, or the primary otherwise.
-func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) {
+func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context) error) {
 	s.leaderCallbacks = append(s.leaderCallbacks, callbacks...)
 }
22 changes: 21 additions & 1 deletion tests/integrations/mcs/scheduling/server_test.go
@@ -67,13 +67,16 @@ func (suite *serverTestSuite) TearDownSuite() {
 
 func (suite *serverTestSuite) TestAllocID() {
 	re := suite.Require()
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`))
 	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 1, suite.backendEndpoints)
 	re.NoError(err)
 	defer tc.Destroy()
 	tc.WaitForPrimaryServing(re)
+	time.Sleep(200 * time.Millisecond)
 	id, err := tc.GetPrimaryServer().GetCluster().AllocID()
 	re.NoError(err)
 	re.NotEqual(uint64(0), id)
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember"))
 }
 
 func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() {
@@ -83,15 +86,32 @@ func (suite *serverTestSuite) TestAllocIDAfterLeaderChange() {
 	re.NoError(err)
 	defer tc.Destroy()
 	tc.WaitForPrimaryServing(re)
+	time.Sleep(200 * time.Millisecond)
 	cluster := tc.GetPrimaryServer().GetCluster()
 	id, err := cluster.AllocID()
 	re.NoError(err)
 	re.NotEqual(uint64(0), id)
 	suite.cluster.ResignLeader()
 	suite.cluster.WaitLeader()
-	time.Sleep(time.Second)
+	time.Sleep(200 * time.Millisecond)
 	id1, err := cluster.AllocID()
 	re.NoError(err)
 	re.Greater(id1, id)
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember"))
 }
 
+func (suite *serverTestSuite) TestPrimaryChange() {
+	re := suite.Require()
+	tc, err := tests.NewTestSchedulingCluster(suite.ctx, 2, suite.backendEndpoints)
+	re.NoError(err)
+	defer tc.Destroy()
+	tc.WaitForPrimaryServing(re)
+	primary := tc.GetPrimaryServer()
+	addr := primary.GetAddr()
+	re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5)
+	primary.Close()
+	tc.WaitForPrimaryServing(re)
+	primary = tc.GetPrimaryServer()
+	re.NotEqual(addr, primary.GetAddr())
+	re.Len(primary.GetCluster().GetCoordinator().GetSchedulersController().GetSchedulerNames(), 5)
+}
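One small design note on the failpoint bracket in these tests: enabling at the top and disabling on the last line leaves the failpoint enabled if an assertion fails mid-test. A deferred disable, sketched below with the same failpoint path (hypothetical test name; the failpoint presumably shortens the member-update interval), is the more defensive pattern:

func (suite *serverTestSuite) TestWithFastUpdateMember() {
	re := suite.Require()
	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember", `return(true)`))
	// The deferred disable runs even if an assertion below fails the test.
	defer func() {
		re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/mcs/scheduling/server/fastUpdateMember"))
	}()
	// ... exercise AllocID across a leader change ...
}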