Skip to content

Commit

Permalink
resource_manager: implement the independent primary election (#6086)
Browse files Browse the repository at this point in the history
ref #5766

Implement the independent resource manager primary election.

Signed-off-by: JmPotato <ghzpotato@gmail.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
JmPotato and ti-chi-bot authored Mar 6, 2023
1 parent 312e5b4 commit bc5234c
Show file tree
Hide file tree
Showing 7 changed files with 267 additions and 50 deletions.
18 changes: 15 additions & 3 deletions pkg/mcs/resource_manager/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,18 @@ import (
)

const (
defaultName = "Resource Manager"
defaultBackendEndpoints = "127.0.0.1:2379"
defaultListenAddr = "127.0.0.1:3380"
defaultName = "Resource Manager"
// defaultBackendEndpoints is the default etcd endpoints for the resource manager.
defaultBackendEndpoints = "http://127.0.0.1:2379"
// defaultListenAddr is the default listening address for the resource manager.
defaultListenAddr = "http://127.0.0.1:3379"
defaultEnableGRPCGateway = true

defaultLogFormat = "text"
defaultDisableErrorVerbose = true

defaultLeaderLease = int64(3)

defaultReadBaseCost = 0.25
defaultWriteBaseCost = 1
// 1 RU = 64 KiB read bytes
Expand Down Expand Up @@ -66,6 +70,12 @@ type Config struct {

Security configutil.SecurityConfig `toml:"security" json:"security"`

// LeaderLease defines the time within which a Resource Manager primary/leader must
// update its TTL in etcd, otherwise etcd will expire the leader key and other servers
// can campaign the primary/leader again. Etcd only supports seconds TTL, so here is
// second too.
LeaderLease int64 `toml:"lease" json:"lease"`

// RequestUnit is the configuration determines the coefficients of the RRU and WRU cost.
// This configuration should be modified carefully.
RequestUnit RequestUnitConfig
Expand Down Expand Up @@ -180,6 +190,8 @@ func (c *Config) Adjust(meta *toml.MetaData, reloading bool) error {
c.Log.Format = defaultLogFormat
}

configutil.AdjustInt64(&c.LeaderLease, defaultLeaderLease)

c.RequestUnit.Adjust()

return nil
Expand Down
10 changes: 10 additions & 0 deletions pkg/mcs/resource_manager/server/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (

const (
namespace = "resource_manager"
serverSubsystem = "server"
ruSubsystem = "resource_unit"
resourceSubsystem = "resource"
resourceGroupNameLabel = "name"
Expand All @@ -29,6 +30,14 @@ const (
)

var (
// Meta & Server info.
serverInfo = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: namespace,
Subsystem: serverSubsystem,
Name: "info",
Help: "Indicate the resource manager server info, and the value is the start timestamp (s).",
}, []string{"version", "hash"})
// RU cost metrics.
readRequestUnitCost = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Expand Down Expand Up @@ -97,6 +106,7 @@ var (
)

func init() {
prometheus.MustRegister(serverInfo)
prometheus.MustRegister(readRequestUnitCost)
prometheus.MustRegister(writeRequestUnitCost)
prometheus.MustRegister(sqlLayerRequestUnitCost)
Expand Down
163 changes: 134 additions & 29 deletions pkg/mcs/resource_manager/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package server
import (
"context"
"crypto/tls"
"fmt"
"net"
"net/http"
"net/url"
Expand All @@ -35,8 +36,10 @@ import (
bs "github.com/tikv/pd/pkg/basicserver"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/memberutil"
"github.com/tikv/pd/pkg/utils/metricutil"
"github.com/tikv/pd/pkg/versioninfo"
"go.etcd.io/etcd/clientv3"
Expand All @@ -46,30 +49,37 @@ import (
)

const (
tcp = "tcp"
tcpNetworkStr = "tcp"
// resourceManagerKeyspaceGroupPrimaryElectionPrefix defines the key prefix for keyspace group primary election.
// The entire key is in the format of "/pd/<cluster-id>/microservice/resource-manager/keyspace-group-XXXXX/primary"
// in which XXXXX is 5 digits integer with leading zeros. For now we use 0 as the default cluster id.
resourceManagerKeyspaceGroupPrimaryElectionPrefix = "/pd/0/microservice/resource-manager/keyspace-group-"
// defaultGRPCGracefulStopTimeout is the default timeout to wait for grpc server to gracefully stop
defaultGRPCGracefulStopTimeout = 5 * time.Second
// defaultHTTPGracefulShutdownTimeout is the default timeout to wait for http server to gracefully shutdown
defaultHTTPGracefulShutdownTimeout = 5 * time.Second
defaultLeaseInSeconds = 3
leaderTickInterval = 50 * time.Millisecond
)

// Server is the resource manager server, and it implements bs.Server.
// nolint
type Server struct {
// Server state. 0 is not serving, 1 is serving.
isServing int64

ctx context.Context
serverLoopWg sync.WaitGroup
ctx context.Context
serverLoopCtx context.Context
serverLoopCancel func()
serverLoopWg sync.WaitGroup

cfg *Config
name string
backendURLs []url.URL
listenURL *url.URL
cfg *Config
name string
listenURL *url.URL

etcdClient *clientv3.Client
httpClient *http.Client
// for the primary election of resource manager
participant *member.Participant
etcdClient *clientv3.Client
httpClient *http.Client

muxListener net.Listener
service *Service
Expand All @@ -83,7 +93,7 @@ type Server struct {
serviceRegister *discovery.ServiceRegister
}

// Name returns the unique etcd Name for this server in etcd cluster.
// Name returns the unique etcd name for this server in etcd cluster.
func (s *Server) Name() string {
return s.name
}
Expand All @@ -93,12 +103,101 @@ func (s *Server) Context() context.Context {
return s.ctx
}

// Run runs the pd server.
func (s *Server) Run() error {
if err := s.initClient(); err != nil {
// Run runs the Resource Manager server.
func (s *Server) Run() (err error) {
if err = s.initClient(); err != nil {
return err
}
return s.startServer()
if err = s.startServer(); err != nil {
return err
}

s.startServerLoop()

return nil
}

func (s *Server) startServerLoop() {
s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
s.serverLoopWg.Add(1)
go s.primaryElectionLoop()
}

func (s *Server) primaryElectionLoop() {
defer logutil.LogPanic()
defer s.serverLoopWg.Done()

for {
if s.IsClosed() {
log.Info("server is closed, exit resource manager primary election loop")
return
}

primary, rev, checkAgain := s.participant.CheckLeader()
if checkAgain {
continue
}
if primary != nil {
log.Info("start to watch the primary/leader", zap.Stringer("resource-manager-primary", primary))
// WatchLeader will keep looping and never return unless the primary/leader has changed.
s.participant.WatchLeader(s.serverLoopCtx, primary, rev)
log.Info("the resource manager primary/leader has changed, try to re-campaign a primary/leader")
}

s.campaignLeader()
}
}

func (s *Server) campaignLeader() {
log.Info("start to campaign the primary/leader", zap.String("campaign-resource-manager-primary-name", s.participant.Member().Name))
if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil {
if err.Error() == errs.ErrEtcdTxnConflict.Error() {
log.Info("campaign resource manager primary/leader meets error due to txn conflict, another resource manager server may campaign successfully",
zap.String("campaign-resource-manager-primary-name", s.participant.Member().Name))
} else {
log.Error("campaign resource manager primary/leader meets error due to etcd error",
zap.String("campaign-resource-manager-primary-name", s.participant.Member().Name),
errs.ZapError(err))
}
return
}

// Start keepalive the leadership and enable Resource Manager service.
ctx, cancel := context.WithCancel(s.serverLoopCtx)
var resetLeaderOnce sync.Once
defer resetLeaderOnce.Do(func() {
cancel()
s.participant.ResetLeader()
})

// maintain the leadership, after this, Resource Manager could be ready to provide service.
s.participant.KeepLeader(ctx)
log.Info("campaign resource manager primary ok", zap.String("campaign-resource-manager-primary-name", s.participant.Member().Name))

log.Info("triggering the primary callback functions")
for _, cb := range s.primaryCallbacks {
cb(ctx)
}

s.participant.EnableLeader()
log.Info("resource manager primary is ready to serve", zap.String("resource-manager-primary-name", s.participant.Member().Name))

leaderTicker := time.NewTicker(leaderTickInterval)
defer leaderTicker.Stop()

for {
select {
case <-leaderTicker.C:
if !s.participant.IsLeader() {
log.Info("no longer a primary/leader because lease has expired, the resource manager primary/leader will step down")
return
}
case <-ctx.Done():
// Server is closed and it should return nil.
log.Info("server is closed")
return
}
}
}

// Close closes the server.
Expand All @@ -111,6 +210,7 @@ func (s *Server) Close() {
log.Info("closing resource manager server ...")
s.serviceRegister.Deregister()
s.muxListener.Close()
s.serverLoopCancel()
s.serverLoopWg.Wait()

if s.etcdClient != nil {
Expand Down Expand Up @@ -148,8 +248,7 @@ func (s *Server) AddStartCallback(callbacks ...func()) {

// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise.
func (s *Server) IsServing() bool {
// TODO: implement this function with primary.
return atomic.LoadInt64(&s.isServing) == 1
return s.participant.IsLeader()
}

// IsClosed checks if the server loop is closed
Expand All @@ -171,8 +270,7 @@ func (s *Server) initClient() error {
if err != nil {
return err
}
s.backendURLs = []url.URL(u)
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendURLs)
s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u))
return err
}

Expand Down Expand Up @@ -253,15 +351,26 @@ func (s *Server) startGRPCAndHTTPServers(l net.Listener) {

// GetPrimary returns the primary member.
func (s *Server) GetPrimary() bs.MemberProvider {
// TODO: implement this function with primary.
return nil
return s.participant.GetLeader()
}

func (s *Server) startServer() error {
manager := NewManager[*Server](s)
// The independent Resource Manager service still reuses PD version info since PD and Resource Manager are just
// different service modes provided by the same pd-server binary
serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix()))

uniqueName := s.cfg.ListenAddr
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
s.participant = member.NewParticipant(s.etcdClient, uniqueID)
s.participant.InitInfo(uniqueName, resourceManagerKeyspaceGroupPrimaryElectionPrefix+fmt.Sprintf("%05d", 0), "primary", "keyspace group primary election")
s.participant.SetMemberDeployPath(s.participant.ID())
s.participant.SetMemberBinaryVersion(s.participant.ID(), versioninfo.PDReleaseVersion)
s.participant.SetMemberGitHash(s.participant.ID(), versioninfo.PDGitHash)

s.service = &Service{
ctx: s.ctx,
manager: manager,
manager: NewManager[*Server](s),
}

tlsConfig, err := s.cfg.Security.ToTLSConfig()
Expand All @@ -273,9 +382,9 @@ func (s *Server) startServer() error {
return err
}
if tlsConfig != nil {
s.muxListener, err = tls.Listen(tcp, s.listenURL.Host, tlsConfig)
s.muxListener, err = tls.Listen(tcpNetworkStr, s.listenURL.Host, tlsConfig)
} else {
s.muxListener, err = net.Listen(tcp, s.listenURL.Host)
s.muxListener, err = net.Listen(tcpNetworkStr, s.listenURL.Host)
}
if err != nil {
return err
Expand All @@ -289,10 +398,6 @@ func (s *Server) startServer() error {
for _, cb := range s.startCallbacks {
cb()
}
// TODO: resolve callback for the primary
for _, cb := range s.primaryCallbacks {
cb(s.ctx)
}

// Server has started.
atomic.StoreInt64(&s.isServing, 1)
Expand Down
60 changes: 60 additions & 0 deletions pkg/mcs/resource_manager/server/testutil.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"os"

"github.com/pingcap/log"
"github.com/spf13/cobra"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/utils/logutil"
)

// CleanupFunc closes test resource manager server(s) and deletes any files left behind.
type CleanupFunc func()

// NewTestServer creates a resource manager server for testing.
func NewTestServer(ctx context.Context, re *require.Assertions, cfg *Config) (*Server, CleanupFunc, error) {
// New zap logger
err := logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
re.NoError(err)
log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
// Flushing any buffered log entries
defer log.Sync()

s := NewServer(ctx, cfg)
if err = s.Run(); err != nil {
return nil, nil, err
}

cleanup := func() {
s.Close()
os.RemoveAll(cfg.DataDir)
}
return s, cleanup, nil
}

// NewTestDefaultConfig creates a new default config for testing.
func NewTestDefaultConfig() (*Config, error) {
cmd := &cobra.Command{
Use: "resource_manager",
Short: "Run the resource manager service",
}
cfg := NewConfig()
flagSet := cmd.Flags()
return cfg, cfg.Parse(flagSet)
}
Loading

0 comments on commit bc5234c

Please sign in to comment.