diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 8064c54bf39..529e9bcdd37 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -61,6 +61,7 @@ const ( // of the primaries on this TSO server/pod have changed. A goroutine will periodically check // do this check and re-distribute the primaries if necessary. defaultPrimaryPriorityCheckInterval = 10 * time.Second + groupPatrolInterval = time.Minute ) type state struct { @@ -74,13 +75,16 @@ type state struct { kgs [mcsutils.MaxKeyspaceGroupCountInUse]*endpoint.KeyspaceGroup // keyspaceLookupTable is a map from keyspace to the keyspace group to which it belongs. keyspaceLookupTable map[uint32]uint32 + // splittingGroups is the cache of splitting keyspace group related information. + splittingGroups map[uint32]struct{} } func (s *state) initialize() { s.keyspaceLookupTable = make(map[uint32]uint32) + s.splittingGroups = make(map[uint32]struct{}) } -func (s *state) deinitialize() { +func (s *state) deInitialize() { log.Info("closing all keyspace groups") s.Lock() @@ -398,8 +402,9 @@ func (kgm *KeyspaceGroupManager) Initialize() error { return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err) } - kgm.wg.Add(1) + kgm.wg.Add(2) go kgm.primaryPriorityCheckLoop() + go kgm.groupSplitPatroller() return nil } @@ -415,7 +420,7 @@ func (kgm *KeyspaceGroupManager) Close() { // added/initialized after that. kgm.cancel() kgm.wg.Wait() - kgm.state.deinitialize() + kgm.state.deInitialize() log.Info("keyspace group manager closed") } @@ -732,6 +737,10 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro } kgm.kgs[group.ID] = group kgm.ams[group.ID] = am + // If the group is the split target, add it to the splitting group map. + if group.IsSplitTarget() { + kgm.splittingGroups[group.ID] = struct{}{} + } kgm.Unlock() } @@ -859,6 +868,7 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroupMembership( // Check if the split is completed. if oldGroup != nil && oldGroup.IsSplitTarget() && !newGroup.IsSplitting() { kgm.ams[groupID].GetMember().(*member.Participant).SetCampaignChecker(nil) + delete(kgm.splittingGroups, groupID) } kgm.kgs[groupID] = newGroup } @@ -1322,3 +1332,58 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget return } } + +// groupSplitPatroller is used to patrol the groups that are in the on-going +// split state and to check if we could speed up the split process. +func (kgm *KeyspaceGroupManager) groupSplitPatroller() { + defer kgm.wg.Done() + patrolInterval := groupPatrolInterval + failpoint.Inject("fastGroupSplitPatroller", func() { + patrolInterval = 200 * time.Millisecond + }) + ticker := time.NewTicker(patrolInterval) + defer ticker.Stop() + log.Info("group split patroller is started", + zap.Duration("patrol-interval", patrolInterval)) + for { + select { + case <-kgm.ctx.Done(): + log.Info("group split patroller is exiting") + return + case <-ticker.C: + } + kgm.RLock() + if len(kgm.splittingGroups) == 0 { + kgm.RUnlock() + continue + } + var splittingGroups []uint32 + for id := range kgm.splittingGroups { + splittingGroups = append(splittingGroups, id) + } + kgm.RUnlock() + for _, groupID := range splittingGroups { + am, group := kgm.getKeyspaceGroupMeta(groupID) + if !am.IsLeader() { + continue + } + if len(group.Keyspaces) == 0 { + log.Warn("abnormal keyspace group with empty keyspace list", + zap.Uint32("keyspace-group-id", groupID)) + continue + } + log.Info("request tso for the splitting keyspace group", + zap.Uint32("keyspace-group-id", groupID), + zap.Uint32("keyspace-id", group.Keyspaces[0])) + // Request the TSO manually to speed up the split process. + _, _, err := kgm.HandleTSORequest(group.Keyspaces[0], groupID, GlobalDCLocation, 1) + if err != nil { + log.Warn("failed to request tso for the splitting keyspace group", + zap.Uint32("keyspace-group-id", groupID), + zap.Uint32("keyspace-id", group.Keyspaces[0]), + zap.Error(err)) + continue + } + } + } +} diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index 98c6b90ca28..a20eb33fb81 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -63,6 +63,7 @@ func TestTSOKeyspaceGroupManager(t *testing.T) { func (suite *tsoKeyspaceGroupManagerTestSuite) SetupSuite() { re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller", `return(true)`)) var err error suite.ctx, suite.cancel = context.WithCancel(context.Background()) @@ -81,6 +82,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownSuite() { suite.cancel() suite.tsoCluster.Destroy() suite.cluster.Destroy() + suite.Require().NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastGroupSplitPatroller")) } func (suite *tsoKeyspaceGroupManagerTestSuite) TearDownTest() { @@ -276,17 +278,15 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { NewID: 2, Keyspaces: []uint32{222, 333}, }) - kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2) - re.Equal(uint32(2), kg2.ID) - re.Equal([]uint32{222, 333}, kg2.Keyspaces) - re.True(kg2.IsSplitTarget()) - // Check the split TSO from keyspace group 2. - var splitTS pdpb.Timestamp + // Wait for the split to complete automatically even there is no TSO request from the outside. testutil.Eventually(re, func() bool { - splitTS, err = suite.requestTSO(re, 222, 2) - return err == nil && tsoutil.CompareTimestamp(&splitTS, &pdpb.Timestamp{}) > 0 + kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2) + re.Equal(uint32(2), kg2.ID) + re.Equal([]uint32{222, 333}, kg2.Keyspaces) + return !kg2.IsSplitting() }) - splitTS, err = suite.requestTSO(re, 222, 2) + // Check the split TSO from keyspace group 2 now. + splitTS, err := suite.requestTSO(re, 222, 2) re.NoError(err) re.Greater(tsoutil.CompareTimestamp(&splitTS, &ts), 0) } @@ -356,8 +356,6 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection return len(member1.GetLeaderListenUrls()) > 0 && len(member2.GetLeaderListenUrls()) > 0 }) re.Equal(member1.GetLeaderListenUrls(), member2.GetLeaderListenUrls()) - // Finish the split. - handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2) // Wait for the keyspace groups to finish the split. waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{111}, []uint32{222, 333}) }