diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index b9e8eb311ff..36b45f885d6 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -56,14 +56,20 @@ type GroupManager struct { ctx context.Context cancel context.CancelFunc wg sync.WaitGroup - // the lock for the groups + sync.RWMutex // groups is the cache of keyspace group related information. // user kind -> keyspace group groups map[endpoint.UserKind]*indexedHeap + // patrolKeyspaceAssignmentOnce is used to indicate whether we have patrolled all keyspaces + // and assign them to the keyspace groups. + patrolKeyspaceAssignmentOnce bool // store is the storage for keyspace group related information. - store endpoint.KeyspaceGroupStorage + store interface { + endpoint.KeyspaceGroupStorage + endpoint.KeyspaceStorage + } client *clientv3.Client @@ -80,7 +86,15 @@ type GroupManager struct { } // NewKeyspaceGroupManager creates a Manager of keyspace group related data. -func NewKeyspaceGroupManager(ctx context.Context, store endpoint.KeyspaceGroupStorage, client *clientv3.Client, clusterID uint64) *GroupManager { +func NewKeyspaceGroupManager( + ctx context.Context, + store interface { + endpoint.KeyspaceGroupStorage + endpoint.KeyspaceStorage + }, + client *clientv3.Client, + clusterID uint64, +) *GroupManager { ctx, cancel := context.WithCancel(ctx) key := discovery.TSOPath(clusterID) groups := make(map[endpoint.UserKind]*indexedHeap) @@ -156,6 +170,38 @@ func (m *GroupManager) Close() { m.wg.Wait() } +// patrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups. +func (m *GroupManager) patrolKeyspaceAssignment() error { + m.Lock() + defer m.Unlock() + if m.patrolKeyspaceAssignmentOnce { + return nil + } + keyspaces, err := m.store.LoadRangeKeyspace(utils.DefaultKeyspaceID, 0) + if err != nil { + return err + } + config, err := m.getKeyspaceConfigByKindLocked(endpoint.Basic) + if err != nil { + return err + } + for _, ks := range keyspaces { + if ks == nil { + continue + } + groupID, err := strconv.ParseUint(config[TSOKeyspaceGroupIDKey], 10, 64) + if err != nil { + return err + } + err = m.updateKeyspaceForGroupLocked(endpoint.Basic, groupID, ks.GetId(), opAdd) + if err != nil { + return err + } + } + m.patrolKeyspaceAssignmentOnce = true + return nil +} + func (m *GroupManager) allocNodesToAllKeyspaceGroups() { defer logutil.LogPanic() defer m.wg.Done() @@ -426,11 +472,18 @@ func (m *GroupManager) GetKeyspaceConfigByKind(userKind endpoint.UserKind) (map[ } m.RLock() defer m.RUnlock() + return m.getKeyspaceConfigByKindLocked(userKind) +} + +func (m *GroupManager) getKeyspaceConfigByKindLocked(userKind endpoint.UserKind) (map[string]string, error) { groups, ok := m.groups[userKind] if !ok { return map[string]string{}, errors.Errorf("user kind %s not found", userKind) } kg := groups.Top() + if kg == nil { + return map[string]string{}, errors.Errorf("no keyspace group for user kind %s", userKind) + } id := strconv.FormatUint(uint64(kg.ID), 10) config := map[string]string{ UserKindKey: userKind.String(), @@ -452,9 +505,13 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI m.Lock() defer m.Unlock() - kg := m.groups[userKind].Get(uint32(id)) + return m.updateKeyspaceForGroupLocked(userKind, id, keyspaceID, mutation) +} + +func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind, groupID uint64, keyspaceID uint32, mutation int) error { + kg := m.groups[userKind].Get(uint32(groupID)) if kg == nil { - return errors.Errorf("keyspace group %d not found", id) + return errors.Errorf("keyspace group %d not found", groupID) } if kg.IsSplitting() { return ErrKeyspaceGroupInSplit @@ -535,11 +592,14 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse // SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID. // And the keyspaces in the old keyspace group will be moved to the new keyspace group. func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint32, keyspaces []uint32) error { + err := m.patrolKeyspaceAssignment() + if err != nil { + return err + } var splitSourceKg, splitTargetKg *endpoint.KeyspaceGroup m.Lock() defer m.Unlock() - // TODO: avoid to split when the keyspaces is empty. - if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + if err = m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { // Load the old keyspace group first. splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitSourceID) if err != nil { @@ -564,13 +624,15 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3 if splitTargetKg != nil { return ErrKeyspaceGroupExists } + keyspaceNum := len(keyspaces) + sourceKeyspaceNum := len(splitSourceKg.Keyspaces) // Check if the keyspaces are all in the old keyspace group. - if len(keyspaces) > len(splitSourceKg.Keyspaces) { + if keyspaceNum == 0 || keyspaceNum > sourceKeyspaceNum { return ErrKeyspaceNotInKeyspaceGroup } var ( - oldKeyspaceMap = make(map[uint32]struct{}, len(splitSourceKg.Keyspaces)) - newKeyspaceMap = make(map[uint32]struct{}, len(keyspaces)) + oldKeyspaceMap = make(map[uint32]struct{}, sourceKeyspaceNum) + newKeyspaceMap = make(map[uint32]struct{}, keyspaceNum) ) for _, keyspace := range splitSourceKg.Keyspaces { oldKeyspaceMap[keyspace] = struct{}{} @@ -582,7 +644,7 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3 newKeyspaceMap[keyspace] = struct{}{} } // Get the split keyspace group for the old keyspace group. - splitKeyspaces := make([]uint32, 0, len(splitSourceKg.Keyspaces)-len(keyspaces)) + splitKeyspaces := make([]uint32, 0, sourceKeyspaceNum-keyspaceNum) for _, keyspace := range splitSourceKg.Keyspaces { if _, ok := newKeyspaceMap[keyspace]; !ok { splitKeyspaces = append(splitKeyspaces, keyspace) diff --git a/pkg/keyspace/tso_keyspace_group_test.go b/pkg/keyspace/tso_keyspace_group_test.go index 6286a71b3aa..cfe035578e2 100644 --- a/pkg/keyspace/tso_keyspace_group_test.go +++ b/pkg/keyspace/tso_keyspace_group_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/mock/mockcluster" @@ -251,6 +252,9 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() { // split the keyspace group 1 to 4 err = suite.kgm.SplitKeyspaceGroupByID(1, 4, []uint32{333}) re.ErrorIs(err, ErrKeyspaceGroupNotEnoughReplicas) + // split the keyspace group 2 to 4 without giving any keyspace + err = suite.kgm.SplitKeyspaceGroupByID(2, 4, []uint32{}) + re.ErrorIs(err, ErrKeyspaceNotInKeyspaceGroup) // split the keyspace group 2 to 4 err = suite.kgm.SplitKeyspaceGroupByID(2, 4, []uint32{333}) re.NoError(err) @@ -317,3 +321,31 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() { err = suite.kgm.SplitKeyspaceGroupByID(2, 5, []uint32{111, 222, 444}) re.ErrorIs(err, ErrKeyspaceNotInKeyspaceGroup) } + +func (suite *keyspaceGroupTestSuite) TestPatrolKeyspaceAssignment() { + re := suite.Require() + // Force the patrol to run once. + suite.kgm.patrolKeyspaceAssignmentOnce = false + // Create a keyspace group without any keyspace. + err := suite.kgm.CreateKeyspaceGroups([]*endpoint.KeyspaceGroup{ + { + ID: uint32(1), + UserKind: endpoint.Basic.String(), + Members: make([]endpoint.KeyspaceGroupMember, 2), + }, + }) + re.NoError(err) + // Create a keyspace without any keyspace group. + now := time.Now().Unix() + err = suite.kg.saveNewKeyspace(&keyspacepb.KeyspaceMeta{ + Id: 111, + Name: "111", + State: keyspacepb.KeyspaceState_ENABLED, + CreatedAt: now, + StateChangedAt: now, + }) + re.NoError(err) + // Split to see if the keyspace is attached to the group. + err = suite.kgm.SplitKeyspaceGroupByID(1, 2, []uint32{111}) + re.NoError(err) +} diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 9cd46ad2d41..38000c257ce 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -83,6 +83,10 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownTest() { func cleanupKeyspaceGroups(re *require.Assertions, server *tests.TestServer) { for _, group := range handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0") { + // Do not delete default keyspace group. + if group.ID == mcsutils.DefaultKeyspaceGroupID { + continue + } handlersutil.MustDeleteKeyspaceGroup(re, server, group.ID) } }