From ea4af778cc397789f0b4f7d4864bb6bf0b7e9afe Mon Sep 17 00:00:00 2001 From: Bin Shi Date: Sun, 2 Apr 2023 17:42:46 -0700 Subject: [PATCH] Fix test failure Signed-off-by: Bin Shi --- errors.toml | 4 +- pkg/tso/keyspace_group_manager.go | 18 +++++- pkg/tso/keyspace_group_manager_test.go | 77 +++++++++++++++++--------- 3 files changed, 67 insertions(+), 32 deletions(-) diff --git a/errors.toml b/errors.toml index 3f18fe3f4e44..5b9ecd0a3458 100644 --- a/errors.toml +++ b/errors.toml @@ -3,12 +3,12 @@ ["ErrLoadKeyspaceGroupsTerminated"] error = ''' -load keyspace grops terminated +load keyspace groups terminated ''' ["ErrLoadKeyspaceGroupsTimeout"] error = ''' -load keyspace grops timeout +load keyspace groups timeout ''' ["PD:ErrEncryptionKMS"] diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 2c2a24c8d7cc..ef45988db890 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -194,21 +194,33 @@ func (kgm *KeyspaceGroupManager) Initialize(loadFromStorage bool) error { // Close this KeyspaceGroupManager func (kgm *KeyspaceGroupManager) Close() { + log.Info("closing keyspace group manager") kgm.cancel() kgm.wg.Wait() - kgm.closeKeyspaceGroups() + log.Info("keyspace group manager closed") } func (kgm *KeyspaceGroupManager) closeKeyspaceGroups() { kgm.mu.Lock() defer kgm.mu.Unlock() + log.Info("closing all keyspace groups") + + wg := sync.WaitGroup{} for i := range kgm.ams { - if mgr := kgm.ams[i].Load(); mgr != nil { - mgr.close() + if am := kgm.ams[i].Load(); am != nil { + wg.Add(1) + go func(am *AllocatorManager) { + defer wg.Done() + am.close() + log.Info("keyspace group closed", zap.Uint32("keyspace-group-id", am.ksgID)) + }(am) } } + wg.Wait() + + log.Info("All keyspace groups closed") } func (kgm *KeyspaceGroupManager) checkInitProgress(ctx context.Context, cancel context.CancelFunc, done chan struct{}) { diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 5b3b0c27f486..9f1cf83cdb43 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -17,6 +17,7 @@ package tso import ( "context" "encoding/json" + "math/rand" "path" "strings" "sync" @@ -117,44 +118,45 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { // TestLoadKeyspaceGroupsAssignment tests the loading of the keyspace group assignment. func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsAssignment() { re := suite.Require() + maxCountInUse := int(mcsutils.MaxKeyspaceGroupCountInUse) + // maxCountInUse := int(1024) // Test the loading of empty keyspace group assignment. - runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 0, 0) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 0, 0, 100) // Test the loading of single keyspace group assignment. - runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 1, 0) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 1, 0, 100) // Test the loading of multiple keyspace group assignment. - runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 3, 0) - runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, - int(mcsutils.MaxKeyspaceGroupCountInUse-1), 0) - runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, - int(mcsutils.MaxKeyspaceGroupCountInUse), 0) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, 3, 0, 100) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, maxCountInUse-1, 0, 10) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, maxCountInUse, 0, 10) // Test the loading of the keyspace group assignment which exceeds the maximum // keyspace group count. In this case, the manager should only load/serve the // first MaxKeyspaceGroupCountInUse keyspace groups and ignore the rest - runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, - int(mcsutils.MaxKeyspaceGroupCountInUse+1), 0) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, maxCountInUse+1, 0, 10) } // TestLoadWithDifferentBatchSize tests the loading of the keyspace group assignment with the different batch size. func (suite *keyspaceGroupManagerTestSuite) TestLoadWithDifferentBatchSize() { re := suite.Require() - maxCount := mcsutils.MaxKeyspaceGroupCountInUse + batchSize := int64(17) + maxCount := uint32(1024) params := []struct { - batchSize int64 - count int + batchSize int64 + count int + probabilityAssignToMe int // percentage of assigning keyspace groups to this host/pod }{ - {batchSize: 1, count: 1}, - {batchSize: 2, count: int(maxCount / 10)}, - {batchSize: 7, count: int(maxCount / 10)}, - {batchSize: defaultLoadKeyspaceGroupsBatchSize, count: int(defaultLoadKeyspaceGroupsBatchSize)}, - {batchSize: int64(maxCount / 13), count: int(maxCount / 13)}, - {batchSize: int64(maxCount), count: int(maxCount / 13)}, + {batchSize: 1, count: 1, probabilityAssignToMe: 100}, + {batchSize: 2, count: int(maxCount / 10), probabilityAssignToMe: 100}, + {batchSize: 7, count: int(maxCount / 10), probabilityAssignToMe: 100}, + {batchSize: batchSize, count: int(batchSize), probabilityAssignToMe: 50}, + {batchSize: int64(maxCount / 13), count: int(maxCount / 13), probabilityAssignToMe: 50}, + {batchSize: int64(maxCount), count: int(maxCount / 13), probabilityAssignToMe: 10}, } for _, param := range params { - runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, param.count-1, param.batchSize) - runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, param.count, param.batchSize) - runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, param.count+1, param.batchSize) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, param.count-1, param.batchSize, param.probabilityAssignToMe) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, param.count, param.batchSize, param.probabilityAssignToMe) + runTestLoadKeyspaceGroupsAssignment(suite.ctx, re, suite.etcdClient, suite.cfg, param.count+1, param.batchSize, param.probabilityAssignToMe) } } @@ -163,6 +165,7 @@ func runTestLoadKeyspaceGroupsAssignment( ctx context.Context, re *require.Assertions, etcdClient *clientv3.Client, cfg *TestServiceConfig, numberOfKeypaceGroupsToAdd int, loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value + probabilityAssignToMe int, // percentage of assigning keyspace groups to this host/pod ) { ids := make(map[uint32]bool, 0) mgr := newUniqueKeyspaceGroupManager(ctx, etcdClient, cfg, 0, loadKeyspaceGroupsBatchSize) @@ -170,18 +173,27 @@ func runTestLoadKeyspaceGroupsAssignment( defer mgr.Close() const step = 30 + keyspaceGroupsAdded := sync.Map{} wg := sync.WaitGroup{} for i := 0; i < numberOfKeypaceGroupsToAdd; i += step { wg.Add(1) go func(startID int) { defer wg.Done() + endID := startID + step if endID > numberOfKeypaceGroupsToAdd { endID = numberOfKeypaceGroupsToAdd } + + randomGen := rand.New(rand.NewSource(time.Now().UnixNano())) for j := startID; j < endID; j++ { + assignToMe := false + if randomGen.Intn(100) < probabilityAssignToMe { + assignToMe = true + keyspaceGroupsAdded.Store(uint32(j), struct{}{}) + } addKeyspaceGroupAssignment( - ctx, etcdClient, + ctx, etcdClient, assignToMe, mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(j)) } }(i) @@ -193,12 +205,16 @@ func runTestLoadKeyspaceGroupsAssignment( if numberOfKeypaceGroupsToAdd > len(mgr.ams) { numberOfKeypaceGroupsToAdd = len(mgr.ams) } - for i := 0; i < numberOfKeypaceGroupsToAdd; i++ { - ids[uint32(i)] = true - } + // Set the expected result. + keyspaceGroupsAdded.Range(func(key, _ interface{}) bool { + ids[key.(uint32)] = true + return true + }) err := mgr.Initialize(true) re.NoError(err) + + // Verify the keyspace group assignment. re.True(verifyKeyspaceGroupAssignment(ids, mgr)) } @@ -226,11 +242,18 @@ func newUniqueKeyspaceGroupManager( // addKeyspaceGroupAssignment adds a keyspace group assignment to etcd. func addKeyspaceGroupAssignment( - ctx context.Context, etcdClient *clientv3.Client, rootPath, svcAddr string, id uint32, + ctx context.Context, etcdClient *clientv3.Client, + assignToMe bool, rootPath, svcAddr string, id uint32, ) error { + var location string + if assignToMe { + location = svcAddr + } else { + location = uuid.NewString() + } group := &endpoint.KeyspaceGroup{ ID: id, - Members: []endpoint.KeyspaceGroupMember{{Location: svcAddr}}, + Members: []endpoint.KeyspaceGroupMember{{Location: location}}, Keyspaces: []uint32{id}, }