keyspace: patrol keyspace assignment before the first split #6388

Merged 8 commits on Apr 26, 2023
84 changes: 73 additions & 11 deletions pkg/keyspace/tso_keyspace_group.go
@@ -56,14 +56,20 @@ type GroupManager struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
// the lock for the groups

sync.RWMutex
// groups is the cache of keyspace group related information.
// user kind -> keyspace group
groups map[endpoint.UserKind]*indexedHeap
// patrolKeyspaceAssignmentOnce is used to indicate whether we have patrolled all keyspaces
// and assign them to the keyspace groups.
patrolKeyspaceAssignmentOnce bool

// store is the storage for keyspace group related information.
store endpoint.KeyspaceGroupStorage
store interface {
endpoint.KeyspaceGroupStorage
endpoint.KeyspaceStorage
}

client *clientv3.Client

@@ -80,7 +86,15 @@ type GroupManager struct {
}
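The new `store` field and constructor parameter use an anonymous interface that embeds two storage interfaces, so one value must satisfy both. A minimal sketch of that Go pattern follows. The names `groupStorage`, `keyspaceStorage`, `memStore`, `manager`, and `newManager` are hypothetical stand-ins and are not part of PD; the real `endpoint` interfaces have different method sets.

```go
package main

import "fmt"

// groupStorage and keyspaceStorage are hypothetical, trimmed-down stand-ins
// for endpoint.KeyspaceGroupStorage and endpoint.KeyspaceStorage.
type groupStorage interface{ LoadGroup(id uint32) string }
type keyspaceStorage interface{ LoadKeyspace(id uint32) string }

// memStore implements both interfaces, so it satisfies the combined one.
type memStore struct{}

func (memStore) LoadGroup(id uint32) string    { return fmt.Sprintf("group-%d", id) }
func (memStore) LoadKeyspace(id uint32) string { return fmt.Sprintf("keyspace-%d", id) }

// manager keeps a single store value that exposes both method sets.
type manager struct {
	store interface {
		groupStorage
		keyspaceStorage
	}
}

// newManager accepts any value that implements both embedded interfaces.
func newManager(store interface {
	groupStorage
	keyspaceStorage
}) *manager {
	return &manager{store: store}
}

func main() {
	m := newManager(memStore{})
	fmt.Println(m.store.LoadGroup(1), m.store.LoadKeyspace(111)) // group-1 keyspace-111
}
```

Embedding the interfaces inline avoids declaring a new named type just for this one field.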

// NewKeyspaceGroupManager creates a Manager of keyspace group related data.
func NewKeyspaceGroupManager(ctx context.Context, store endpoint.KeyspaceGroupStorage, client *clientv3.Client, clusterID uint64) *GroupManager {
func NewKeyspaceGroupManager(
ctx context.Context,
store interface {
endpoint.KeyspaceGroupStorage
endpoint.KeyspaceStorage
},
client *clientv3.Client,
clusterID uint64,
) *GroupManager {
ctx, cancel := context.WithCancel(ctx)
key := discovery.TSOPath(clusterID)
groups := make(map[endpoint.UserKind]*indexedHeap)
@@ -156,6 +170,38 @@ func (m *GroupManager) Close() {
m.wg.Wait()
}

// patrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups.
func (m *GroupManager) patrolKeyspaceAssignment() error {
m.Lock()
defer m.Unlock()
if m.patrolKeyspaceAssignmentOnce {
return nil
}
keyspaces, err := m.store.LoadRangeKeyspace(utils.DefaultKeyspaceID, 0)
if err != nil {
return err
}
config, err := m.getKeyspaceConfigByKindLocked(endpoint.Basic)
if err != nil {
return err
}
for _, ks := range keyspaces {
if ks == nil {
JmPotato marked this conversation as resolved.

Contributor:

Should we check whether the ks.config map already contains the 'group' entry, i.e., the keyspace has already been assigned to a keyspace group, and skip it if so?
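The check suggested in this comment could look roughly like the sketch below. It is not the PR's code: `keyspaceMeta` is a simplified stand-in for `keyspacepb.KeyspaceMeta`, `unassigned` is a hypothetical helper, and the key name `"group"` is an assumption taken from the wording of the comment.

```go
package main

import "fmt"

// groupIDKey is a hypothetical config key; the comment only mentions a 'group' entry.
const groupIDKey = "group"

// keyspaceMeta is a simplified stand-in for keyspacepb.KeyspaceMeta.
type keyspaceMeta struct {
	ID     uint32
	Config map[string]string
}

// unassigned returns the IDs of keyspaces whose config has no group entry yet.
// Nil entries are skipped, mirroring the nil check in the patrol loop.
func unassigned(keyspaces []*keyspaceMeta) []uint32 {
	ids := make([]uint32, 0, len(keyspaces))
	for _, ks := range keyspaces {
		if ks == nil {
			continue
		}
		if _, ok := ks.Config[groupIDKey]; ok {
			continue // already assigned to a keyspace group
		}
		ids = append(ids, ks.ID)
	}
	return ids
}

func main() {
	keyspaces := []*keyspaceMeta{
		{ID: 111},
		nil,
		{ID: 222, Config: map[string]string{groupIDKey: "1"}},
		{ID: 333, Config: map[string]string{}},
	}
	fmt.Println(unassigned(keyspaces)) // [111 333]
}
```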

continue
}
groupID, err := strconv.ParseUint(config[TSOKeyspaceGroupIDKey], 10, 64)
if err != nil {
return err
}
err = m.updateKeyspaceForGroupLocked(endpoint.Basic, groupID, ks.GetId(), opAdd)
Contributor:

How many existing keyspaces could we have? With many keyspaces this could take some time, because we call saveKeyspaceGroups once for every keyspace. If that is the case, can we assign all keyspaces to the keyspace groups first, then call saveKeyspaceGroups just once to flush all 'dirty' keyspace groups?
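A rough sketch of the batching idea in this comment: apply all assignments in memory, remember which groups were touched, and persist them in one call. The types `group`, `store`, and `assignment`, and the method `saveGroups`, are hypothetical stand-ins; the real `saveKeyspaceGroups` signature is not shown in this diff.

```go
package main

import "fmt"

// group is a simplified stand-in for endpoint.KeyspaceGroup.
type group struct {
	ID        uint32
	Keyspaces []uint32
}

// store counts how many times groups are persisted.
type store struct{ saves int }

// saveGroups is a hypothetical stand-in for a single storage transaction.
func (s *store) saveGroups(groups []*group) error {
	s.saves++
	return nil
}

// assignment pairs a keyspace with the group it should join.
type assignment struct{ KeyspaceID, GroupID uint32 }

// assignAll validates every target group, applies all assignments in memory,
// and then persists the dirty groups with a single saveGroups call.
func assignAll(s *store, groups map[uint32]*group, assignments []assignment) error {
	for _, a := range assignments {
		if _, ok := groups[a.GroupID]; !ok {
			return fmt.Errorf("keyspace group %d not found", a.GroupID)
		}
	}
	dirty := make(map[uint32]*group)
	for _, a := range assignments {
		g := groups[a.GroupID]
		g.Keyspaces = append(g.Keyspaces, a.KeyspaceID)
		dirty[g.ID] = g
	}
	if len(dirty) == 0 {
		return nil
	}
	batch := make([]*group, 0, len(dirty))
	for _, g := range dirty {
		batch = append(batch, g)
	}
	return s.saveGroups(batch)
}

func main() {
	s := &store{}
	groups := map[uint32]*group{1: {ID: 1}, 2: {ID: 2}}
	err := assignAll(s, groups, []assignment{{111, 1}, {222, 1}, {333, 2}})
	fmt.Println(err, s.saves, groups[1].Keyspaces, groups[2].Keyspaces) // <nil> 1 [111 222] [333]
}
```

A real implementation would also need to undo the in-memory changes if the single save fails; the sketch leaves that out.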

if err != nil {
return err
Contributor (binshi-bing, Apr 26, 2023):

We could end up with a keyspace being assigned to multiple keyspace groups.

The saveKeyspaceGroups transaction could fail and return here, which leaves none or only some of the keyspaces assigned to keyspace groups. Because patrolKeyspaceAssignmentOnce is still false, the next split will trigger patrolKeyspaceAssignment again. For the keyspaces that were assigned groups in the previous round, we neither persist the group assignment in the keyspace meta nor check whether the keyspace is already assigned, so we could pick a different keyspace group and assign the keyspace to it, which results in the keyspace being assigned to multiple groups.

Contributor:

Actually, every TSO restart followed by becoming leader could cause a keyspace to be assigned to multiple keyspace groups.
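One way to make a repeated patrol harmless is to skip any keyspace that some group already holds. The sketch below illustrates that idea only and is not what the PR implements. `group`, `assignedIndex`, and `patrol` are hypothetical names, and the scenario in `main` assumes the first round stopped after assigning a single keyspace.

```go
package main

import "fmt"

// group is a simplified stand-in for endpoint.KeyspaceGroup.
type group struct {
	ID        uint32
	Keyspaces []uint32
}

// assignedIndex maps each keyspace ID to the group that already holds it.
func assignedIndex(groups []*group) map[uint32]uint32 {
	idx := make(map[uint32]uint32)
	for _, g := range groups {
		for _, ks := range g.Keyspaces {
			idx[ks] = g.ID
		}
	}
	return idx
}

// patrol assigns each keyspace to target unless some group already holds it,
// so running it again with a different target cannot create duplicates.
func patrol(groups []*group, target *group, keyspaceIDs []uint32) {
	idx := assignedIndex(groups)
	for _, ks := range keyspaceIDs {
		if _, ok := idx[ks]; ok {
			continue
		}
		target.Keyspaces = append(target.Keyspaces, ks)
		idx[ks] = target.ID
	}
}

func main() {
	g1 := &group{ID: 1}
	g2 := &group{ID: 2}
	groups := []*group{g1, g2}
	// First round: assume only keyspace 111 was assigned before a failure.
	patrol(groups, g1, []uint32{111})
	// Second round picks a different group for the full keyspace list.
	patrol(groups, g2, []uint32{111, 222})
	fmt.Println(g1.Keyspaces, g2.Keyspaces) // [111] [222]
}
```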

}
}
m.patrolKeyspaceAssignmentOnce = true
Contributor (binshi-bing, Apr 26, 2023):

Where do we update the keyspace meta (add the 'group' entry to the config field) and persist its group assignment? Do we need to do it? There are two side effects if we don't persist this info:

  1. From the keyspace meta, you can't query its keyspace group directly.
  2. After an API leader switch, the first split needs to go through this expensive process.
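To illustrate the first side effect, here is a small sketch of recording the group ID in a keyspace's config map and reading it back. It only covers the in-memory part; writing the meta to storage is out of scope. `keyspaceMeta`, `setGroup`, `groupOf`, and the key name `"group"` are all assumptions made for this example.

```go
package main

import (
	"fmt"
	"strconv"
)

// groupIDKey is a hypothetical config key; the comment only mentions a 'group' entry.
const groupIDKey = "group"

// keyspaceMeta is a simplified stand-in for keyspacepb.KeyspaceMeta.
type keyspaceMeta struct {
	ID     uint32
	Config map[string]string
}

// setGroup records the group ID in the keyspace's config map.
func setGroup(ks *keyspaceMeta, groupID uint32) {
	if ks.Config == nil {
		ks.Config = make(map[string]string)
	}
	ks.Config[groupIDKey] = strconv.FormatUint(uint64(groupID), 10)
}

// groupOf reads the group ID back; ok is false when no assignment is recorded.
func groupOf(ks *keyspaceMeta) (id uint32, ok bool, err error) {
	raw, found := ks.Config[groupIDKey]
	if !found {
		return 0, false, nil
	}
	v, err := strconv.ParseUint(raw, 10, 32)
	if err != nil {
		return 0, false, err
	}
	return uint32(v), true, nil
}

func main() {
	ks := &keyspaceMeta{ID: 111}
	setGroup(ks, 2)
	id, ok, err := groupOf(ks)
	fmt.Println(id, ok, err) // 2 true <nil>
}
```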

return nil
}

func (m *GroupManager) allocNodesToAllKeyspaceGroups() {
defer logutil.LogPanic()
defer m.wg.Done()
@@ -426,11 +472,18 @@ func (m *GroupManager) GetKeyspaceConfigByKind(userKind endpoint.UserKind) (map[
}
m.RLock()
defer m.RUnlock()
return m.getKeyspaceConfigByKindLocked(userKind)
}

func (m *GroupManager) getKeyspaceConfigByKindLocked(userKind endpoint.UserKind) (map[string]string, error) {
groups, ok := m.groups[userKind]
if !ok {
return map[string]string{}, errors.Errorf("user kind %s not found", userKind)
}
kg := groups.Top()
if kg == nil {
return map[string]string{}, errors.Errorf("no keyspace group for user kind %s", userKind)
}
id := strconv.FormatUint(uint64(kg.ID), 10)
config := map[string]string{
UserKindKey: userKind.String(),
@@ -452,9 +505,13 @@ func (m *GroupManager) UpdateKeyspaceForGroup(userKind endpoint.UserKind, groupI

m.Lock()
defer m.Unlock()
kg := m.groups[userKind].Get(uint32(id))
return m.updateKeyspaceForGroupLocked(userKind, id, keyspaceID, mutation)
}

func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind, groupID uint64, keyspaceID uint32, mutation int) error {
kg := m.groups[userKind].Get(uint32(groupID))
if kg == nil {
return errors.Errorf("keyspace group %d not found", id)
return errors.Errorf("keyspace group %d not found", groupID)
}
if kg.IsSplitting() {
return ErrKeyspaceGroupInSplit
@@ -535,11 +592,14 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse
// SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID.
// And the keyspaces in the old keyspace group will be moved to the new keyspace group.
func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint32, keyspaces []uint32) error {
err := m.patrolKeyspaceAssignment()
if err != nil {
return err
}
var splitSourceKg, splitTargetKg *endpoint.KeyspaceGroup
m.Lock()
defer m.Unlock()
// TODO: avoid to split when the keyspaces is empty.
if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
if err = m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) {
// Load the old keyspace group first.
splitSourceKg, err = m.store.LoadKeyspaceGroup(txn, splitSourceID)
if err != nil {
@@ -564,13 +624,15 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
if splitTargetKg != nil {
return ErrKeyspaceGroupExists
}
keyspaceNum := len(keyspaces)
sourceKeyspaceNum := len(splitSourceKg.Keyspaces)
// Check if the keyspaces are all in the old keyspace group.
if len(keyspaces) > len(splitSourceKg.Keyspaces) {
if keyspaceNum == 0 || keyspaceNum > sourceKeyspaceNum {
return ErrKeyspaceNotInKeyspaceGroup
}
var (
oldKeyspaceMap = make(map[uint32]struct{}, len(splitSourceKg.Keyspaces))
newKeyspaceMap = make(map[uint32]struct{}, len(keyspaces))
oldKeyspaceMap = make(map[uint32]struct{}, sourceKeyspaceNum)
newKeyspaceMap = make(map[uint32]struct{}, keyspaceNum)
)
for _, keyspace := range splitSourceKg.Keyspaces {
oldKeyspaceMap[keyspace] = struct{}{}
@@ -582,7 +644,7 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3
newKeyspaceMap[keyspace] = struct{}{}
}
// Get the split keyspace group for the old keyspace group.
splitKeyspaces := make([]uint32, 0, len(splitSourceKg.Keyspaces)-len(keyspaces))
splitKeyspaces := make([]uint32, 0, sourceKeyspaceNum-keyspaceNum)
for _, keyspace := range splitSourceKg.Keyspaces {
if _, ok := newKeyspaceMap[keyspace]; !ok {
splitKeyspaces = append(splitKeyspaces, keyspace)
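The keyspace bookkeeping in `SplitKeyspaceGroupByID` can be read in isolation as a set difference with validation. The standalone sketch below follows the visible hunks: it rejects an empty or oversized request, then collects the keyspaces that stay in the source group. The membership check inside the second loop is an assumption, since that part of the diff is collapsed. `splitKeyspaces` and `errKeyspaceNotInGroup` are hypothetical names.

```go
package main

import (
	"errors"
	"fmt"
)

// errKeyspaceNotInGroup stands in for ErrKeyspaceNotInKeyspaceGroup.
var errKeyspaceNotInGroup = errors.New("keyspace does not belong to the keyspace group")

// splitKeyspaces returns the keyspaces that remain in the source group after
// the keyspaces in moved are taken out of it.
func splitKeyspaces(source, moved []uint32) ([]uint32, error) {
	// Reject an empty request and one larger than the source group.
	if len(moved) == 0 || len(moved) > len(source) {
		return nil, errKeyspaceNotInGroup
	}
	inSource := make(map[uint32]struct{}, len(source))
	for _, ks := range source {
		inSource[ks] = struct{}{}
	}
	toMove := make(map[uint32]struct{}, len(moved))
	for _, ks := range moved {
		// Assumed check: every moved keyspace must exist in the source group.
		if _, ok := inSource[ks]; !ok {
			return nil, errKeyspaceNotInGroup
		}
		toMove[ks] = struct{}{}
	}
	remaining := make([]uint32, 0, len(source)-len(moved))
	for _, ks := range source {
		if _, ok := toMove[ks]; !ok {
			remaining = append(remaining, ks)
		}
	}
	return remaining, nil
}

func main() {
	remaining, err := splitKeyspaces([]uint32{111, 222, 333}, []uint32{333})
	fmt.Println(remaining, err) // [111 222] <nil>
	_, err = splitKeyspaces([]uint32{111, 222, 333}, nil)
	fmt.Println(err != nil) // true
}
```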
32 changes: 32 additions & 0 deletions pkg/keyspace/tso_keyspace_group_test.go
@@ -20,6 +20,7 @@ import (
"testing"
"time"

"github.com/pingcap/kvproto/pkg/keyspacepb"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mock/mockcluster"
@@ -251,6 +252,9 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
// split the keyspace group 1 to 4
err = suite.kgm.SplitKeyspaceGroupByID(1, 4, []uint32{333})
re.ErrorIs(err, ErrKeyspaceGroupNotEnoughReplicas)
// split the keyspace group 2 to 4 without giving any keyspace
err = suite.kgm.SplitKeyspaceGroupByID(2, 4, []uint32{})
re.ErrorIs(err, ErrKeyspaceNotInKeyspaceGroup)
// split the keyspace group 2 to 4
err = suite.kgm.SplitKeyspaceGroupByID(2, 4, []uint32{333})
re.NoError(err)
@@ -317,3 +321,31 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() {
err = suite.kgm.SplitKeyspaceGroupByID(2, 5, []uint32{111, 222, 444})
re.ErrorIs(err, ErrKeyspaceNotInKeyspaceGroup)
}

func (suite *keyspaceGroupTestSuite) TestPatrolKeyspaceAssignment() {
re := suite.Require()
// Force the patrol to run once.
suite.kgm.patrolKeyspaceAssignmentOnce = false
// Create a keyspace group without any keyspace.
err := suite.kgm.CreateKeyspaceGroups([]*endpoint.KeyspaceGroup{
{
ID: uint32(1),
UserKind: endpoint.Basic.String(),
Members: make([]endpoint.KeyspaceGroupMember, 2),
},
})
re.NoError(err)
// Create a keyspace without any keyspace group.
now := time.Now().Unix()
err = suite.kg.saveNewKeyspace(&keyspacepb.KeyspaceMeta{
Id: 111,
Name: "111",
State: keyspacepb.KeyspaceState_ENABLED,
CreatedAt: now,
StateChangedAt: now,
})
re.NoError(err)
// Split to see if the keyspace is attached to the group.
err = suite.kgm.SplitKeyspaceGroupByID(1, 2, []uint32{111})
re.NoError(err)
}
4 changes: 4 additions & 0 deletions tests/integrations/mcs/tso/keyspace_group_manager_test.go
@@ -83,6 +83,10 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownTest() {

func cleanupKeyspaceGroups(re *require.Assertions, server *tests.TestServer) {
for _, group := range handlersutil.MustLoadKeyspaceGroups(re, server, "0", "0") {
// Do not delete default keyspace group.
if group.ID == mcsutils.DefaultKeyspaceGroupID {
continue
}
handlersutil.MustDeleteKeyspaceGroup(re, server, group.ID)
}
}