Skip to content

Commit

Permalink
Add filter and batch handling when loading keyspace groups meta from s…
Browse files Browse the repository at this point in the history
…torage

Signed-off-by: Bin Shi <binshi.bing@gmail.com>
  • Loading branch information
binshi-bing committed Mar 30, 2023
1 parent bae8be8 commit d6c17e7
Show file tree
Hide file tree
Showing 6 changed files with 189 additions and 70 deletions.
10 changes: 10 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,16 @@
# AUTOGENERATED BY github.com/pingcap/errors/errdoc-gen
# YOU CAN CHANGE THE 'description'/'workaround' FIELDS IF THEM ARE IMPROPER.

["ErrLoadKeyspaceGroupsTerminated"]
error = '''
load keyspace grops terminated
'''

["ErrLoadKeyspaceGroupsTimeout"]
error = '''
load keyspace grops timeout
'''

["PD:ErrEncryptionKMS"]
error = '''
KMS error
Expand Down
1 change: 1 addition & 0 deletions pd.code-workspace
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
],
"settings": {
"cSpell.words": [
"ksgs",
"safepoint"
]
}
Expand Down
22 changes: 12 additions & 10 deletions pkg/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,18 @@ var (

// tso errors
var (
ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig"))
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator"))
ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid"))
ErrGetAllocatorManager = errors.Normalize("get allocator manager failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocatorManager"))
ErrSetLocalTSOConfig = errors.Normalize("set local tso config failed, %s", errors.RFCCodeText("PD:tso:ErrSetLocalTSOConfig"))
ErrGetAllocator = errors.Normalize("get allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocator"))
ErrGetLocalAllocator = errors.Normalize("get local allocator failed, %s", errors.RFCCodeText("PD:tso:ErrGetLocalAllocator"))
ErrSyncMaxTS = errors.Normalize("sync max ts failed, %s", errors.RFCCodeText("PD:tso:ErrSyncMaxTS"))
ErrResetUserTimestamp = errors.Normalize("reset user timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrResetUserTimestamp"))
ErrGenerateTimestamp = errors.Normalize("generate timestamp failed, %s", errors.RFCCodeText("PD:tso:ErrGenerateTimestamp"))
ErrLogicOverflow = errors.Normalize("logic part overflow", errors.RFCCodeText("PD:tso:ErrLogicOverflow"))
ErrProxyTSOTimeout = errors.Normalize("proxy tso timeout", errors.RFCCodeText("PD:tso:ErrProxyTSOTimeout"))
ErrKeyspaceGroupIDInvalid = errors.Normalize("the keyspace group id is invalid, %s", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIDInvalid"))
ErrGetAllocatorManager = errors.Normalize("get allocator manager failed, %s", errors.RFCCodeText("PD:tso:ErrGetAllocatorManager"))
ErrLoadKeyspaceGroupsTimeout = errors.Normalize("load keyspace grops timeout", errors.RFCCodeText("ErrLoadKeyspaceGroupsTimeout"))
ErrLoadKeyspaceGroupsTerminated = errors.Normalize("load keyspace grops terminated", errors.RFCCodeText("ErrLoadKeyspaceGroupsTerminated"))
)

// member errors
Expand Down
15 changes: 10 additions & 5 deletions pkg/mcs/tso/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,10 @@ type Server struct {

// Callback functions for different stages
// startCallbacks will be called after the server is started.
startCallbacks []func()
startCallbacks []func()

// for service registry
serviceID *discovery.ServiceRegistryEntry
serviceRegister *discovery.ServiceRegister
}

Expand Down Expand Up @@ -435,8 +438,10 @@ func (s *Server) startServer() (err error) {
legacySvcRootPath := path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID)
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.etcdClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg)
if err := s.keyspaceGroupManager.Initialize(); err != nil {
s.serverLoopCtx, s.serviceID, s.etcdClient, s.listenURL.Host, legacySvcRootPath, tsoSvcRootPath, s.cfg)
// The param `false` means that we don't initialize the keyspace group manager
// by loading the keyspace group meta from etcd.
if err := s.keyspaceGroupManager.Initialize(false); err != nil {
return err
}

Expand Down Expand Up @@ -466,8 +471,8 @@ func (s *Server) startServer() (err error) {
}

// Server has started.
entry := &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
serializedEntry, err := entry.Serialize()
s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr}
serializedEntry, err := s.serviceID.Serialize()
if err != nil {
return err
}
Expand Down
203 changes: 150 additions & 53 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/discovery"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/member"
"github.com/tikv/pd/pkg/storage/endpoint"
Expand All @@ -36,28 +37,39 @@ import (
"go.uber.org/zap"
)

// primaryElectionSuffix is the suffix of the key for keyspace group primary election
const primaryElectionSuffix = "primary"
const (
// primaryElectionSuffix is the suffix of the key for keyspace group primary election
primaryElectionSuffix = "primary"
// defaultLoadKeyspaceGroupsTimeout is the default timeout for loading the initial
// keyspace group assignment
defaultLoadKeyspaceGroupsTimeout = 3 * time.Minute
// defaultLoadKeyspaceGroupsBatchSize is the batch size passed to each
// storage.LoadKeyspaceGroups call while loading the initial keyspace group
// assignment; a batch shorter than this ends the loading loop.
defaultLoadKeyspaceGroupsBatchSize = 500
)

// KeyspaceGroupManager manages the members of the keyspace groups assigned to this host.
// The replicas campaign for the leaders which provide the tso service for the corresponding
// keyspace groups.
type KeyspaceGroupManager struct {
// mu protects ams being concurrently updated when the service shutdown and online keyspace
// group membership/distribution change happen simultaneously. The former will close all
// keyspace groups, while the latter could create new keyspace groups. Accessing ams[i] doesn't
// need to acquire this lock.
// mu protects the `ams` and `ksgs` data structures from concurrent addition/removal of
// keyspace groups during critical periods such as service shutdown, initial keyspace group
// assignment loading, and online keyspace group membership/distribution changes.
// It's important to note that accessing `ams[i]` does not require acquiring this lock.
mu sync.Mutex
// ams stores the allocator managers of the keyspace groups. Each keyspace group is assigned
// with an allocator manager managing its global/local tso allocators.
// Use a fixed size array to maximize the efficiency of concurrent access to
// different keyspace groups for tso service.
ams [mcsutils.MaxKeyspaceGroupCountInUse]atomic.Pointer[AllocatorManager]
// ksgs stores the keyspace groups' membership/distribution meta.
ksgs [mcsutils.MaxKeyspaceGroupCountInUse]atomic.Pointer[endpoint.KeyspaceGroup]

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup

ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
etcdClient *clientv3.Client
// tsoServiceID is the service ID of the TSO service, registered in the service discovery
tsoServiceID *discovery.ServiceRegistryEntry
etcdClient *clientv3.Client
// electionNamePrefix is the name prefix to generate the unique name of a participant,
// which participate in the election of its keyspace group's primary, in the format of
// "electionNamePrefix:keyspace-group-id"
Expand Down Expand Up @@ -94,11 +106,14 @@ type KeyspaceGroupManager struct {
// cfg is the TSO config
cfg ServiceConfig
maxResetTSGap func() time.Duration
// loadKeyspaceGroupsTimeout is the timeout for loading the initial keyspace group assignment
loadKeyspaceGroupsTimeout time.Duration
}

// NewKeyspaceGroupManager creates a new Keyspace Group Manager.
func NewKeyspaceGroupManager(
ctx context.Context,
tsoServiceID *discovery.ServiceRegistryEntry,
etcdClient *clientv3.Client,
electionNamePrefix string,
legacySvcRootPath string,
Expand All @@ -113,33 +128,70 @@ func NewKeyspaceGroupManager(

ctx, cancel := context.WithCancel(ctx)
ksgMgr := &KeyspaceGroupManager{
ctx: ctx,
cancel: cancel,
etcdClient: etcdClient,
electionNamePrefix: electionNamePrefix,
legacySvcRootPath: legacySvcRootPath,
tsoSvcRootPath: tsoSvcRootPath,
cfg: cfg,
maxResetTSGap: func() time.Duration { return cfg.GetMaxResetTSGap() },
ctx: ctx,
cancel: cancel,
tsoServiceID: tsoServiceID,
etcdClient: etcdClient,
electionNamePrefix: electionNamePrefix,
legacySvcRootPath: legacySvcRootPath,
tsoSvcRootPath: tsoSvcRootPath,
cfg: cfg,
maxResetTSGap: func() time.Duration { return cfg.GetMaxResetTSGap() },
loadKeyspaceGroupsTimeout: defaultLoadKeyspaceGroupsTimeout,
}

return ksgMgr
}

// Initialize this KeyspaceGroupManager
func (kgm *KeyspaceGroupManager) Initialize() error {
func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error {
kgm.storage = endpoint.NewStorageEndpoint(
kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil)
if kgm.storage == nil {
return errors.New("failed to create storage endpoint for keyspace group metadata")
}

if err := kgm.loadKeyspaceGroups(); err != nil {
return err
if !loadFromStorage {
group := &endpoint.KeyspaceGroup{
ID: mcsutils.DefaultKeySpaceGroupID,
Members: []endpoint.KeyspaceGroupMember{
{
Location: kgm.tsoServiceID.ServiceAddr,
},
},
Keyspaces: []uint32{mcsutils.DefaultKeyspaceID},
}
kgm.initKeyspaceGroup(group)
return nil
}

// Load the initial keyspace group assignment with time limit, then start the loop to
// watch keyspace group membership/distribution meta changes and apply dynamically.
initAssignmentDoneCh := make(chan error, 1)
waitTicker := time.NewTicker(kgm.loadKeyspaceGroupsTimeout)
defer waitTicker.Stop()

kgm.wg.Add(1)
go kgm.updateKeyspaceGroupsLoop()
go kgm.loadKeyspaceGroupsLoop(initAssignmentDoneCh)

select {
case err, ok := <-initAssignmentDoneCh:
if ok {
log.Error("failed to initialize keyspace group manager", errs.ZapError(err))
close(initAssignmentDoneCh)
return err
}
// Channel closed and there is no error.
case <-waitTicker.C:
log.Error("failed to initialize keyspace group manager",
zap.Any("timeout-setting", kgm.loadKeyspaceGroupsTimeout),
errs.ZapError(errs.ErrLoadKeyspaceGroupsTimeout))
// We might partially load the keyspace groups. Close the manager to clean up and
// to stop on-the-fly goroutines.
close(initAssignmentDoneCh)
kgm.Close()
return errs.ErrLoadKeyspaceGroupsTimeout
}

return nil
}
Expand All @@ -149,6 +201,10 @@ func (kgm *KeyspaceGroupManager) Close() {
kgm.cancel()
kgm.wg.Wait()

kgm.closeKeyspaceGroups()
}

func (kgm *KeyspaceGroupManager) closeKeyspaceGroups() {
kgm.mu.Lock()
defer kgm.mu.Unlock()

Expand All @@ -159,61 +215,102 @@ func (kgm *KeyspaceGroupManager) Close() {
}
}

// loadKeyspaceGroups loads initial keyspace group assignment from etcd
// loadKeyspaceGroups loads initial keyspace group assignment from storage
func (kgm *KeyspaceGroupManager) loadKeyspaceGroups() error {
// Load the keyspace groups from etcd
groups, err := kgm.storage.LoadKeyspaceGroups(0, 0)
if err != nil {
return err
}
kgm.mu.Lock()
defer kgm.mu.Unlock()

var (
err error
groups []*endpoint.KeyspaceGroup
keyspaceGroupsLoaded uint32
keyspaceGroupsApplied uint32
)

// Load all keyspace groups from etcd and apply the ones assigned to this tso service.
for {
groups, err = kgm.storage.LoadKeyspaceGroups(keyspaceGroupsLoaded, defaultLoadKeyspaceGroupsBatchSize)
if err != nil {
return err
}

keyspaceGroupsLoaded += uint32(len(groups))

// Initialize the keyspace groups
for _, group := range groups {
var assignedToMe = false
for _, member := range group.Members {
if len(member.Location) > 0 && member.Location == kgm.tsoServiceID.ServiceAddr {
assignedToMe = true
break
}
}
if !assignedToMe {
continue
}

// Initialize the keyspace groups
for _, group := range groups {
select {
case <-kgm.ctx.Done():
return nil
default:
select {
case <-kgm.ctx.Done():
return errs.ErrLoadKeyspaceGroupsTerminated
default:
}

keyspaceGroupsApplied++

kgm.initKeyspaceGroup(group)
}

kgm.mu.Lock()
defer kgm.mu.Unlock()
kgm.initKeyspaceGroup(group.ID)
if len(groups) < defaultLoadKeyspaceGroupsBatchSize {
break
}
}

return nil
log.Info("loaded keyspace groups",
zap.Uint32("keyspace-groups-loaded", keyspaceGroupsLoaded),
zap.Uint32("keyspace-groups-applied", keyspaceGroupsApplied))

return err
}

// loadKeyspaceGroupsLoop loads the initial keyspace group assignment and reports the
// outcome through loadInitAssignmentDoneCh: on failure the error is sent on the channel,
// on success the channel is closed without sending a value. It marks kgm.wg as done when
// it returns. Watching for later keyspace group membership/distribution changes is not
// implemented yet (see the TODO below).
//
// NOTE(review): Initialize also closes this channel on its timeout path; if that happens
// before this goroutine has reported, the send or close below would panic -- confirm and
// resolve on the caller side.
func (kgm *KeyspaceGroupManager) loadKeyspaceGroupsLoop(loadInitAssignmentDoneCh chan<- error) {
	defer logutil.LogPanic()
	defer kgm.wg.Done()

	loadErr := kgm.loadKeyspaceGroups()
	if loadErr != nil {
		// Hand the failure to whoever is waiting on the initial load.
		loadInitAssignmentDoneCh <- loadErr
		return
	}

	// Closing without sending a value signals a successful initial load.
	close(loadInitAssignmentDoneCh)

	// TODO: dynamically update the keyspace group membership/distribution
}

// initKeyspaceGroup initializes the given keyspace group
func (kgm *KeyspaceGroupManager) initKeyspaceGroup(groupID uint32) {
uniqueName := fmt.Sprintf("%s-%05d", kgm.electionNamePrefix, groupID)
func (kgm *KeyspaceGroupManager) initKeyspaceGroup(group *endpoint.KeyspaceGroup) {
uniqueName := fmt.Sprintf("%s-%05d", kgm.electionNamePrefix, group.ID)
uniqueID := memberutil.GenerateUniqueID(uniqueName)
log.Info("joining primary election",
zap.Uint32("keyspace-group-id", group.ID),
zap.String("participant-name", uniqueName),
zap.Uint64("participant-id", uniqueID))

participant := member.NewParticipant(kgm.etcdClient)
participant.InitInfo(
uniqueName, uniqueID,
path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", groupID)),
path.Join(kgm.tsoSvcRootPath, fmt.Sprintf("%05d", group.ID)),
primaryElectionSuffix, "keyspace group primary election", kgm.cfg.GetAdvertiseListenAddr())

kgm.ams[groupID].Store(
kgm.ams[group.ID].Store(
NewAllocatorManager(
kgm.ctx, true, groupID, participant,
kgm.ctx, true, group.ID, participant,
kgm.legacySvcRootPath, kgm.storage,
kgm.cfg.IsLocalTSOEnabled(), kgm.cfg.GetTSOSaveInterval(),
kgm.cfg.GetTSOUpdatePhysicalInterval(), kgm.cfg.GetLeaderLease(),
kgm.cfg.GetTLSConfig(), kgm.maxResetTSGap))
}

// updateKeyspaceGroupsLoop periodically check if there is any change in keyspace group membership/distribution.
func (kgm *KeyspaceGroupManager) updateKeyspaceGroupsLoop() {
defer logutil.LogPanic()
defer kgm.wg.Done()
_, cancel := context.WithCancel(kgm.ctx)
defer cancel()

// TODO: dynamically update the keyspace group membership/distribution
kgm.ksgs[group.ID].Store(group)
}

// GetAllocatorManager returns the AllocatorManager of the given keyspace group
Expand Down
Loading

0 comments on commit d6c17e7

Please sign in to comment.