From 40866fdb50ddbe7d12d5dbdfdfa0635f4a5e1700 Mon Sep 17 00:00:00 2001
From: Ryan Leung
Date: Mon, 26 Jun 2023 15:12:42 +0800
Subject: [PATCH] Pick some tso mcs changes (#122)

* Revert "mcs: pick some priority about member of keyspace group (#120)"
This reverts commit e962b881459300fa44e7b569bbdb463829691300.

* keyspace, apiv2: implement the keyspace group merging API (#6594)
ref tikv/pd#6589
Implement the keyspace group merging API.
Signed-off-by: JmPotato
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* keyspace: prohibit merging the default keyspace group (#6606)
ref tikv/pd#6589
Prohibit merging the default keyspace group.
Signed-off-by: JmPotato

* keyspace: add priority of tso node for the keyspace group (#6602)
ref tikv/pd#6599
Signed-off-by: lhy1024
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* tests: reduce unnecessary time.sleep in keyspace group (#6632)
ref tikv/pd#6599
Signed-off-by: lhy1024
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* tools: add merge commands for pd-ctl (#6675)
ref tikv/pd#6589
Signed-off-by: Ryan Leung
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* mcs, tso: fix expensive async forwardTSORequest() and its timeout mechanism. (#6664)
ref tikv/pd#6659
Fix expensive async forwardTSORequest() and its timeout mechanism.
To handle the timeout case for forwardStream send/recv, the existing logic creates a context.WithTimeout(forwardCtx, ...) for every request and then starts a new goroutine "forwardTSORequest", which is very expensive, as shown by the profiling in #6659.
This change creates a watchDeadline routine per forward stream and reuses it for all the forward requests, so that forwardTSORequest can be called synchronously. Compared to the existing logic, the new approach is much cheaper and its latency is much more stable (see the sketch after this commit list).
Signed-off-by: Bin Shi

* keyspace, apiv2: support to split keyspace group with the keyspace ID range (#6646)
ref tikv/pd#6232
Support splitting a keyspace group with a keyspace ID range.
Signed-off-by: JmPotato
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* mcs: add log for finishing split (#6656)
ref tikv/pd#5895
Signed-off-by: Ryan Leung
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* tso: fix checkTSOSplit to finish split correctly (#6652)
ref tikv/pd#6232
Fix `checkTSOSplit` to finish the split correctly.
Signed-off-by: JmPotato
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* tests: fix TestTSOKeyspaceGroupSplitClient to avoid unexpected panic (#6655)
close tikv/pd#6634
Fix `TestTSOKeyspaceGroupSplitClient` to avoid an unexpected panic.
Signed-off-by: JmPotato

* tso, tests: implement the keyspace group merge checker (#6625)
ref tikv/pd#6589
Implement the keyspace group merge checker.
Signed-off-by: JmPotato

* mcs, tso: support weighted-election for TSO keyspace group primary election (#6617)
close tikv/pd#6616
Add the tso server registry watch loop in tso's keyspace group manager.
Re-distribute TSO keyspace group primaries according to their replica priorities.
Signed-off-by: Bin Shi

* Add keyspace group info in the timestamp fallback log in the client. (#6654)
ref tikv/pd#5895
Add keyspace group info in the timestamp fallback log in the client.
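The following is a minimal, self-contained Go sketch of the per-stream deadline-watcher pattern that the forwardTSORequest() commit above describes: one watcher goroutine is created per forward stream and reused for every request, so each request costs one channel send instead of one context.WithTimeout plus one goroutine. The names used here (deadline, watchDeadline, the send callback, the 3-second timeout) are illustrative assumptions and are not copied from the PD code base.

package forwarder

import (
	"context"
	"time"
)

// deadline describes one synchronous forward call the watcher should guard.
type deadline struct {
	timer  <-chan time.Time   // fires when the call has taken too long
	done   chan struct{}      // closed by the caller once the call returns
	cancel context.CancelFunc // cancels the forward stream on timeout
}

// watchDeadline is started once per forward stream and guards every request
// sent on that stream serially, so no per-request goroutine is needed.
func watchDeadline(ctx context.Context, deadlineCh <-chan *deadline) {
	for {
		select {
		case d := <-deadlineCh:
			select {
			case <-d.timer:
				d.cancel() // the forward call timed out, tear down the stream
			case <-d.done: // the forward call finished in time
			case <-ctx.Done():
				return
			}
		case <-ctx.Done():
			return
		}
	}
}

// forwardTSORequest shows how a caller uses the shared watcher: register a
// deadline, perform the send/recv synchronously, then signal completion.
func forwardTSORequest(deadlineCh chan<- *deadline, cancelStream context.CancelFunc, send func() error) error {
	done := make(chan struct{})
	defer close(done)
	deadlineCh <- &deadline{
		timer:  time.After(3 * time.Second), // assumed timeout, for illustration only
		done:   done,
		cancel: cancelStream,
	}
	return send() // the actual stream.Send/stream.Recv would happen here
}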
Signed-off-by: Bin Shi --------- Signed-off-by: JmPotato Signed-off-by: Bin Shi Co-authored-by: JmPotato Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> Co-authored-by: lhy1024 Co-authored-by: Bin Shi <39923490+binshi-bing@users.noreply.github.com> --- client/tso_client.go | 4 +- client/tso_dispatcher.go | 34 +- client/tso_stream.go | 8 +- errors.toml | 5 + pkg/errs/errno.go | 1 + pkg/keyspace/keyspace.go | 44 +- pkg/keyspace/keyspace_test.go | 55 +- pkg/keyspace/tso_keyspace_group.go | 248 +++++++-- pkg/keyspace/tso_keyspace_group_test.go | 198 ++++++- pkg/keyspace/util.go | 14 +- pkg/mcs/discovery/discover.go | 2 +- pkg/mcs/discovery/key_path.go | 8 +- pkg/mcs/discovery/register.go | 2 +- pkg/mcs/tso/server/server.go | 3 +- pkg/mcs/utils/constant.go | 3 + pkg/storage/endpoint/tso_keyspace_group.go | 24 + pkg/tso/allocator_manager.go | 13 + pkg/tso/global_allocator.go | 14 +- pkg/tso/keyspace_group_manager.go | 484 ++++++++++++++++-- pkg/tso/keyspace_group_manager_test.go | 325 +++++++++--- pkg/tso/local_allocator.go | 1 - pkg/tso/tso.go | 3 +- pkg/utils/tsoutil/tso_dispatcher.go | 33 +- server/apiv2/handlers/tso_keyspace_group.go | 82 ++- server/grpc_service.go | 122 +++-- tests/integrations/mcs/cluster.go | 3 +- .../mcs/tso/keyspace_group_manager_test.go | 217 ++++++-- tests/pdctl/keyspace/keyspace_group_test.go | 73 +++ tests/server/apiv2/handlers/testutil.go | 17 +- .../pdctl/command/keyspace_group_command.go | 92 ++++ 30 files changed, 1829 insertions(+), 303 deletions(-) diff --git a/client/tso_client.go b/client/tso_client.go index d4dfaa03a91..e0b0579bbfc 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -209,7 +209,7 @@ func (c *tsoClient) updateTSOLocalServAddrs(allocatorMap map[string]string) erro return err } c.tsoAllocators.Store(dcLocation, addr) - log.Info("[tso] switch dc tso allocator serving address", + log.Info("[tso] switch dc tso local allocator serving address", zap.String("dc-location", dcLocation), zap.String("new-address", addr), zap.String("old-address", oldAddr)) @@ -227,7 +227,7 @@ func (c *tsoClient) updateTSOLocalServAddrs(allocatorMap map[string]string) erro func (c *tsoClient) updateTSOGlobalServAddr(addr string) error { c.tsoAllocators.Store(globalDCLocation, addr) - log.Info("[tso] switch dc tso allocator serving address", + log.Info("[tso] switch dc tso global allocator serving address", zap.String("dc-location", globalDCLocation), zap.String("new-address", addr)) c.scheduleCheckTSODispatcher() diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 30e7e670e09..c1e94f0f230 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -41,8 +41,9 @@ type tsoDispatcher struct { } type lastTSO struct { - physical int64 - logical int64 + keyspaceGroupID uint32 + physical int64 + logical int64 } const ( @@ -708,7 +709,7 @@ func (c *tsoClient) processRequests( requests := tbc.getCollectedRequests() count := int64(len(requests)) - physical, logical, suffixBits, err := stream.processRequests( + respKeyspaceGroupID, physical, logical, suffixBits, err := stream.processRequests( c.svcDiscovery.GetClusterID(), c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID(), dcLocation, requests, tbc.batchStartTime) if err != nil { @@ -717,15 +718,19 @@ func (c *tsoClient) processRequests( } // `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request. 
firstLogical := tsoutil.AddLogical(logical, -count+1, suffixBits) - c.compareAndSwapTS(dcLocation, physical, firstLogical, suffixBits, count) + c.compareAndSwapTS(dcLocation, respKeyspaceGroupID, physical, firstLogical, suffixBits, count) c.finishRequest(requests, physical, firstLogical, suffixBits, nil) return nil } -func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical int64, suffixBits uint32, count int64) { +func (c *tsoClient) compareAndSwapTS( + dcLocation string, respKeyspaceGroupID uint32, + physical, firstLogical int64, suffixBits uint32, count int64, +) { largestLogical := tsoutil.AddLogical(firstLogical, count-1, suffixBits) lastTSOInterface, loaded := c.lastTSMap.LoadOrStore(dcLocation, &lastTSO{ - physical: physical, + keyspaceGroupID: respKeyspaceGroupID, + physical: physical, // Save the largest logical part here logical: largestLogical, }) @@ -733,17 +738,30 @@ func (c *tsoClient) compareAndSwapTS(dcLocation string, physical, firstLogical i return } lastTSOPointer := lastTSOInterface.(*lastTSO) + lastKeyspaceGroupID := lastTSOPointer.keyspaceGroupID lastPhysical := lastTSOPointer.physical lastLogical := lastTSOPointer.logical + + if lastKeyspaceGroupID != respKeyspaceGroupID { + log.Info("[tso] keyspace group changed", + zap.String("dc-location", dcLocation), + zap.Uint32("old-group-id", lastKeyspaceGroupID), + zap.Uint32("new-group-id", respKeyspaceGroupID)) + } + // The TSO we get is a range like [largestLogical-count+1, largestLogical], so we save the last TSO's largest logical // to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then // all TSOs we get will be [6, 7, 8, 9, 10]. if tsoutil.TSLessEqual(physical, firstLogical, lastPhysical, lastLogical) { panic(errors.Errorf( - "%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). keyspace: %d, keyspace group: %d", + "%s timestamp fallback, new ts (%d, %d) <= the last one (%d, %d). "+ + "last keyspace group: %d, keyspace in request: %d, "+ + "keyspace group in request: %d, keyspace group in response: %d", dcLocation, physical, firstLogical, lastPhysical, lastLogical, - c.svcDiscovery.GetKeyspaceID(), c.svcDiscovery.GetKeyspaceGroupID())) + lastKeyspaceGroupID, c.svcDiscovery.GetKeyspaceID(), + c.svcDiscovery.GetKeyspaceGroupID(), respKeyspaceGroupID)) } + lastTSOPointer.keyspaceGroupID = respKeyspaceGroupID lastTSOPointer.physical = physical // Same as above, we save the largest logical part here. 
lastTSOPointer.logical = largestLogical diff --git a/client/tso_stream.go b/client/tso_stream.go index 5b658279cac..aaabbb1712e 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -102,7 +102,7 @@ type tsoStream interface { processRequests( clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, requests []*tsoRequest, batchStartTime time.Time, - ) (physical, logical int64, suffixBits uint32, err error) + ) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) } type pdTSOStream struct { @@ -111,7 +111,7 @@ type pdTSOStream struct { func (s *pdTSOStream) processRequests( clusterID uint64, _, _ uint32, dcLocation string, requests []*tsoRequest, batchStartTime time.Time, -) (physical, logical int64, suffixBits uint32, err error) { +) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { start := time.Now() count := int64(len(requests)) req := &pdpb.TsoRequest{ @@ -149,6 +149,7 @@ func (s *pdTSOStream) processRequests( } ts := resp.GetTimestamp() + respKeyspaceGroupID = defaultKeySpaceGroupID physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() return } @@ -160,7 +161,7 @@ type tsoTSOStream struct { func (s *tsoTSOStream) processRequests( clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, requests []*tsoRequest, batchStartTime time.Time, -) (physical, logical int64, suffixBits uint32, err error) { +) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { start := time.Now() count := int64(len(requests)) req := &tsopb.TsoRequest{ @@ -200,6 +201,7 @@ func (s *tsoTSOStream) processRequests( } ts := resp.GetTimestamp() + respKeyspaceGroupID = resp.GetHeader().GetKeyspaceGroupId() physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() return } diff --git a/errors.toml b/errors.toml index ee1b267c8d8..5af3ea47de8 100644 --- a/errors.toml +++ b/errors.toml @@ -741,6 +741,11 @@ error = ''' the keyspace group id is invalid, %s ''' +["PD:tso:ErrKeyspaceGroupIsMerging"] +error = ''' +the keyspace group %d is merging +''' + ["PD:tso:ErrKeyspaceGroupNotInitialized"] error = ''' the keyspace group %d isn't initialized diff --git a/pkg/errs/errno.go b/pkg/errs/errno.go index 0112d69645b..eb6ae4a8f7f 100644 --- a/pkg/errs/errno.go +++ b/pkg/errs/errno.go @@ -55,6 +55,7 @@ var ( ErrKeyspaceGroupNotInitialized = errors.Normalize("the keyspace group %d isn't initialized", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupNotInitialized")) ErrKeyspaceNotAssigned = errors.Normalize("the keyspace %d isn't assigned to any keyspace group", errors.RFCCodeText("PD:tso:ErrKeyspaceNotAssigned")) ErrGetMinTS = errors.Normalize("get min ts failed, %s", errors.RFCCodeText("PD:tso:ErrGetMinTS")) + ErrKeyspaceGroupIsMerging = errors.Normalize("the keyspace group %d is merging", errors.RFCCodeText("PD:tso:ErrKeyspaceGroupIsMerging")) ) // member errors diff --git a/pkg/keyspace/keyspace.go b/pkg/keyspace/keyspace.go index 62e741083cf..859e6b82486 100644 --- a/pkg/keyspace/keyspace.go +++ b/pkg/keyspace/keyspace.go @@ -49,10 +49,10 @@ const ( UserKindKey = "user_kind" // TSOKeyspaceGroupIDKey is the key for tso keyspace group id in keyspace config. TSOKeyspaceGroupIDKey = "tso_keyspace_group_id" - // keyspacePatrolBatchSize is the batch size for keyspace assignment patrol. - // the limit of etcd txn op is 128, keyspacePatrolBatchSize need to be less than it. + // maxEtcdTxnOps is the max value of operations in an etcd txn. 
The default limit of etcd txn op is 128. + // We use 120 here to leave some space for other operations. // See: https://github.com/etcd-io/etcd/blob/d3e43d4de6f6d9575b489dd7850a85e37e0f6b6c/server/embed/config.go#L61 - keyspacePatrolBatchSize = 120 + maxEtcdTxnOps = 120 ) // Config is the interface for keyspace config. @@ -652,7 +652,16 @@ func (manager *Manager) allocID() (uint32, error) { } // PatrolKeyspaceAssignment is used to patrol all keyspaces and assign them to the keyspace groups. -func (manager *Manager) PatrolKeyspaceAssignment() error { +func (manager *Manager) PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID uint32) error { + if startKeyspaceID > manager.nextPatrolStartID { + manager.nextPatrolStartID = startKeyspaceID + } + if endKeyspaceID != 0 && endKeyspaceID < manager.nextPatrolStartID { + log.Info("[keyspace] end keyspace id is smaller than the next patrol start id, skip patrol", + zap.Uint32("end-keyspace-id", endKeyspaceID), + zap.Uint32("next-patrol-start-id", manager.nextPatrolStartID)) + return nil + } var ( // Some statistics info. start = time.Now() @@ -670,7 +679,9 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { zap.Duration("cost", time.Since(start)), zap.Uint64("patrolled-keyspace-count", patrolledKeyspaceCount), zap.Uint64("assigned-keyspace-count", assignedKeyspaceCount), - zap.Int("batch-size", keyspacePatrolBatchSize), + zap.Int("batch-size", maxEtcdTxnOps), + zap.Uint32("start-keyspace-id", startKeyspaceID), + zap.Uint32("end-keyspace-id", endKeyspaceID), zap.Uint32("current-start-id", currentStartID), zap.Uint32("next-start-id", nextStartID), ) @@ -689,7 +700,10 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { if defaultKeyspaceGroup.IsSplitting() { return ErrKeyspaceGroupInSplit } - keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, keyspacePatrolBatchSize) + if defaultKeyspaceGroup.IsMerging() { + return ErrKeyspaceGroupInMerging + } + keyspaces, err := manager.store.LoadRangeKeyspace(txn, manager.nextPatrolStartID, maxEtcdTxnOps) if err != nil { return err } @@ -699,9 +713,9 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { currentStartID = keyspaces[0].GetId() nextStartID = keyspaces[keyspaceNum-1].GetId() + 1 } - // If there are less than `keyspacePatrolBatchSize` keyspaces, - // we have reached the end of the keyspace list. - moreToPatrol = keyspaceNum == keyspacePatrolBatchSize + // If there are less than `maxEtcdTxnOps` keyspaces or the next start ID reaches the end, + // there is no need to patrol again. 
+ moreToPatrol = keyspaceNum == maxEtcdTxnOps var ( assigned = false keyspaceIDsToUnlock = make([]uint32, 0, keyspaceNum) @@ -715,6 +729,10 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { if ks == nil { continue } + if endKeyspaceID != 0 && ks.Id > endKeyspaceID { + moreToPatrol = false + break + } patrolledKeyspaceCount++ manager.metaLock.Lock(ks.Id) if ks.Config == nil { @@ -736,7 +754,9 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { err = manager.store.SaveKeyspaceMeta(txn, ks) if err != nil { log.Error("[keyspace] failed to save keyspace meta during patrol", - zap.Int("batch-size", keyspacePatrolBatchSize), + zap.Int("batch-size", maxEtcdTxnOps), + zap.Uint32("start-keyspace-id", startKeyspaceID), + zap.Uint32("end-keyspace-id", endKeyspaceID), zap.Uint32("current-start-id", currentStartID), zap.Uint32("next-start-id", nextStartID), zap.Uint32("keyspace-id", ks.Id), zap.Error(err)) @@ -748,7 +768,9 @@ func (manager *Manager) PatrolKeyspaceAssignment() error { err = manager.kgm.store.SaveKeyspaceGroup(txn, defaultKeyspaceGroup) if err != nil { log.Error("[keyspace] failed to save default keyspace group meta during patrol", - zap.Int("batch-size", keyspacePatrolBatchSize), + zap.Int("batch-size", maxEtcdTxnOps), + zap.Uint32("start-keyspace-id", startKeyspaceID), + zap.Uint32("end-keyspace-id", endKeyspaceID), zap.Uint32("current-start-id", currentStartID), zap.Uint32("next-start-id", nextStartID), zap.Error(err)) return err diff --git a/pkg/keyspace/keyspace_test.go b/pkg/keyspace/keyspace_test.go index dadc2a2509f..b06921e48db 100644 --- a/pkg/keyspace/keyspace_test.go +++ b/pkg/keyspace/keyspace_test.go @@ -393,7 +393,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() { re.NotNil(defaultKeyspaceGroup) re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(111)) // Patrol the keyspace assignment. - err = suite.manager.PatrolKeyspaceAssignment() + err = suite.manager.PatrolKeyspaceAssignment(0, 0) re.NoError(err) // Check if the keyspace is attached to the default group. defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) @@ -405,7 +405,7 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignment() { func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() { re := suite.Require() // Create some keyspaces without any keyspace group. - for i := 1; i < keyspacePatrolBatchSize*2+1; i++ { + for i := 1; i < maxEtcdTxnOps*2+1; i++ { now := time.Now().Unix() err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{ Id: uint32(i), @@ -420,21 +420,64 @@ func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentInBatch() { defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) re.NoError(err) re.NotNil(defaultKeyspaceGroup) - for i := 1; i < keyspacePatrolBatchSize*2+1; i++ { + for i := 1; i < maxEtcdTxnOps*2+1; i++ { re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i)) } // Patrol the keyspace assignment. - err = suite.manager.PatrolKeyspaceAssignment() + err = suite.manager.PatrolKeyspaceAssignment(0, 0) re.NoError(err) // Check if all the keyspaces are attached to the default group. 
defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) re.NoError(err) re.NotNil(defaultKeyspaceGroup) - for i := 1; i < keyspacePatrolBatchSize*2+1; i++ { + for i := 1; i < maxEtcdTxnOps*2+1; i++ { re.Contains(defaultKeyspaceGroup.Keyspaces, uint32(i)) } } +func (suite *keyspaceTestSuite) TestPatrolKeyspaceAssignmentWithRange() { + re := suite.Require() + // Create some keyspaces without any keyspace group. + for i := 1; i < maxEtcdTxnOps*2+1; i++ { + now := time.Now().Unix() + err := suite.manager.saveNewKeyspace(&keyspacepb.KeyspaceMeta{ + Id: uint32(i), + Name: strconv.Itoa(i), + State: keyspacepb.KeyspaceState_ENABLED, + CreatedAt: now, + StateChangedAt: now, + }) + re.NoError(err) + } + // Check if all the keyspaces are not attached to the default group. + defaultKeyspaceGroup, err := suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) + re.NoError(err) + re.NotNil(defaultKeyspaceGroup) + for i := 1; i < maxEtcdTxnOps*2+1; i++ { + re.NotContains(defaultKeyspaceGroup.Keyspaces, uint32(i)) + } + // Patrol the keyspace assignment with range [maxEtcdTxnOps/2, maxEtcdTxnOps/2+maxEtcdTxnOps+1] + // to make sure the range crossing the boundary of etcd transaction operation limit. + var ( + startKeyspaceID = uint32(maxEtcdTxnOps / 2) + endKeyspaceID = startKeyspaceID + maxEtcdTxnOps + 1 + ) + err = suite.manager.PatrolKeyspaceAssignment(startKeyspaceID, endKeyspaceID) + re.NoError(err) + // Check if only the keyspaces within the range are attached to the default group. + defaultKeyspaceGroup, err = suite.manager.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) + re.NoError(err) + re.NotNil(defaultKeyspaceGroup) + for i := 1; i < maxEtcdTxnOps*2+1; i++ { + keyspaceID := uint32(i) + if keyspaceID >= startKeyspaceID && keyspaceID <= endKeyspaceID { + re.Contains(defaultKeyspaceGroup.Keyspaces, keyspaceID) + } else { + re.NotContains(defaultKeyspaceGroup.Keyspaces, keyspaceID) + } + } +} + // Benchmark the keyspace assignment patrol. func BenchmarkPatrolKeyspaceAssignment1000(b *testing.B) { benchmarkPatrolKeyspaceAssignmentN(1000, b) @@ -471,7 +514,7 @@ func benchmarkPatrolKeyspaceAssignmentN( // Benchmark the keyspace assignment patrol. 
b.ResetTimer() for i := 0; i < b.N; i++ { - err := suite.manager.PatrolKeyspaceAssignment() + err := suite.manager.PatrolKeyspaceAssignment(0, 0) re.NoError(err) } b.StopTimer() diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index d05e916e0f0..fe91443bb95 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -17,6 +17,7 @@ package keyspace import ( "context" "encoding/json" + "sort" "strconv" "strings" "sync" @@ -340,6 +341,9 @@ func (m *GroupManager) saveKeyspaceGroups(keyspaceGroups []*endpoint.KeyspaceGro if oldKG.IsSplitting() && overwrite { return ErrKeyspaceGroupInSplit } + if oldKG.IsMerging() && overwrite { + return ErrKeyspaceGroupInMerging + } newKG := &endpoint.KeyspaceGroup{ ID: keyspaceGroup.ID, UserKind: keyspaceGroup.UserKind, @@ -415,6 +419,9 @@ func (m *GroupManager) updateKeyspaceForGroupLocked(userKind endpoint.UserKind, if kg.IsSplitting() { return ErrKeyspaceGroupInSplit } + if kg.IsMerging() { + return ErrKeyspaceGroupInMerging + } changed := false @@ -469,6 +476,9 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse if oldKG.IsSplitting() || newKG.IsSplitting() { return ErrKeyspaceGroupInSplit } + if oldKG.IsMerging() || newKG.IsMerging() { + return ErrKeyspaceGroupInMerging + } var updateOld, updateNew bool if !slice.Contains(newKG.Keyspaces, keyspaceID) { @@ -499,7 +509,10 @@ func (m *GroupManager) UpdateKeyspaceGroup(oldGroupID, newGroupID string, oldUse // SplitKeyspaceGroupByID splits the keyspace group by ID into a new keyspace group with the given new ID. // And the keyspaces in the old keyspace group will be moved to the new keyspace group. -func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint32, keyspaces []uint32) error { +func (m *GroupManager) SplitKeyspaceGroupByID( + splitSourceID, splitTargetID uint32, + keyspaces []uint32, keyspaceIDRange ...uint32, +) error { var splitSourceKg, splitTargetKg *endpoint.KeyspaceGroup m.Lock() defer m.Unlock() @@ -516,6 +529,10 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3 if splitSourceKg.IsSplitting() { return ErrKeyspaceGroupInSplit } + // A keyspace group can not be split when it is in merging. + if splitSourceKg.IsMerging() { + return ErrKeyspaceGroupInMerging + } // Check if the source keyspace group has enough replicas. if len(splitSourceKg.Members) < utils.DefaultKeyspaceGroupReplicaCount { return ErrKeyspaceGroupNotEnoughReplicas @@ -528,34 +545,17 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3 if splitTargetKg != nil { return ErrKeyspaceGroupExists } - keyspaceNum := len(keyspaces) - sourceKeyspaceNum := len(splitSourceKg.Keyspaces) - // Check if the keyspaces are all in the old keyspace group. - if keyspaceNum == 0 || keyspaceNum > sourceKeyspaceNum { - return ErrKeyspaceNotInKeyspaceGroup + var startKeyspaceID, endKeyspaceID uint32 + if len(keyspaceIDRange) >= 2 { + startKeyspaceID, endKeyspaceID = keyspaceIDRange[0], keyspaceIDRange[1] } - var ( - oldKeyspaceMap = make(map[uint32]struct{}, sourceKeyspaceNum) - newKeyspaceMap = make(map[uint32]struct{}, keyspaceNum) - ) - for _, keyspace := range splitSourceKg.Keyspaces { - oldKeyspaceMap[keyspace] = struct{}{} - } - for _, keyspace := range keyspaces { - if _, ok := oldKeyspaceMap[keyspace]; !ok { - return ErrKeyspaceNotInKeyspaceGroup - } - newKeyspaceMap[keyspace] = struct{}{} - } - // Get the split keyspace group for the old keyspace group. 
- splitKeyspaces := make([]uint32, 0, sourceKeyspaceNum-keyspaceNum) - for _, keyspace := range splitSourceKg.Keyspaces { - if _, ok := newKeyspaceMap[keyspace]; !ok { - splitKeyspaces = append(splitKeyspaces, keyspace) - } + splitSourceKeyspaces, splitTargetKeyspaces, err := buildSplitKeyspaces( + splitSourceKg.Keyspaces, keyspaces, startKeyspaceID, endKeyspaceID) + if err != nil { + return err } // Update the old keyspace group. - splitSourceKg.Keyspaces = splitKeyspaces + splitSourceKg.Keyspaces = splitSourceKeyspaces splitSourceKg.SplitState = &endpoint.SplitState{ SplitSource: splitSourceKg.ID, } @@ -567,7 +567,7 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3 // Keep the same user kind and members as the old keyspace group. UserKind: splitSourceKg.UserKind, Members: splitSourceKg.Members, - Keyspaces: keyspaces, + Keyspaces: splitTargetKeyspaces, SplitState: &endpoint.SplitState{ SplitSource: splitSourceKg.ID, }, @@ -583,6 +583,64 @@ func (m *GroupManager) SplitKeyspaceGroupByID(splitSourceID, splitTargetID uint3 return nil } +func buildSplitKeyspaces( + // `old` is the original keyspace list which will be split out, + // `new` is the keyspace list which will be split from the old keyspace list. + old, new []uint32, + startKeyspaceID, endKeyspaceID uint32, +) ([]uint32, []uint32, error) { + oldNum, newNum := len(old), len(new) + // Split according to the new keyspace list. + if newNum != 0 { + if newNum > oldNum { + return nil, nil, ErrKeyspaceNotInKeyspaceGroup + } + var ( + oldKeyspaceMap = make(map[uint32]struct{}, oldNum) + newKeyspaceMap = make(map[uint32]struct{}, newNum) + ) + for _, keyspace := range old { + oldKeyspaceMap[keyspace] = struct{}{} + } + for _, keyspace := range new { + if _, ok := oldKeyspaceMap[keyspace]; !ok { + return nil, nil, ErrKeyspaceNotInKeyspaceGroup + } + newKeyspaceMap[keyspace] = struct{}{} + } + // Get the split keyspace list for the old keyspace group. + oldSplit := make([]uint32, 0, oldNum-newNum) + for _, keyspace := range old { + if _, ok := newKeyspaceMap[keyspace]; !ok { + oldSplit = append(oldSplit, keyspace) + } + } + return oldSplit, new, nil + } + // Split according to the start and end keyspace ID. + if startKeyspaceID == 0 && endKeyspaceID == 0 { + return nil, nil, ErrKeyspaceNotInKeyspaceGroup + } + var ( + newSplit = make([]uint32, 0, oldNum) + newKeyspaceMap = make(map[uint32]struct{}, newNum) + ) + for _, keyspace := range old { + if startKeyspaceID <= keyspace && keyspace <= endKeyspaceID { + newSplit = append(newSplit, keyspace) + newKeyspaceMap[keyspace] = struct{}{} + } + } + // Get the split keyspace list for the old keyspace group. + oldSplit := make([]uint32, 0, oldNum-len(newSplit)) + for _, keyspace := range old { + if _, ok := newKeyspaceMap[keyspace]; !ok { + oldSplit = append(oldSplit, keyspace) + } + } + return oldSplit, newSplit, nil +} + // FinishSplitKeyspaceByID finishes the split keyspace group by the split target ID. func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error { var splitTargetKg, splitSourceKg *endpoint.KeyspaceGroup @@ -618,17 +676,14 @@ func (m *GroupManager) FinishSplitKeyspaceByID(splitTargetID uint32) error { if err != nil { return err } - err = m.store.SaveKeyspaceGroup(txn, splitSourceKg) - if err != nil { - return err - } - return nil + return m.store.SaveKeyspaceGroup(txn, splitSourceKg) }); err != nil { return err } // Update the keyspace group cache. 
m.groups[endpoint.StringUserKind(splitTargetKg.UserKind)].Put(splitTargetKg) m.groups[endpoint.StringUserKind(splitSourceKg.UserKind)].Put(splitSourceKg) + log.Info("finish split keyspace group", zap.Uint32("split-source-id", splitSourceKg.ID), zap.Uint32("split-target-id", splitTargetID)) return nil } @@ -663,6 +718,9 @@ func (m *GroupManager) AllocNodesForKeyspaceGroup(id uint32, desiredReplicaCount if kg.IsSplitting() { return ErrKeyspaceGroupInSplit } + if kg.IsMerging() { + return ErrKeyspaceGroupInMerging + } exists := make(map[string]struct{}) for _, member := range kg.Members { exists[member.Address] = struct{}{} @@ -722,6 +780,9 @@ func (m *GroupManager) SetNodesForKeyspaceGroup(id uint32, nodes []string) error if kg.IsSplitting() { return ErrKeyspaceGroupInSplit } + if kg.IsMerging() { + return ErrKeyspaceGroupInMerging + } members := make([]endpoint.KeyspaceGroupMember, 0, len(nodes)) for _, node := range nodes { members = append(members, endpoint.KeyspaceGroupMember{ @@ -756,6 +817,9 @@ func (m *GroupManager) SetPriorityForKeyspaceGroup(id uint32, node string, prior if kg.IsSplitting() { return ErrKeyspaceGroupInSplit } + if kg.IsMerging() { + return ErrKeyspaceGroupInMerging + } inKeyspaceGroup := false members := make([]endpoint.KeyspaceGroupMember, 0, len(kg.Members)) for _, member := range kg.Members { @@ -788,3 +852,123 @@ func (m *GroupManager) IsExistNode(addr string) bool { } return false } + +// MergeKeyspaceGroups merges the keyspace group in the list into the target keyspace group. +func (m *GroupManager) MergeKeyspaceGroups(mergeTargetID uint32, mergeList []uint32) error { + mergeListNum := len(mergeList) + if mergeListNum == 0 { + return nil + } + // The transaction below will: + // - Load and delete the keyspace groups in the merge list. + // - Load and update the target keyspace group. + // So we pre-check the number of operations to avoid exceeding the maximum number of etcd transaction. + if (mergeListNum+1)*2 > maxEtcdTxnOps { + return ErrExceedMaxEtcdTxnOps + } + if slice.Contains(mergeList, utils.DefaultKeyspaceGroupID) { + return ErrModifyDefaultKeyspaceGroup + } + var ( + groups = make(map[uint32]*endpoint.KeyspaceGroup, mergeListNum+1) + mergeTargetKg *endpoint.KeyspaceGroup + ) + m.Lock() + defer m.Unlock() + if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + // Load and check all keyspace groups first. + for _, kgID := range append(mergeList, mergeTargetID) { + kg, err := m.store.LoadKeyspaceGroup(txn, kgID) + if err != nil { + return err + } + if kg == nil { + return ErrKeyspaceGroupNotExists + } + // A keyspace group can not be merged if it's in splitting. + if kg.IsSplitting() { + return ErrKeyspaceGroupInSplit + } + // A keyspace group can not be split when it is in merging. + if kg.IsMerging() { + return ErrKeyspaceGroupInMerging + } + groups[kgID] = kg + } + mergeTargetKg = groups[mergeTargetID] + keyspaces := make(map[uint32]struct{}) + for _, keyspace := range mergeTargetKg.Keyspaces { + keyspaces[keyspace] = struct{}{} + } + // Delete the keyspace groups in merge list and move the keyspaces in it to the target keyspace group. 
+ for _, kgID := range mergeList { + kg := groups[kgID] + for _, keyspace := range kg.Keyspaces { + keyspaces[keyspace] = struct{}{} + } + if err := m.store.DeleteKeyspaceGroup(txn, kg.ID); err != nil { + return err + } + } + mergedKeyspaces := make([]uint32, 0, len(keyspaces)) + for keyspace := range keyspaces { + mergedKeyspaces = append(mergedKeyspaces, keyspace) + } + sort.Slice(mergedKeyspaces, func(i, j int) bool { + return mergedKeyspaces[i] < mergedKeyspaces[j] + }) + mergeTargetKg.Keyspaces = mergedKeyspaces + // Update the merge state of the target keyspace group. + mergeTargetKg.MergeState = &endpoint.MergeState{ + MergeList: mergeList, + } + return m.store.SaveKeyspaceGroup(txn, mergeTargetKg) + }); err != nil { + return err + } + // Update the keyspace group cache. + m.groups[endpoint.StringUserKind(mergeTargetKg.UserKind)].Put(mergeTargetKg) + for _, kgID := range mergeList { + kg := groups[kgID] + m.groups[endpoint.StringUserKind(kg.UserKind)].Remove(kgID) + } + return nil +} + +// FinishMergeKeyspaceByID finishes the merging keyspace group by the merge target ID. +func (m *GroupManager) FinishMergeKeyspaceByID(mergeTargetID uint32) error { + var mergeTargetKg *endpoint.KeyspaceGroup + m.Lock() + defer m.Unlock() + if err := m.store.RunInTxn(m.ctx, func(txn kv.Txn) (err error) { + // Load the merge target keyspace group first. + mergeTargetKg, err = m.store.LoadKeyspaceGroup(txn, mergeTargetID) + if err != nil { + return err + } + if mergeTargetKg == nil { + return ErrKeyspaceGroupNotExists + } + // Check if it's in the merging state. + if !mergeTargetKg.IsMergeTarget() { + return ErrKeyspaceGroupNotInMerging + } + // Make sure all merging keyspace groups are deleted. + for _, kgID := range mergeTargetKg.MergeState.MergeList { + kg, err := m.store.LoadKeyspaceGroup(txn, kgID) + if err != nil { + return err + } + if kg != nil { + return ErrKeyspaceGroupNotInMerging + } + } + mergeTargetKg.MergeState = nil + return m.store.SaveKeyspaceGroup(txn, mergeTargetKg) + }); err != nil { + return err + } + // Update the keyspace group cache. 
+ m.groups[endpoint.StringUserKind(mergeTargetKg.UserKind)].Put(mergeTargetKg) + return nil +} diff --git a/pkg/keyspace/tso_keyspace_group_test.go b/pkg/keyspace/tso_keyspace_group_test.go index fbd9d126eab..c5d678333e4 100644 --- a/pkg/keyspace/tso_keyspace_group_test.go +++ b/pkg/keyspace/tso_keyspace_group_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/mock/mockcluster" @@ -84,11 +85,12 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupOperations() { re.NoError(err) re.Len(kgs, 2) // get the default keyspace group - kg, err := suite.kgm.GetKeyspaceGroupByID(0) + kg, err := suite.kgm.GetKeyspaceGroupByID(utils.DefaultKeyspaceGroupID) re.NoError(err) re.Equal(uint32(0), kg.ID) re.Equal(endpoint.Basic.String(), kg.UserKind) re.False(kg.IsSplitting()) + // get the keyspace group 3 kg, err = suite.kgm.GetKeyspaceGroupByID(3) re.NoError(err) re.Equal(uint32(3), kg.ID) @@ -320,3 +322,197 @@ func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplit() { err = suite.kgm.SplitKeyspaceGroupByID(2, 5, []uint32{111, 222, 444}) re.ErrorIs(err, ErrKeyspaceNotInKeyspaceGroup) } + +func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupSplitRange() { + re := suite.Require() + + keyspaceGroups := []*endpoint.KeyspaceGroup{ + { + ID: uint32(1), + UserKind: endpoint.Basic.String(), + }, + { + ID: uint32(2), + UserKind: endpoint.Standard.String(), + Keyspaces: []uint32{111, 333, 444, 555, 666}, + Members: make([]endpoint.KeyspaceGroupMember, utils.DefaultKeyspaceGroupReplicaCount), + }, + } + err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups) + re.NoError(err) + // split the keyspace group 2 to 4 with keyspace range [222, 555] + err = suite.kgm.SplitKeyspaceGroupByID(2, 4, nil, 222, 555) + re.NoError(err) + kg2, err := suite.kgm.GetKeyspaceGroupByID(2) + re.NoError(err) + re.Equal(uint32(2), kg2.ID) + re.Equal([]uint32{111, 666}, kg2.Keyspaces) + re.True(kg2.IsSplitSource()) + re.Equal(kg2.ID, kg2.SplitSource()) + kg4, err := suite.kgm.GetKeyspaceGroupByID(4) + re.NoError(err) + re.Equal(uint32(4), kg4.ID) + re.Equal([]uint32{333, 444, 555}, kg4.Keyspaces) + re.True(kg4.IsSplitTarget()) + re.Equal(kg2.ID, kg4.SplitSource()) + re.Equal(kg2.UserKind, kg4.UserKind) + re.Equal(kg2.Members, kg4.Members) + // finish the split of keyspace group 4 + err = suite.kgm.FinishSplitKeyspaceByID(4) + re.NoError(err) + kg2, err = suite.kgm.GetKeyspaceGroupByID(2) + re.NoError(err) + re.Equal(uint32(2), kg2.ID) + re.Equal([]uint32{111, 666}, kg2.Keyspaces) + re.False(kg2.IsSplitting()) + kg4, err = suite.kgm.GetKeyspaceGroupByID(4) + re.NoError(err) + re.Equal(uint32(4), kg4.ID) + re.Equal([]uint32{333, 444, 555}, kg4.Keyspaces) + re.False(kg4.IsSplitting()) + re.Equal(kg2.UserKind, kg4.UserKind) + re.Equal(kg2.Members, kg4.Members) +} + +func (suite *keyspaceGroupTestSuite) TestKeyspaceGroupMerge() { + re := suite.Require() + + keyspaceGroups := []*endpoint.KeyspaceGroup{ + { + ID: uint32(1), + UserKind: endpoint.Basic.String(), + Keyspaces: []uint32{111, 222, 333}, + Members: make([]endpoint.KeyspaceGroupMember, utils.DefaultKeyspaceGroupReplicaCount), + }, + { + ID: uint32(3), + UserKind: endpoint.Basic.String(), + Keyspaces: []uint32{444, 555}, + }, + } + err := suite.kgm.CreateKeyspaceGroups(keyspaceGroups) + re.NoError(err) + // split the keyspace group 1 to 2 + err = suite.kgm.SplitKeyspaceGroupByID(1, 2, []uint32{333}) + re.NoError(err) + // finish the split of the 
keyspace group 2 + err = suite.kgm.FinishSplitKeyspaceByID(2) + re.NoError(err) + // check the keyspace group 1 and 2 + kg1, err := suite.kgm.GetKeyspaceGroupByID(1) + re.NoError(err) + re.Equal(uint32(1), kg1.ID) + re.Equal([]uint32{111, 222}, kg1.Keyspaces) + re.False(kg1.IsSplitting()) + re.False(kg1.IsMerging()) + kg2, err := suite.kgm.GetKeyspaceGroupByID(2) + re.NoError(err) + re.Equal(uint32(2), kg2.ID) + re.Equal([]uint32{333}, kg2.Keyspaces) + re.False(kg2.IsSplitting()) + re.False(kg2.IsMerging()) + re.Equal(kg1.UserKind, kg2.UserKind) + re.Equal(kg1.Members, kg2.Members) + // merge the keyspace group 2 and 3 back into 1 + err = suite.kgm.MergeKeyspaceGroups(1, []uint32{2, 3}) + re.NoError(err) + // check the keyspace group 2 and 3 + kg2, err = suite.kgm.GetKeyspaceGroupByID(2) + re.NoError(err) + re.Nil(kg2) + kg3, err := suite.kgm.GetKeyspaceGroupByID(3) + re.NoError(err) + re.Nil(kg3) + // check the keyspace group 1 + kg1, err = suite.kgm.GetKeyspaceGroupByID(1) + re.NoError(err) + re.Equal(uint32(1), kg1.ID) + re.Equal([]uint32{111, 222, 333, 444, 555}, kg1.Keyspaces) + re.False(kg1.IsSplitting()) + re.True(kg1.IsMerging()) + // finish the merging + err = suite.kgm.FinishMergeKeyspaceByID(1) + re.NoError(err) + kg1, err = suite.kgm.GetKeyspaceGroupByID(1) + re.NoError(err) + re.Equal(uint32(1), kg1.ID) + re.Equal([]uint32{111, 222, 333, 444, 555}, kg1.Keyspaces) + re.False(kg1.IsSplitting()) + re.False(kg1.IsMerging()) + + // merge a non-existing keyspace group + err = suite.kgm.MergeKeyspaceGroups(4, []uint32{5}) + re.ErrorIs(err, ErrKeyspaceGroupNotExists) + // merge with the number of keyspace groups exceeds the limit + err = suite.kgm.MergeKeyspaceGroups(1, make([]uint32, maxEtcdTxnOps/2)) + re.ErrorIs(err, ErrExceedMaxEtcdTxnOps) + // merge the default keyspace group + err = suite.kgm.MergeKeyspaceGroups(1, []uint32{utils.DefaultKeyspaceGroupID}) + re.ErrorIs(err, ErrModifyDefaultKeyspaceGroup) +} + +func TestBuildSplitKeyspaces(t *testing.T) { + re := require.New(t) + testCases := []struct { + old []uint32 + new []uint32 + startKeyspaceID uint32 + endKeyspaceID uint32 + expectedOld []uint32 + expectedNew []uint32 + err error + }{ + { + old: []uint32{1, 2, 3, 4, 5}, + new: []uint32{1, 2, 3, 4, 5}, + expectedOld: []uint32{}, + expectedNew: []uint32{1, 2, 3, 4, 5}, + }, + { + old: []uint32{1, 2, 3, 4, 5}, + new: []uint32{1}, + expectedOld: []uint32{2, 3, 4, 5}, + expectedNew: []uint32{1}, + }, + { + old: []uint32{1, 2, 3, 4, 5}, + new: []uint32{6}, + err: ErrKeyspaceNotInKeyspaceGroup, + }, + { + old: []uint32{1, 2, 3, 4, 5}, + startKeyspaceID: 2, + endKeyspaceID: 4, + expectedOld: []uint32{1, 5}, + expectedNew: []uint32{2, 3, 4}, + }, + { + old: []uint32{1, 2, 3, 4, 5}, + startKeyspaceID: 2, + endKeyspaceID: 6, + expectedOld: []uint32{1}, + expectedNew: []uint32{2, 3, 4, 5}, + }, + { + old: []uint32{1, 2, 3, 4, 5}, + startKeyspaceID: 0, + endKeyspaceID: 6, + expectedOld: []uint32{}, + expectedNew: []uint32{1, 2, 3, 4, 5}, + }, + { + old: []uint32{1, 2, 3, 4, 5}, + err: ErrKeyspaceNotInKeyspaceGroup, + }, + } + for idx, testCase := range testCases { + old, new, err := buildSplitKeyspaces(testCase.old, testCase.new, testCase.startKeyspaceID, testCase.endKeyspaceID) + if testCase.err != nil { + re.ErrorIs(testCase.err, err, "test case %d", idx) + } else { + re.NoError(err, "test case %d", idx) + re.Equal(testCase.expectedOld, old, "test case %d", idx) + re.Equal(testCase.expectedNew, new, "test case %d", idx) + } + } +} diff --git a/pkg/keyspace/util.go 
b/pkg/keyspace/util.go index c9f94b6c437..f6212efc2a0 100644 --- a/pkg/keyspace/util.go +++ b/pkg/keyspace/util.go @@ -54,16 +54,24 @@ var ( ErrKeyspaceGroupInSplit = errors.New("keyspace group is in split state") // ErrKeyspaceGroupNotInSplit is used to indicate target keyspace group is not in split state. ErrKeyspaceGroupNotInSplit = errors.New("keyspace group is not in split state") + // ErrKeyspaceGroupInMerging is used to indicate target keyspace group is in merging state. + ErrKeyspaceGroupInMerging = errors.New("keyspace group is in merging state") + // ErrKeyspaceGroupNotInMerging is used to indicate target keyspace group is not in merging state. + ErrKeyspaceGroupNotInMerging = errors.New("keyspace group is not in merging state") // ErrKeyspaceNotInKeyspaceGroup is used to indicate target keyspace is not in this keyspace group. ErrKeyspaceNotInKeyspaceGroup = errors.New("keyspace is not in this keyspace group") // ErrNodeNotInKeyspaceGroup is used to indicate the tso node is not in this keyspace group. ErrNodeNotInKeyspaceGroup = errors.New("the tso node is not in this keyspace group") // ErrKeyspaceGroupNotEnoughReplicas is used to indicate not enough replicas in the keyspace group. ErrKeyspaceGroupNotEnoughReplicas = errors.New("not enough replicas in the keyspace group") + // ErrModifyDefaultKeyspaceGroup is used to indicate that default keyspace group cannot be modified. + ErrModifyDefaultKeyspaceGroup = errors.New("default keyspace group cannot be modified") // ErrNoAvailableNode is used to indicate no available node in the keyspace group. - ErrNoAvailableNode = errors.New("no available node") - errModifyDefault = errors.New("cannot modify default keyspace's state") - errIllegalOperation = errors.New("unknown operation") + ErrNoAvailableNode = errors.New("no available node") + // ErrExceedMaxEtcdTxnOps is used to indicate the number of etcd txn operations exceeds the limit. + ErrExceedMaxEtcdTxnOps = errors.New("exceed max etcd txn operations") + errModifyDefault = errors.New("cannot modify default keyspace's state") + errIllegalOperation = errors.New("unknown operation") // stateTransitionTable lists all allowed next state for the given current state. // Note that transit from any state to itself is allowed for idempotence. diff --git a/pkg/mcs/discovery/discover.go b/pkg/mcs/discovery/discover.go index d3c06ad2cc8..6d939fde540 100644 --- a/pkg/mcs/discovery/discover.go +++ b/pkg/mcs/discovery/discover.go @@ -21,7 +21,7 @@ import ( // Discover is used to get all the service instances of the specified service name. func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, error) { - key := discoveryPath(clusterID, serviceName) + "/" + key := ServicePath(clusterID, serviceName) + "/" endKey := clientv3.GetPrefixRangeEnd(key) withRange := clientv3.WithRange(endKey) diff --git a/pkg/mcs/discovery/key_path.go b/pkg/mcs/discovery/key_path.go index 0e53b21c9fe..4eb339dd5db 100644 --- a/pkg/mcs/discovery/key_path.go +++ b/pkg/mcs/discovery/key_path.go @@ -24,15 +24,17 @@ const ( registryKey = "registry" ) -func registryPath(clusterID, serviceName, serviceAddr string) string { +// RegistryPath returns the full path to store microservice addresses. +func RegistryPath(clusterID, serviceName, serviceAddr string) string { return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey, serviceAddr}, "/") } -func discoveryPath(clusterID, serviceName string) string { +// ServicePath returns the path to store microservice addresses. 
+func ServicePath(clusterID, serviceName string) string { return strings.Join([]string{registryPrefix, clusterID, serviceName, registryKey}, "/") } // TSOPath returns the path to store TSO addresses. func TSOPath(clusterID uint64) string { - return discoveryPath(strconv.FormatUint(clusterID, 10), "tso") + "/" + return ServicePath(strconv.FormatUint(clusterID, 10), "tso") + "/" } diff --git a/pkg/mcs/discovery/register.go b/pkg/mcs/discovery/register.go index 617c1520b8d..3e08d9b49cf 100644 --- a/pkg/mcs/discovery/register.go +++ b/pkg/mcs/discovery/register.go @@ -41,7 +41,7 @@ type ServiceRegister struct { // NewServiceRegister creates a new ServiceRegister. func NewServiceRegister(ctx context.Context, cli *clientv3.Client, clusterID, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister { cctx, cancel := context.WithCancel(ctx) - serviceKey := registryPath(clusterID, serviceName, serviceAddr) + serviceKey := RegistryPath(clusterID, serviceName, serviceAddr) return &ServiceRegister{ ctx: cctx, cancel: cancel, diff --git a/pkg/mcs/tso/server/server.go b/pkg/mcs/tso/server/server.go index 2186eaff671..c534ba70958 100644 --- a/pkg/mcs/tso/server/server.go +++ b/pkg/mcs/tso/server/server.go @@ -540,7 +540,8 @@ func (s *Server) startServer() (err error) { tsoSvcRootPath := fmt.Sprintf(tsoSvcRootPathFormat, s.clusterID) s.serviceID = &discovery.ServiceRegistryEntry{ServiceAddr: s.cfg.AdvertiseListenAddr} s.keyspaceGroupManager = tso.NewKeyspaceGroupManager( - s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, legacySvcRootPath, tsoSvcRootPath, s.cfg) + s.serverLoopCtx, s.serviceID, s.etcdClient, s.httpClient, s.cfg.AdvertiseListenAddr, + discovery.TSOPath(s.clusterID), legacySvcRootPath, tsoSvcRootPath, s.cfg) if err := s.keyspaceGroupManager.Initialize(); err != nil { return err } diff --git a/pkg/mcs/utils/constant.go b/pkg/mcs/utils/constant.go index 21a4a655afe..c87cec16a64 100644 --- a/pkg/mcs/utils/constant.go +++ b/pkg/mcs/utils/constant.go @@ -76,5 +76,8 @@ const ( DefaultKeyspaceGroupReplicaCount = 2 // DefaultKeyspaceGroupReplicaPriority is the default priority of a keyspace group replica. + // It's used in keyspace group primary weighted-election to balance primaries' distribution. + // Among multiple replicas of a keyspace group, the higher the priority, the more likely + // the replica is to be elected as primary. DefaultKeyspaceGroupReplicaPriority = 0 ) diff --git a/pkg/storage/endpoint/tso_keyspace_group.go b/pkg/storage/endpoint/tso_keyspace_group.go index b9d55b3b994..498cd878887 100644 --- a/pkg/storage/endpoint/tso_keyspace_group.go +++ b/pkg/storage/endpoint/tso_keyspace_group.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/kv" "go.etcd.io/etcd/clientv3" ) @@ -87,12 +88,20 @@ type SplitState struct { SplitSource uint32 `json:"split-source"` } +// MergeState defines the merging state of a keyspace group. +type MergeState struct { + // MergeList is the list of keyspace group IDs which are merging to this target keyspace group. + MergeList []uint32 `json:"merge-list"` +} + // KeyspaceGroup is the keyspace group. type KeyspaceGroup struct { ID uint32 `json:"id"` UserKind string `json:"user-kind"` // SplitState is the current split state of the keyspace group. SplitState *SplitState `json:"split-state,omitempty"` + // MergeState is the current merging state of the keyspace group. 
+ MergeState *MergeState `json:"merge-state,omitempty"` // Members are the election members which campaign for the primary of the keyspace group. Members []KeyspaceGroupMember `json:"members"` // Keyspaces are the keyspace IDs which belong to the keyspace group. @@ -126,6 +135,21 @@ func (kg *KeyspaceGroup) SplitSource() uint32 { return 0 } +// IsMerging checks if the keyspace group is in merging state. +func (kg *KeyspaceGroup) IsMerging() bool { + return kg != nil && kg.MergeState != nil +} + +// IsMergeTarget checks if the keyspace group is in merging state and is the merge target. +func (kg *KeyspaceGroup) IsMergeTarget() bool { + return kg.IsMerging() && !slice.Contains(kg.MergeState.MergeList, kg.ID) +} + +// IsMergeSource checks if the keyspace group is in merging state and is the merge source. +func (kg *KeyspaceGroup) IsMergeSource() bool { + return kg.IsMerging() && slice.Contains(kg.MergeState.MergeList, kg.ID) +} + // KeyspaceGroupStorage is the interface for keyspace group storage. type KeyspaceGroupStorage interface { LoadKeyspaceGroups(startID uint32, limit int) ([]*KeyspaceGroup, error) diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index ed881b1c7df..754f69cb664 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -1404,3 +1404,16 @@ func (am *AllocatorManager) GetLeaderAddr() string { } return leaderAddrs[0] } + +// Construct the timestampOracle path prefix, which is: +// 1. for the default keyspace group: +// "" in /pd/{cluster_id}/timestamp +// 2. for the non-default keyspace groups: +// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp +func (am *AllocatorManager) getKeyspaceGroupTSPath(groupID uint32) string { + tsPath := "" + if am.kgID != mcsutils.DefaultKeyspaceGroupID { + tsPath = path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix) + } + return tsPath +} diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 284d7dc316a..2c715d0cc7c 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -18,7 +18,6 @@ import ( "context" "errors" "fmt" - "path" "sync" "sync/atomic" "time" @@ -89,16 +88,6 @@ func NewGlobalTSOAllocator( am *AllocatorManager, startGlobalLeaderLoop bool, ) Allocator { - // Construct the timestampOracle path prefix, which is: - // 1. for the default keyspace group: - // "" in /pd/{cluster_id}/timestamp - // 2. 
for the non-default keyspace groups: - // {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp - tsPath := "" - if am.kgID != mcsutils.DefaultKeyspaceGroupID { - tsPath = path.Join(fmt.Sprintf("%05d", am.kgID), globalTSOAllocatorEtcdPrefix) - } - ctx, cancel := context.WithCancel(ctx) gta := &GlobalTSOAllocator{ ctx: ctx, @@ -107,8 +96,7 @@ func NewGlobalTSOAllocator( member: am.member, timestampOracle: ×tampOracle{ client: am.member.GetLeadership().GetClient(), - rootPath: am.rootPath, - tsPath: tsPath, + tsPath: am.getKeyspaceGroupTSPath(am.kgID), storage: am.storage, saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 62a6986422c..a82376430fa 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -18,6 +18,7 @@ import ( "context" "encoding/json" "fmt" + "math" "net/http" "path" "regexp" @@ -27,6 +28,7 @@ import ( "time" perrors "github.com/pingcap/errors" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/election" @@ -34,6 +36,7 @@ import ( "github.com/tikv/pd/pkg/mcs/discovery" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" + "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/storage/kv" "github.com/tikv/pd/pkg/utils/apiutil" @@ -41,6 +44,7 @@ import ( "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/memberutil" "github.com/tikv/pd/pkg/utils/tsoutil" + "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/zap" @@ -50,6 +54,13 @@ const ( keyspaceGroupsElectionPath = mcsutils.KeyspaceGroupsKey + "/election" // primaryKey is the key for keyspace group primary election. primaryKey = "primary" + // mergingCheckInterval is the interval for merging check to see if the keyspace groups + // merging process could be moved forward. + mergingCheckInterval = 5 * time.Second + // defaultPrimaryPriorityCheckInterval is the default interval for checking if the priorities + // of the primaries on this TSO server/pod have changed. A goroutine will periodically check + // do this check and re-distribute the primaries if necessary. + defaultPrimaryPriorityCheckInterval = 10 * time.Second ) type state struct { @@ -148,6 +159,41 @@ func (s *state) getKeyspaceGroupMetaWithCheck( mcsutils.DefaultKeyspaceGroupID, nil } +func (s *state) getNextPrimaryToReset( + groupID int, localAddress string, +) (member ElectionMember, kg *endpoint.KeyspaceGroup, localPriority, nextGroupID int) { + s.RLock() + defer s.RUnlock() + + // Both s.ams and s.kgs are arrays with the fixed size defined by the const value MaxKeyspaceGroupCountInUse. + groupSize := int(mcsutils.MaxKeyspaceGroupCountInUse) + groupID %= groupSize + for j := 0; j < groupSize; groupID, j = (groupID+1)%groupSize, j+1 { + am := s.ams[groupID] + kg := s.kgs[groupID] + if am != nil && kg != nil && am.GetMember().IsLeader() { + maxPriority := math.MinInt32 + localPriority := math.MaxInt32 + for _, member := range kg.Members { + if member.Priority > maxPriority { + maxPriority = member.Priority + } + if member.Address == localAddress { + localPriority = member.Priority + } + } + + if localPriority < maxPriority { + // return here and reset the primary outside of the critical section + // as resetting the primary may take some time. 
+ return am.GetMember(), kg, localPriority, (groupID + 1) % groupSize + } + } + } + + return nil, nil, 0, groupID +} + // kgPrimaryPathBuilder builds the path for keyspace group primary election. // default keyspace group: "/ms/{cluster_id}/tso/00000/primary". // non-default keyspace group: "/ms/{cluster_id}/tso/keyspace_groups/election/{group}/primary". @@ -193,6 +239,10 @@ type KeyspaceGroupManager struct { // which participate in the election of its keyspace group's primary, in the format of // "electionNamePrefix:keyspace-group-id" electionNamePrefix string + // tsoServiceKey is the path for storing the registered tso servers. + // Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress} + // Value: discover.ServiceRegistryEntry + tsoServiceKey string // legacySvcRootPath defines the legacy root path for all etcd paths which derives from // the PD/API service. It's in the format of "/pd/{cluster_id}". // The main paths for different usages include: @@ -233,14 +283,26 @@ type KeyspaceGroupManager struct { loadKeyspaceGroupsBatchSize int64 loadFromEtcdMaxRetryTimes int - // compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id in the - // keyspace group membership path. + // compiledKGMembershipIDRegexp is the compiled regular expression for matching keyspace group id + // in the keyspace group membership path. compiledKGMembershipIDRegexp *regexp.Regexp // groupUpdateRetryList is the list of keyspace groups which failed to update and need to retry. groupUpdateRetryList map[uint32]*endpoint.KeyspaceGroup groupWatcher *etcdutil.LoopWatcher - primaryPathBuilder *kgPrimaryPathBuilder + // mergeCheckerCancelMap is the cancel function map for the merge checker of each keyspace group. + mergeCheckerCancelMap sync.Map // GroupID -> context.CancelFunc + + primaryPathBuilder *kgPrimaryPathBuilder + primaryPriorityCheckInterval time.Duration + + // tsoNodes is the registered tso servers. + tsoNodes sync.Map // store as map[string]struct{} + // serviceRegistryMap stores the mapping from the service registry key to the service address. + // Note: it is only used in tsoNodesWatcher. + serviceRegistryMap map[string]string + // tsoNodesWatcher is the watcher for the registered tso servers. + tsoNodesWatcher *etcdutil.LoopWatcher } // NewKeyspaceGroupManager creates a new Keyspace Group Manager. 
@@ -250,6 +312,7 @@ func NewKeyspaceGroupManager( etcdClient *clientv3.Client, httpClient *http.Client, electionNamePrefix string, + tsoServiceKey string, legacySvcRootPath string, tsoSvcRootPath string, cfg ServiceConfig, @@ -262,16 +325,19 @@ func NewKeyspaceGroupManager( ctx, cancel := context.WithCancel(ctx) kgm := &KeyspaceGroupManager{ - ctx: ctx, - cancel: cancel, - tsoServiceID: tsoServiceID, - etcdClient: etcdClient, - httpClient: httpClient, - electionNamePrefix: electionNamePrefix, - legacySvcRootPath: legacySvcRootPath, - tsoSvcRootPath: tsoSvcRootPath, - cfg: cfg, - groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup), + ctx: ctx, + cancel: cancel, + tsoServiceID: tsoServiceID, + etcdClient: etcdClient, + httpClient: httpClient, + electionNamePrefix: electionNamePrefix, + tsoServiceKey: tsoServiceKey, + legacySvcRootPath: legacySvcRootPath, + tsoSvcRootPath: tsoSvcRootPath, + primaryPriorityCheckInterval: defaultPrimaryPriorityCheckInterval, + cfg: cfg, + groupUpdateRetryList: make(map[uint32]*endpoint.KeyspaceGroup), + serviceRegistryMap: make(map[string]string), } kgm.legacySvcStorage = endpoint.NewStorageEndpoint( kv.NewEtcdKVBase(kgm.etcdClient, kgm.legacySvcRootPath), nil) @@ -288,6 +354,100 @@ func NewKeyspaceGroupManager( // Initialize this KeyspaceGroupManager func (kgm *KeyspaceGroupManager) Initialize() error { + if err := kgm.InitializeTSOServerWatchLoop(); err != nil { + log.Error("failed to initialize tso server watch loop", zap.Error(err)) + kgm.Close() // Close the manager to clean up the allocated resources. + return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err) + } + if err := kgm.InitializeGroupWatchLoop(); err != nil { + log.Error("failed to initialize group watch loop", zap.Error(err)) + kgm.Close() // Close the manager to clean up the loaded keyspace groups. + return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err) + } + + kgm.wg.Add(1) + go kgm.primaryPriorityCheckLoop() + + return nil +} + +// Close this KeyspaceGroupManager +func (kgm *KeyspaceGroupManager) Close() { + log.Info("closing keyspace group manager") + + // Note: don't change the order. We need to cancel all service loops in the keyspace group manager + // before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups + // during critical periods such as service shutdown and online keyspace group, while the former requires + // snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is + // added/initialized after that. + kgm.cancel() + kgm.wg.Wait() + kgm.state.deinitialize() + + log.Info("keyspace group manager closed") +} + +// GetServiceConfig returns the service config. +func (kgm *KeyspaceGroupManager) GetServiceConfig() ServiceConfig { + return kgm.cfg +} + +// InitializeTSOServerWatchLoop initializes the watch loop monitoring the path for storing the +// registered tso servers. 
+// Key: /ms/{cluster_id}/tso/registry/{tsoServerAddress} +// Value: discover.ServiceRegistryEntry +func (kgm *KeyspaceGroupManager) InitializeTSOServerWatchLoop() error { + tsoServiceEndKey := clientv3.GetPrefixRangeEnd(kgm.tsoServiceKey) + "/" + + putFn := func(kv *mvccpb.KeyValue) error { + s := &discovery.ServiceRegistryEntry{} + if err := json.Unmarshal(kv.Value, s); err != nil { + log.Warn("failed to unmarshal service registry entry", + zap.String("event-kv-key", string(kv.Key)), zap.Error(err)) + return err + } + kgm.tsoNodes.Store(s.ServiceAddr, struct{}{}) + kgm.serviceRegistryMap[string(kv.Key)] = s.ServiceAddr + return nil + } + deleteFn := func(kv *mvccpb.KeyValue) error { + key := string(kv.Key) + if serviceAddr, ok := kgm.serviceRegistryMap[key]; ok { + delete(kgm.serviceRegistryMap, key) + kgm.tsoNodes.Delete(serviceAddr) + return nil + } + return perrors.Errorf("failed to find the service address for key %s", key) + } + + kgm.tsoNodesWatcher = etcdutil.NewLoopWatcher( + kgm.ctx, + &kgm.wg, + kgm.etcdClient, + "tso-nodes-watcher", + kgm.tsoServiceKey, + putFn, + deleteFn, + func() error { return nil }, + clientv3.WithRange(tsoServiceEndKey), + ) + + kgm.wg.Add(1) + go kgm.tsoNodesWatcher.StartWatchLoop() + + if err := kgm.tsoNodesWatcher.WaitLoad(); err != nil { + log.Error("failed to load the registered tso servers", errs.ZapError(err)) + return err + } + + return nil +} + +// InitializeGroupWatchLoop initializes the watch loop monitoring the path for storing keyspace group +// membership/distribution metadata. +// Key: /pd/{cluster_id}/tso/keyspace_groups/membership/{group} +// Value: endpoint.KeyspaceGroup +func (kgm *KeyspaceGroupManager) InitializeGroupWatchLoop() error { rootPath := kgm.legacySvcRootPath startKey := strings.Join([]string{rootPath, endpoint.KeyspaceGroupIDPath(mcsutils.DefaultKeyspaceGroupID)}, "/") endKey := strings.Join( @@ -367,29 +527,72 @@ func (kgm *KeyspaceGroupManager) Initialize() error { return nil } -// Close this KeyspaceGroupManager -func (kgm *KeyspaceGroupManager) Close() { - log.Info("closing keyspace group manager") +func (kgm *KeyspaceGroupManager) primaryPriorityCheckLoop() { + defer logutil.LogPanic() + defer kgm.wg.Done() - // Note: don't change the order. We need to cancel all service loops in the keyspace group manager - // before closing all keyspace groups. It's to prevent concurrent addition/removal of keyspace groups - // during critical periods such as service shutdown and online keyspace group, while the former requires - // snapshot isolation to ensure all keyspace groups are properly closed and no new keyspace group is - // added/initialized after that. 
- kgm.cancel()
- kgm.wg.Wait()
- kgm.state.deinitialize()
+ failpoint.Inject("fastPrimaryPriorityCheck", func() {
+ kgm.primaryPriorityCheckInterval = 200 * time.Millisecond
+ })
- log.Info("keyspace group manager closed")
+ ctx, cancel := context.WithCancel(kgm.ctx)
+ defer cancel()
+ groupID := 0
+ for {
+ select {
+ case <-ctx.Done():
+ log.Info("exit primary priority check loop")
+ return
+ case <-time.After(kgm.primaryPriorityCheckInterval):
+ // Every primaryPriorityCheckInterval, we only reset the primary of one keyspace group
+ member, kg, localPriority, nextGroupID := kgm.getNextPrimaryToReset(groupID, kgm.tsoServiceID.ServiceAddr)
+ if member != nil {
+ aliveTSONodes := make(map[string]struct{})
+ kgm.tsoNodes.Range(func(key, _ interface{}) bool {
+ aliveTSONodes[key.(string)] = struct{}{}
+ return true
+ })
+ if len(aliveTSONodes) == 0 {
+ log.Warn("no alive tso node", zap.String("local-address", kgm.tsoServiceID.ServiceAddr))
+ continue
+ }
+ // If there is an alive member with higher priority, reset the leader.
+ resetLeader := false
+ for _, member := range kg.Members {
+ if member.Priority <= localPriority {
+ continue
+ }
+ if _, ok := aliveTSONodes[member.Address]; ok {
+ resetLeader = true
+ break
+ }
+ }
+ if resetLeader {
+ select {
+ case <-ctx.Done():
+ default:
+ member.ResetLeader()
+ log.Info("reset primary",
+ zap.String("local-address", kgm.tsoServiceID.ServiceAddr),
+ zap.Uint32("keyspace-group-id", kg.ID),
+ zap.Int("local-priority", localPriority))
+ }
+ } else {
+ log.Warn("no need to reset primary as the replicas with higher priority are offline",
+ zap.String("local-address", kgm.tsoServiceID.ServiceAddr),
+ zap.Uint32("keyspace-group-id", kg.ID),
+ zap.Int("local-priority", localPriority))
+ }
+ }
+ groupID = nextGroupID
+ }
+ }
 }

 func (kgm *KeyspaceGroupManager) isAssignedToMe(group *endpoint.KeyspaceGroup) bool {
- for _, member := range group.Members {
- if member.Address == kgm.tsoServiceID.ServiceAddr {
- return true
- }
- }
- return false
+ return slice.AnyOf(group.Members, func(i int) bool {
+ return group.Members[i].Address == kgm.tsoServiceID.ServiceAddr
+ })
 }

 // updateKeyspaceGroup applies the given keyspace group. If the keyspace group is just assigned to
@@ -416,9 +619,25 @@ func (kgm *KeyspaceGroupManager) updateKeyspaceGroup(group *endpoint.KeyspaceGro
 return
 }

+ oldAM, oldGroup := kgm.getKeyspaceGroupMeta(group.ID)
+ // If this host owns a replica of the keyspace group which is the merge target,
+ // it should run the merging checker when the merge state changes for the first time.
+ if !oldGroup.IsMergeTarget() && group.IsMergeTarget() {
+ ctx, cancel := context.WithCancel(kgm.ctx)
+ kgm.mergeCheckerCancelMap.Store(group.ID, cancel)
+ kgm.wg.Add(1)
+ go kgm.mergingChecker(ctx, group.ID, group.MergeState.MergeList)
+ }
+ // If the merge state has been finished, cancel its merging checker.
+ if oldGroup.IsMergeTarget() && !group.IsMergeTarget() {
+ if cancel, loaded := kgm.mergeCheckerCancelMap.LoadAndDelete(group.ID); loaded && cancel != nil {
+ cancel.(context.CancelFunc)()
+ }
+ }
+
 // If this host is already assigned a replica of this keyspace group, i.e., the election member
 // is already initialized, just update the meta.
- if oldAM, oldGroup := kgm.getKeyspaceGroupMeta(group.ID); oldAM != nil {
+ if oldAM != nil {
 kgm.updateKeyspaceGroupMembership(oldGroup, group, true)
 return
 }
@@ -738,6 +957,10 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest(
 if err != nil {
 return pdpb.Timestamp{}, curKeyspaceGroupID, err
 }
+ err = kgm.checkTSOMerge(curKeyspaceGroupID)
+ if err != nil {
+ return pdpb.Timestamp{}, curKeyspaceGroupID, err
+ }
 ts, err = am.HandleRequest(dcLocation, count)
 return ts, curKeyspaceGroupID, err
 }
@@ -837,23 +1060,31 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
 if err != nil {
 return err
 }
+ // If the split source TSO is not greater than the newly split TSO, we don't need to do anything.
 if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTSO) <= 0 {
- log.Debug("the split source TSO is not greater than the newly split TSO",
+ log.Info("the split source tso is less than the newly split tso",
 zap.Int64("split-source-tso-physical", splitSourceTSO.Physical),
 zap.Int64("split-source-tso-logical", splitSourceTSO.Logical),
 zap.Int64("split-tso-physical", splitTSO.Physical),
- zap.Int64("split-tso-logical", splitTSO.Logical),
- )
- return nil
+ zap.Int64("split-tso-logical", splitTSO.Logical))
+ // Finish the split state directly.
+ return kgm.finishSplitKeyspaceGroup(keyspaceGroupID)
 }
 // If the split source TSO is greater than the newly split TSO, we need to update the split
 // TSO to make sure the following TSO will be greater than the split keyspaces ever had
 // in the past.
- splitSourceTSO.Physical += 1
- err = splitAllocator.SetTSO(tsoutil.GenerateTS(&splitSourceTSO), true, true)
+ err = splitAllocator.SetTSO(tsoutil.GenerateTS(&pdpb.Timestamp{
+ Physical: splitSourceTSO.Physical + 1,
+ Logical: splitSourceTSO.Logical,
+ }), true, true)
 if err != nil {
 return err
 }
+ log.Info("the split source tso is greater than the newly split tso",
+ zap.Int64("split-source-tso-physical", splitSourceTSO.Physical),
+ zap.Int64("split-source-tso-logical", splitSourceTSO.Logical),
+ zap.Int64("split-tso-physical", splitTSO.Physical),
+ zap.Int64("split-tso-logical", splitTSO.Logical))
 // Finish the split state.
 return kgm.finishSplitKeyspaceGroup(keyspaceGroupID)
 }
@@ -890,3 +1121,180 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error {
 kgm.kgs[id] = splitGroup
 return nil
 }
+
+func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error {
+ kgm.Lock()
+ defer kgm.Unlock()
+ // Check if the keyspace group is in the merging state.
+ mergeTarget := kgm.kgs[id]
+ if !mergeTarget.IsMergeTarget() {
+ return nil
+ }
+ // Check if the HTTP client is initialized.
+ if kgm.httpClient == nil {
+ return nil
+ }
+ statusCode, err := apiutil.DoDelete(
+ kgm.httpClient,
+ kgm.cfg.GeBackendEndpoints()+keyspaceGroupsAPIPrefix+fmt.Sprintf("/%d/merge", id))
+ if err != nil {
+ return err
+ }
+ if statusCode != http.StatusOK {
+ log.Warn("failed to finish merging keyspace group",
+ zap.Uint32("keyspace-group-id", id),
+ zap.Int("status-code", statusCode))
+ return errs.ErrSendRequest.FastGenByArgs()
+ }
+ // Pre-update the keyspace group's merge state in memory.
+ mergeTarget.MergeState = nil
+ kgm.kgs[id] = mergeTarget
+ return nil
+}
+
+// mergingChecker is used to check if the keyspace group is in merge state, and if so, it will
+// make sure the newly merged TSO stays consistent with the original ones.
+func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTargetID uint32, mergeList []uint32) { + log.Info("start to merge the keyspace group", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList)) + defer logutil.LogPanic() + defer kgm.wg.Done() + + checkTicker := time.NewTicker(mergingCheckInterval) + defer checkTicker.Stop() + // Prepare the merge map. + mergeMap := make(map[uint32]struct{}, len(mergeList)) + for _, id := range mergeList { + mergeMap[id] = struct{}{} + } + + for { + select { + case <-ctx.Done(): + log.Info("merging checker is closed", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList)) + return + case <-checkTicker.C: + } + // Check if current TSO node is the merge target TSO primary node. + am, err := kgm.GetAllocatorManager(mergeTargetID) + if err != nil { + log.Warn("unable to get the merge target allocator manager", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("keyspace-group-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Error(err)) + continue + } + // If the current TSO node is not the merge target TSO primary node, + // we still need to keep this loop running to avoid unexpected primary changes. + if !am.IsLeader() { + log.Debug("current tso node is not the merge target primary", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList)) + continue + } + // Check if the keyspace group primaries in the merge map are all gone. + if len(mergeMap) != 0 { + for id := range mergeMap { + leaderPath := path.Join(kgm.primaryPathBuilder.getKeyspaceGroupIDPath(id), primaryKey) + val, err := kgm.tsoSvcStorage.Load(leaderPath) + if err != nil { + log.Error("failed to check if the keyspace group primary in the merge list has gone", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Uint32("merge-id", id), + zap.Any("remaining", mergeMap), + zap.Error(err)) + continue + } + if len(val) == 0 { + delete(mergeMap, id) + } + } + } + if len(mergeMap) > 0 { + continue + } + // All the keyspace group primaries in the merge list are gone, + // update the newly merged TSO to make sure it is greater than the original ones. + var mergedTS time.Time + for _, id := range mergeList { + ts, err := kgm.tsoSvcStorage.LoadTimestamp(am.getKeyspaceGroupTSPath(id)) + if err != nil || ts == typeutil.ZeroTime { + log.Error("failed to load the keyspace group TSO", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Uint32("merge-id", id), + zap.Time("ts", ts), + zap.Error(err)) + mergedTS = typeutil.ZeroTime + break + } + if ts.After(mergedTS) { + mergedTS = ts + } + } + if mergedTS == typeutil.ZeroTime { + continue + } + // Update the newly merged TSO. + // TODO: support the Local TSO Allocator. 
+ allocator, err := am.GetAllocator(GlobalDCLocation) + if err != nil { + log.Error("failed to get the allocator", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Error(err)) + continue + } + err = allocator.SetTSO( + tsoutil.GenerateTS(tsoutil.GenerateTimestamp(mergedTS, 1)), + true, true) + if err != nil { + log.Error("failed to update the newly merged TSO", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Time("merged-ts", mergedTS), + zap.Error(err)) + continue + } + // Finish the merge. + err = kgm.finishMergeKeyspaceGroup(mergeTargetID) + if err != nil { + log.Error("failed to finish the merge", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Error(err)) + continue + } + log.Info("finished merging keyspace group", + zap.String("member", kgm.tsoServiceID.ServiceAddr), + zap.Uint32("merge-target-id", mergeTargetID), + zap.Any("merge-list", mergeList), + zap.Time("merged-ts", mergedTS)) + return + } +} + +// Reject any request if the keyspace group is in merging state, +// we need to wait for the merging checker to finish the TSO merging. +func (kgm *KeyspaceGroupManager) checkTSOMerge( + keyspaceGroupID uint32, +) error { + _, group := kgm.getKeyspaceGroupMeta(keyspaceGroupID) + if !group.IsMerging() { + return nil + } + return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(keyspaceGroupID) +} diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 083bf566bf7..9792ac54728 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -22,7 +22,6 @@ import ( "path" "reflect" "sort" - "strconv" "strings" "sync" "testing" @@ -36,8 +35,9 @@ import ( "github.com/tikv/pd/pkg/mcs/discovery" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/storage/endpoint" - "github.com/tikv/pd/pkg/utils/memberutil" + "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" + "github.com/tikv/pd/pkg/utils/tsoutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/goleak" @@ -51,6 +51,7 @@ type keyspaceGroupManagerTestSuite struct { suite.Suite ctx context.Context cancel context.CancelFunc + ClusterID uint64 backendEndpoints string etcdClient *clientv3.Client clean func() @@ -64,13 +65,23 @@ func TestKeyspaceGroupManagerTestSuite(t *testing.T) { func (suite *keyspaceGroupManagerTestSuite) SetupSuite() { t := suite.T() suite.ctx, suite.cancel = context.WithCancel(context.Background()) + suite.ClusterID = rand.Uint64() suite.backendEndpoints, suite.etcdClient, suite.clean = startEmbeddedEtcd(t) + suite.cfg = suite.createConfig() +} + +func (suite *keyspaceGroupManagerTestSuite) TearDownSuite() { + suite.clean() + suite.cancel() +} - suite.cfg = &TestServiceConfig{ - Name: "tso-test-name", +func (suite *keyspaceGroupManagerTestSuite) createConfig() *TestServiceConfig { + addr := tempurl.Alloc() + return &TestServiceConfig{ + Name: "tso-test-name-default", BackendEndpoints: suite.backendEndpoints, - ListenAddr: "http://127.0.0.1:3379", - AdvertiseListenAddr: "http://127.0.0.1:3379", + ListenAddr: addr, + AdvertiseListenAddr: addr, LeaderLease: mcsutils.DefaultLeaderLease, LocalTSOEnabled: false, TSOUpdatePhysicalInterval: 50 * time.Millisecond, @@ -80,11 +91,6 @@ func (suite 
*keyspaceGroupManagerTestSuite) SetupSuite() { } } -func (suite *keyspaceGroupManagerTestSuite) TearDownSuite() { - suite.clean() - suite.cancel() -} - // TestNewKeyspaceGroupManager tests the initialization of KeyspaceGroupManager. // It should initialize the allocator manager with the desired configurations and parameters. func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { @@ -92,11 +98,15 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: suite.cfg.AdvertiseListenAddr} guid := uuid.New().String() + tsoServiceKey := discovery.ServicePath(guid, "tso") + "/" legacySvcRootPath := path.Join("/pd", guid) tsoSvcRootPath := path.Join("/ms", guid, "tso") electionNamePrefix := "tso-server-" + guid - kgm := suite.newKeyspaceGroupManager(tsoServiceID, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath) + kgm := NewKeyspaceGroupManager( + suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix, + tsoServiceKey, legacySvcRootPath, tsoSvcRootPath, suite.cfg) + defer kgm.Close() err := kgm.Initialize() re.NoError(err) @@ -116,8 +126,6 @@ func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { re.Equal(legacySvcRootPath, am.rootPath) re.Equal(time.Duration(mcsutils.DefaultLeaderLease)*time.Second, am.saveInterval) re.Equal(time.Duration(50)*time.Millisecond, am.updatePhysicalInterval) - - kgm.Close() } // TestLoadKeyspaceGroupsAssignment tests the loading of the keyspace group assignment. @@ -174,8 +182,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsTimeout() { defer mgr.Close() addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0}) + suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0}) // Set the timeout to 1 second and inject the delayLoad to return 3 seconds to let // the loading sleep 3 seconds. @@ -197,8 +205,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsSucceedWithTem defer mgr.Close() addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0}) + suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0}) // Set the max retry times to 3 and inject the loadTemporaryFail to return 2 to let // loading from etcd fail 2 times but the whole initialization still succeeds. @@ -219,8 +227,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestLoadKeyspaceGroupsFailed() { defer mgr.Close() addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, uint32(0), []uint32{0}) + suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0}) // Set the max retry times to 3 and inject the loadTemporaryFail to return 3 to let // loading from etcd fail 3 times which should cause the whole initialization to fail. @@ -388,9 +396,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestGetKeyspaceGroupMetaWithCheck() // Create keyspace group 0 which contains keyspace 0, 1, 2. 
addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - uint32(0), []uint32{0, 1, 2}) + suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0, 1, 2}) err = mgr.Initialize() re.NoError(err) @@ -461,14 +468,12 @@ func (suite *keyspaceGroupManagerTestSuite) TestDefaultMembershipRestriction() { // Create keyspace group 0 which contains keyspace 0, 1, 2. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 1, 2}) + suite.ctx, suite.etcdClient, mcsutils.DefaultKeyspaceGroupID, rootPath, + []string{svcAddr}, []int{0}, []uint32{mcsutils.DefaultKeyspaceID, 1, 2}) // Create keyspace group 3 which contains keyspace 3, 4. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - uint32(3), []uint32{3, 4}) + suite.ctx, suite.etcdClient, uint32(3), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{3, 4}) err = mgr.Initialize() re.NoError(err) @@ -536,14 +541,12 @@ func (suite *keyspaceGroupManagerTestSuite) TestKeyspaceMovementConsistency() { // Create keyspace group 0 which contains keyspace 0, 1, 2. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - mcsutils.DefaultKeyspaceGroupID, []uint32{mcsutils.DefaultKeyspaceID, 10, 20}) + suite.ctx, suite.etcdClient, mcsutils.DefaultKeyspaceGroupID, + rootPath, []string{svcAddr}, []int{0}, []uint32{mcsutils.DefaultKeyspaceID, 10, 20}) // Create keyspace group 1 which contains keyspace 3, 4. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - uint32(1), []uint32{11, 21}) + suite.ctx, suite.etcdClient, uint32(1), rootPath, + []string{svcAddr}, []int{0}, []uint32{11, 21}) err = mgr.Initialize() re.NoError(err) @@ -591,9 +594,8 @@ func (suite *keyspaceGroupManagerTestSuite) TestHandleTSORequestWithWrongMembers // Create keyspace group 0 which contains keyspace 0, 1, 2. addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, true, - mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - uint32(0), []uint32{0, 1, 2}) + suite.ctx, suite.etcdClient, uint32(0), mgr.legacySvcRootPath, + []string{mgr.tsoServiceID.ServiceAddr}, []int{0}, []uint32{0, 1, 2}) err := mgr.Initialize() re.NoError(err) @@ -659,15 +661,6 @@ func (suite *keyspaceGroupManagerTestSuite) applyEtcdEvents( } } -func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( - tsoServiceID *discovery.ServiceRegistryEntry, - electionNamePrefix, legacySvcRootPath, tsoSvcRootPath string, -) *KeyspaceGroupManager { - return NewKeyspaceGroupManager( - suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix, - legacySvcRootPath, tsoSvcRootPath, suite.cfg) -} - // runTestLoadMultipleKeyspaceGroupsAssignment tests the loading of multiple keyspace group assignment. 
func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment( re *require.Assertions, @@ -705,10 +698,16 @@ func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment( expectedGroupIDs = append(expectedGroupIDs, uint32(j)) mux.Unlock() } + + svcAddrs := make([]string, 0) + if assignToMe { + svcAddrs = append(svcAddrs, mgr.tsoServiceID.ServiceAddr) + } else { + svcAddrs = append(svcAddrs, uuid.NewString()) + } addKeyspaceGroupAssignment( - suite.ctx, suite.etcdClient, - assignToMe, mgr.legacySvcRootPath, mgr.tsoServiceID.ServiceAddr, - uint32(j), []uint32{uint32(j)}) + suite.ctx, suite.etcdClient, uint32(j), mgr.legacySvcRootPath, + svcAddrs, []int{0}, []uint32{uint32(j)}) } }(i) } @@ -734,19 +733,27 @@ func (suite *keyspaceGroupManagerTestSuite) runTestLoadKeyspaceGroupsAssignment( func (suite *keyspaceGroupManagerTestSuite) newUniqueKeyspaceGroupManager( loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value ) *KeyspaceGroupManager { - tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: suite.cfg.AdvertiseListenAddr} - uniqueID := memberutil.GenerateUniqueID(uuid.New().String()) - uniqueStr := strconv.FormatUint(uniqueID, 10) + return suite.newKeyspaceGroupManager(loadKeyspaceGroupsBatchSize, uuid.New().String(), suite.cfg) +} + +func (suite *keyspaceGroupManagerTestSuite) newKeyspaceGroupManager( + loadKeyspaceGroupsBatchSize int64, // set to 0 to use the default value + uniqueStr string, + cfg *TestServiceConfig, +) *KeyspaceGroupManager { + tsoServiceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.GetAdvertiseListenAddr()} + tsoServiceKey := discovery.ServicePath(uniqueStr, "tso") + "/" legacySvcRootPath := path.Join("/pd", uniqueStr) tsoSvcRootPath := path.Join("/ms", uniqueStr, "tso") - electionNamePrefix := "kgm-test-" + uniqueStr - - keyspaceGroupManager := suite.newKeyspaceGroupManager(tsoServiceID, electionNamePrefix, legacySvcRootPath, tsoSvcRootPath) + electionNamePrefix := "kgm-test-" + cfg.GetAdvertiseListenAddr() + kgm := NewKeyspaceGroupManager( + suite.ctx, tsoServiceID, suite.etcdClient, nil, electionNamePrefix, + tsoServiceKey, legacySvcRootPath, tsoSvcRootPath, cfg) if loadKeyspaceGroupsBatchSize != 0 { - keyspaceGroupManager.loadKeyspaceGroupsBatchSize = loadKeyspaceGroupsBatchSize + kgm.loadKeyspaceGroupsBatchSize = loadKeyspaceGroupsBatchSize } - return keyspaceGroupManager + return kgm } // putKeyspaceGroupToEtcd puts a keyspace group to etcd. @@ -783,19 +790,21 @@ func deleteKeyspaceGroupInEtcd( // addKeyspaceGroupAssignment adds a keyspace group assignment to etcd. 
func addKeyspaceGroupAssignment(
- ctx context.Context, etcdClient *clientv3.Client,
- assignToMe bool, rootPath, svcAddr string,
- groupID uint32, keyspaces []uint32,
+ ctx context.Context,
+ etcdClient *clientv3.Client,
+ groupID uint32,
+ rootPath string,
+ svcAddrs []string,
+ priorities []int,
+ keyspaces []uint32,
 ) error {
- var location string
- if assignToMe {
- location = svcAddr
- } else {
- location = uuid.NewString()
+ members := make([]endpoint.KeyspaceGroupMember, len(svcAddrs))
+ for i, svcAddr := range svcAddrs {
+ members[i] = endpoint.KeyspaceGroupMember{Address: svcAddr, Priority: priorities[i]}
 }
 group := &endpoint.KeyspaceGroup{
 ID: groupID,
- Members: []endpoint.KeyspaceGroupMember{{Address: location}},
+ Members: members,
 Keyspaces: keyspaces,
 }
@@ -968,3 +977,185 @@ func (suite *keyspaceGroupManagerTestSuite) TestGroupSplitUpdateRetry() {
 return reflect.DeepEqual(expectedGroupIDs, assignedGroupIDs)
 })
 }
+
+// TestPrimaryPriorityChange tests the case that the primary priority of a keyspace group changes
+// and the locations of the primaries should be updated accordingly.
+func (suite *keyspaceGroupManagerTestSuite) TestPrimaryPriorityChange() {
+ re := suite.Require()
+ re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastPrimaryPriorityCheck", `return(true)`))
+ defer func() {
+ re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastPrimaryPriorityCheck"))
+ }()
+
+ var err error
+ defaultPriority := mcsutils.DefaultKeyspaceGroupReplicaPriority
+ uniqueStr := uuid.New().String()
+ rootPath := path.Join("/pd", uniqueStr)
+ cfg1 := suite.createConfig()
+ cfg2 := suite.createConfig()
+ svcAddr1 := cfg1.GetAdvertiseListenAddr()
+ svcAddr2 := cfg2.GetAdvertiseListenAddr()
+
+ // Register TSO server 1
+ err = suite.registerTSOServer(re, uniqueStr, svcAddr1, cfg1)
+ re.NoError(err)
+ defer func() {
+ re.NoError(suite.deregisterTSOServer(uniqueStr, svcAddr1))
+ }()
+
+ // Create three keyspace groups on two TSO servers with default replica priority.
+ ids := []uint32{0, mcsutils.MaxKeyspaceGroupCountInUse / 2, mcsutils.MaxKeyspaceGroupCountInUse - 1}
+ for _, id := range ids {
+ addKeyspaceGroupAssignment(
+ suite.ctx, suite.etcdClient, id, rootPath,
+ []string{svcAddr1, svcAddr2}, []int{defaultPriority, defaultPriority}, []uint32{id})
+ }
+
+ // Create the first TSO server which loads all three keyspace groups created above.
+ // All primaries should be on the first TSO server.
+ mgr1 := suite.newKeyspaceGroupManager(1, uniqueStr, cfg1)
+ re.NotNil(mgr1)
+ defer mgr1.Close()
+ err = mgr1.Initialize()
+ re.NoError(err)
+ // Wait until all keyspace groups are ready for serving tso requests.
+ waitForPrimariesServing(re, []*KeyspaceGroupManager{mgr1, mgr1, mgr1}, ids)
+
+ // We increase the priority of the TSO server 2 which hasn't started yet. The primaries
+ // on the TSO server 1 shouldn't move.
+ for _, id := range ids {
+ addKeyspaceGroupAssignment(
+ suite.ctx, suite.etcdClient, id, rootPath,
+ []string{svcAddr1, svcAddr2}, []int{defaultPriority, defaultPriority + 1}, []uint32{id})
+ }
+
+ // And the primaries on TSO Server 1 should continue to serve TSO requests without any failures.
+ for i := 0; i < 100; i++ { + for _, id := range ids { + _, keyspaceGroupBelongTo, err := mgr1.HandleTSORequest(id, id, GlobalDCLocation, 1) + re.NoError(err) + re.Equal(id, keyspaceGroupBelongTo) + } + } + + // Continually sending TSO requests to the TSO server 1 to make sure the primaries will move back + // to it at the end of test + ctx, cancel := context.WithCancel(context.Background()) + var wg sync.WaitGroup + checkTSO(ctx, re, &wg, mgr1, ids) + + // Create the Second TSO server. + err = suite.registerTSOServer(re, uniqueStr, svcAddr2, cfg2) + re.NoError(err) + mgr2 := suite.newKeyspaceGroupManager(1, uniqueStr, cfg2) + re.NotNil(mgr2) + err = mgr2.Initialize() + re.NoError(err) + // All primaries should eventually move to the second TSO server because of the higher priority. + waitForPrimariesServing(re, []*KeyspaceGroupManager{mgr2, mgr2, mgr2}, ids) + + // Shutdown the second TSO server. + mgr2.Close() + re.NoError(suite.deregisterTSOServer(uniqueStr, svcAddr2)) + // The primaries should move back to the first TSO server. + waitForPrimariesServing(re, []*KeyspaceGroupManager{mgr1, mgr1, mgr1}, ids) + + // Restart the Second TSO server. + err = suite.registerTSOServer(re, uniqueStr, svcAddr2, cfg2) + re.NoError(err) + defer func() { + re.NoError(suite.deregisterTSOServer(uniqueStr, svcAddr2)) + }() + mgr2 = suite.newKeyspaceGroupManager(1, uniqueStr, cfg2) + re.NotNil(mgr2) + defer mgr2.Close() + err = mgr2.Initialize() + re.NoError(err) + // All primaries should eventually move to the second TSO server because of the higher priority. + waitForPrimariesServing(re, []*KeyspaceGroupManager{mgr2, mgr2, mgr2}, ids) + + mgrs := []*KeyspaceGroupManager{mgr2, mgr2, mgr2} + for i, id := range ids { + // Set the keyspace group replica on the first TSO server to have higher priority. + addKeyspaceGroupAssignment( + suite.ctx, suite.etcdClient, id, rootPath, + []string{svcAddr1, svcAddr2}, []int{defaultPriority - 1, defaultPriority - 2}, []uint32{id}) + // The primary of this keyspace group should move back to the first TSO server. + mgrs[i] = mgr1 + waitForPrimariesServing(re, mgrs, ids) + } + + cancel() + wg.Wait() +} + +// Register TSO server. +func (suite *keyspaceGroupManagerTestSuite) registerTSOServer( + re *require.Assertions, clusterID, svcAddr string, cfg *TestServiceConfig, +) error { + // Register TSO server 1 + serviceID := &discovery.ServiceRegistryEntry{ServiceAddr: cfg.GetAdvertiseListenAddr()} + serializedEntry, err := serviceID.Serialize() + re.NoError(err) + serviceKey := discovery.RegistryPath(clusterID, mcsutils.TSOServiceName, svcAddr) + _, err = suite.etcdClient.Put(suite.ctx, serviceKey, serializedEntry) + return err +} + +// Deregister TSO server. 
+func (suite *keyspaceGroupManagerTestSuite) deregisterTSOServer(clusterID, svcAddr string) error { + serviceKey := discovery.RegistryPath(clusterID, mcsutils.TSOServiceName, svcAddr) + if _, err := suite.etcdClient.Delete(suite.ctx, serviceKey); err != nil { + return err + } + return nil +} + +func checkTSO( + ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, + mgr *KeyspaceGroupManager, ids []uint32, +) { + wg.Add(len(ids)) + for _, id := range ids { + go func(id uint32) { + defer wg.Done() + var ts, lastTS uint64 + for { + select { + case <-ctx.Done(): + // Make sure the lastTS is not empty + re.NotEmpty(lastTS) + return + default: + } + respTS, respGroupID, err := mgr.HandleTSORequest(id, id, GlobalDCLocation, 1) + // omit the error check since there are many kinds of errors during primaries movement + if err != nil { + continue + } + re.Equal(id, respGroupID) + ts = tsoutil.ComposeTS(respTS.Physical, respTS.Logical) + re.Less(lastTS, ts) + lastTS = ts + } + }(id) + } +} + +func waitForPrimariesServing( + re *require.Assertions, mgrs []*KeyspaceGroupManager, ids []uint32, +) { + testutil.Eventually(re, func() bool { + for i := 0; i < 100; i++ { + for j, id := range ids { + if member, err := mgrs[j].GetElectionMember(id, id); err != nil || !member.IsLeader() { + return false + } + if _, _, err := mgrs[j].HandleTSORequest(id, id, GlobalDCLocation, 1); err != nil { + return false + } + } + } + return true + }, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond)) +} diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index 9c2867966bc..9995d5cec3f 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -72,7 +72,6 @@ func NewLocalTSOAllocator( leadership: leadership, timestampOracle: ×tampOracle{ client: leadership.GetClient(), - rootPath: am.rootPath, tsPath: tsPath, storage: am.storage, saveInterval: am.saveInterval, diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index aa1a424d8cd..54f0cb927be 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -60,8 +60,7 @@ type tsoObject struct { // timestampOracle is used to maintain the logic of TSO. type timestampOracle struct { - client *clientv3.Client - rootPath string + client *clientv3.Client // When tsPath is empty, it means that it is a global timestampOracle. tsPath string storage endpoint.TSOStorage diff --git a/pkg/utils/tsoutil/tso_dispatcher.go b/pkg/utils/tsoutil/tso_dispatcher.go index 152a3996538..69baf4b1e41 100644 --- a/pkg/utils/tsoutil/tso_dispatcher.go +++ b/pkg/utils/tsoutil/tso_dispatcher.go @@ -58,7 +58,7 @@ func NewTSODispatcher(tsoProxyHandleDuration, tsoProxyBatchSize prometheus.Histo return tsoDispatcher } -// DispatchRequest is the entry point for dispatching/forwarding a tso request to the detination host +// DispatchRequest is the entry point for dispatching/forwarding a tso request to the destination host func (s *TSODispatcher) DispatchRequest( ctx context.Context, req Request, @@ -69,9 +69,9 @@ func (s *TSODispatcher) DispatchRequest( val, loaded := s.dispatchChs.LoadOrStore(req.getForwardedHost(), make(chan Request, maxMergeRequests)) reqCh := val.(chan Request) if !loaded { - tsDeadlineCh := make(chan deadline, 1) + tsDeadlineCh := make(chan *TSDeadline, 1) go s.dispatch(ctx, tsoProtoFactory, req.getForwardedHost(), req.getClientConn(), reqCh, tsDeadlineCh, doneCh, errCh, tsoPrimaryWatchers...) 
- go watchTSDeadline(ctx, tsDeadlineCh) + go WatchTSDeadline(ctx, tsDeadlineCh) } reqCh <- req } @@ -82,7 +82,7 @@ func (s *TSODispatcher) dispatch( forwardedHost string, clientConn *grpc.ClientConn, tsoRequestCh <-chan Request, - tsDeadlineCh chan<- deadline, + tsDeadlineCh chan<- *TSDeadline, doneCh <-chan struct{}, errCh chan<- error, tsoPrimaryWatchers ...*etcdutil.LoopWatcher) { @@ -121,11 +121,7 @@ func (s *TSODispatcher) dispatch( requests[i] = <-tsoRequestCh } done := make(chan struct{}) - dl := deadline{ - timer: time.After(DefaultTSOProxyTimeout), - done: done, - cancel: cancel, - } + dl := NewTSDeadline(DefaultTSOProxyTimeout, done, cancel) select { case tsDeadlineCh <- dl: case <-dispatcherCtx.Done(): @@ -199,13 +195,28 @@ func (s *TSODispatcher) finishRequest(requests []Request, physical, firstLogical return nil } -type deadline struct { +// TSDeadline is used to watch the deadline of each tso request. +type TSDeadline struct { timer <-chan time.Time done chan struct{} cancel context.CancelFunc } -func watchTSDeadline(ctx context.Context, tsDeadlineCh <-chan deadline) { +// NewTSDeadline creates a new TSDeadline. +func NewTSDeadline( + timeout time.Duration, + done chan struct{}, + cancel context.CancelFunc, +) *TSDeadline { + return &TSDeadline{ + timer: time.After(timeout), + done: done, + cancel: cancel, + } +} + +// WatchTSDeadline watches the deadline of each tso request. +func WatchTSDeadline(ctx context.Context, tsDeadlineCh <-chan *TSDeadline) { defer logutil.LogPanic() ctx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/server/apiv2/handlers/tso_keyspace_group.go b/server/apiv2/handlers/tso_keyspace_group.go index 7e4eb49964e..e7c06bf608b 100644 --- a/server/apiv2/handlers/tso_keyspace_group.go +++ b/server/apiv2/handlers/tso_keyspace_group.go @@ -44,6 +44,8 @@ func RegisterTSOKeyspaceGroup(r *gin.RouterGroup) { router.POST("/:id/priority", SetPriorityForKeyspaceGroup) router.POST("/:id/split", SplitKeyspaceGroupByID) router.DELETE("/:id/split", FinishSplitKeyspaceByID) + router.POST("/:id/merge", MergeKeyspaceGroups) + router.DELETE("/:id/merge", FinishMergeKeyspaceByID) } // CreateKeyspaceGroupParams defines the params for creating keyspace groups. @@ -159,6 +161,9 @@ func DeleteKeyspaceGroupByID(c *gin.Context) { type SplitKeyspaceGroupByIDParams struct { NewID uint32 `json:"new-id"` Keyspaces []uint32 `json:"keyspaces"` + // StartKeyspaceID and EndKeyspaceID are used to indicate the range of keyspaces to be split. 
+ StartKeyspaceID uint32 `json:"start-keyspace-id"` + EndKeyspaceID uint32 `json:"end-keyspace-id"` } var patrolKeyspaceAssignmentState struct { @@ -184,10 +189,15 @@ func SplitKeyspaceGroupByID(c *gin.Context) { c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") return } - if len(splitParams.Keyspaces) == 0 { + if len(splitParams.Keyspaces) == 0 && splitParams.StartKeyspaceID == 0 && splitParams.EndKeyspaceID == 0 { c.AbortWithStatusJSON(http.StatusBadRequest, "invalid empty keyspaces") return } + if splitParams.StartKeyspaceID < utils.DefaultKeyspaceID || + splitParams.StartKeyspaceID > splitParams.EndKeyspaceID { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid start/end keyspace id") + return + } svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) patrolKeyspaceAssignmentState.Lock() @@ -198,7 +208,7 @@ func SplitKeyspaceGroupByID(c *gin.Context) { c.AbortWithStatusJSON(http.StatusInternalServerError, managerUninitializedErr) return } - err = manager.PatrolKeyspaceAssignment() + err = manager.PatrolKeyspaceAssignment(splitParams.StartKeyspaceID, splitParams.EndKeyspaceID) if err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) patrolKeyspaceAssignmentState.Unlock() @@ -207,13 +217,15 @@ func SplitKeyspaceGroupByID(c *gin.Context) { patrolKeyspaceAssignmentState.patrolled = true } patrolKeyspaceAssignmentState.Unlock() - // Split keyspace group. groupManager := svr.GetKeyspaceGroupManager() if groupManager == nil { c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) return } - err = groupManager.SplitKeyspaceGroupByID(id, splitParams.NewID, splitParams.Keyspaces) + // Split keyspace group. + err = groupManager.SplitKeyspaceGroupByID( + id, splitParams.NewID, + splitParams.Keyspaces, splitParams.StartKeyspaceID, splitParams.EndKeyspaceID) if err != nil { c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) return @@ -239,6 +251,68 @@ func FinishSplitKeyspaceByID(c *gin.Context) { c.JSON(http.StatusOK, nil) } +// MergeKeyspaceGroupsParams defines the params for merging the keyspace groups. +type MergeKeyspaceGroupsParams struct { + MergeList []uint32 `json:"merge-list"` +} + +// MergeKeyspaceGroups merges the keyspace groups in the merge list into the target keyspace group. +func MergeKeyspaceGroups(c *gin.Context) { + id, err := validateKeyspaceGroupID(c) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") + return + } + mergeParams := &MergeKeyspaceGroupsParams{} + err = c.BindJSON(mergeParams) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, errs.ErrBindJSON.Wrap(err).GenWithStackByCause()) + return + } + if len(mergeParams.MergeList) == 0 { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid empty merge list") + return + } + for _, mergeID := range mergeParams.MergeList { + if !isValid(mergeID) { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") + return + } + } + + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) + groupManager := svr.GetKeyspaceGroupManager() + if groupManager == nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, groupManagerUninitializedErr) + return + } + // Merge keyspace group. 
+ err = groupManager.MergeKeyspaceGroups(id, mergeParams.MergeList) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + c.JSON(http.StatusOK, nil) +} + +// FinishMergeKeyspaceByID finishes merging keyspace group by ID. +func FinishMergeKeyspaceByID(c *gin.Context) { + id, err := validateKeyspaceGroupID(c) + if err != nil { + c.AbortWithStatusJSON(http.StatusBadRequest, "invalid keyspace group id") + return + } + + svr := c.MustGet(middlewares.ServerContextKey).(*server.Server) + manager := svr.GetKeyspaceGroupManager() + err = manager.FinishMergeKeyspaceByID(id) + if err != nil { + c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error()) + return + } + c.JSON(http.StatusOK, nil) +} + // AllocNodesForKeyspaceGroupParams defines the params for allocating nodes for keyspace groups. type AllocNodesForKeyspaceGroupParams struct { Replica int `json:"replica"` diff --git a/server/grpc_service.go b/server/grpc_service.go index 1f14ebed52e..86998f7d2c8 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -404,16 +404,17 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { var ( server = &tsoServer{stream: stream} forwardStream tsopb.TSO_TsoClient - cancel context.CancelFunc + forwardCtx context.Context + cancelForward context.CancelFunc lastForwardedHost string ) defer func() { s.concurrentTSOProxyStreamings.Add(-1) - // cancel the forward stream - if cancel != nil { - cancel() + if cancelForward != nil { + cancelForward() } }() + maxConcurrentTSOProxyStreamings := int32(s.GetMaxConcurrentTSOProxyStreamings()) if maxConcurrentTSOProxyStreamings >= 0 { if newCount := s.concurrentTSOProxyStreamings.Add(1); newCount > maxConcurrentTSOProxyStreamings { @@ -421,6 +422,9 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { } } + tsDeadlineCh := make(chan *tsoutil.TSDeadline, 1) + go tsoutil.WatchTSDeadline(stream.Context(), tsDeadlineCh) + for { select { case <-s.ctx.Done(): @@ -447,22 +451,24 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { return errors.WithStack(ErrNotFoundTSOAddr) } if forwardStream == nil || lastForwardedHost != forwardedHost { - if cancel != nil { - cancel() + if cancelForward != nil { + cancelForward() } clientConn, err := s.getDelegateClient(s.ctx, forwardedHost) if err != nil { return errors.WithStack(err) } - forwardStream, cancel, err = s.createTSOForwardStream(clientConn) + forwardStream, forwardCtx, cancelForward, err = + s.createTSOForwardStream(stream.Context(), clientConn) if err != nil { return errors.WithStack(err) } lastForwardedHost = forwardedHost } - tsopbResp, err := s.forwardTSORequestWithDeadLine(stream.Context(), request, forwardStream) + tsopbResp, err := s.forwardTSORequestWithDeadLine( + forwardCtx, cancelForward, forwardStream, request, tsDeadlineCh) if err != nil { return errors.WithStack(err) } @@ -500,37 +506,39 @@ func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { } func (s *GrpcServer) forwardTSORequestWithDeadLine( - ctx context.Context, request *pdpb.TsoRequest, forwardStream tsopb.TSO_TsoClient, + forwardCtx context.Context, + cancelForward context.CancelFunc, + forwardStream tsopb.TSO_TsoClient, + request *pdpb.TsoRequest, + tsDeadlineCh chan<- *tsoutil.TSDeadline, ) (*tsopb.TsoResponse, error) { - defer logutil.LogPanic() - // Create a context with deadline for forwarding TSO request to TSO service. 
- ctxTimeout, cancel := context.WithTimeout(ctx, tsoutil.DefaultTSOProxyTimeout) - defer cancel() - - tsoProxyBatchSize.Observe(float64(request.GetCount())) + done := make(chan struct{}) + dl := tsoutil.NewTSDeadline(tsoutil.DefaultTSOProxyTimeout, done, cancelForward) + select { + case tsDeadlineCh <- dl: + case <-forwardCtx.Done(): + return nil, forwardCtx.Err() + } - // used to receive the result from doSomething function - tsoRespCh := make(chan *tsopbTSOResponse, 1) start := time.Now() - go s.forwardTSORequestAsync(ctxTimeout, request, forwardStream, tsoRespCh) - select { - case <-ctxTimeout.Done(): - tsoProxyForwardTimeoutCounter.Inc() - return nil, ErrForwardTSOTimeout - case tsoResp := <-tsoRespCh: - if tsoResp.err == nil { - tsoProxyHandleDuration.Observe(time.Since(start).Seconds()) + resp, err := s.forwardTSORequest(forwardCtx, request, forwardStream) + close(done) + if err != nil { + if strings.Contains(err.Error(), errs.NotLeaderErr) { + s.tsoPrimaryWatcher.ForceLoad() } - return tsoResp.response, tsoResp.err + return nil, err } + tsoProxyBatchSize.Observe(float64(request.GetCount())) + tsoProxyHandleDuration.Observe(time.Since(start).Seconds()) + return resp, nil } -func (s *GrpcServer) forwardTSORequestAsync( - ctxTimeout context.Context, +func (s *GrpcServer) forwardTSORequest( + ctx context.Context, request *pdpb.TsoRequest, forwardStream tsopb.TSO_TsoClient, - tsoRespCh chan<- *tsopbTSOResponse, -) { +) (*tsopb.TsoResponse, error) { tsopbReq := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: request.GetHeader().GetClusterId(), @@ -543,46 +551,32 @@ func (s *GrpcServer) forwardTSORequestAsync( } failpoint.Inject("tsoProxySendToTSOTimeout", func() { - <-ctxTimeout.Done() - failpoint.Return() + // block until watchDeadline routine cancels the context. + <-ctx.Done() }) - if err := forwardStream.Send(tsopbReq); err != nil { - select { - case <-ctxTimeout.Done(): - return - case tsoRespCh <- &tsopbTSOResponse{err: err}: - } - return - } - select { - case <-ctxTimeout.Done(): - return + case <-ctx.Done(): + return nil, ctx.Err() default: } + if err := forwardStream.Send(tsopbReq); err != nil { + return nil, err + } + failpoint.Inject("tsoProxyRecvFromTSOTimeout", func() { - <-ctxTimeout.Done() - failpoint.Return() + // block until watchDeadline routine cancels the context. 
+ <-ctx.Done() }) - response, err := forwardStream.Recv() - if err != nil { - if strings.Contains(err.Error(), errs.NotLeaderErr) { - s.tsoPrimaryWatcher.ForceLoad() - } - } select { - case <-ctxTimeout.Done(): - return - case tsoRespCh <- &tsopbTSOResponse{response: response, err: err}: + case <-ctx.Done(): + return nil, ctx.Err() + default: } -} -type tsopbTSOResponse struct { - response *tsopb.TsoResponse - err error + return forwardStream.Recv() } // tsoServer wraps PD_TsoServer to ensure when any error @@ -2138,13 +2132,15 @@ func forwardRegionHeartbeatClientToServer(forwardStream pdpb.PD_RegionHeartbeatC } } -func (s *GrpcServer) createTSOForwardStream(client *grpc.ClientConn) (tsopb.TSO_TsoClient, context.CancelFunc, error) { +func (s *GrpcServer) createTSOForwardStream( + ctx context.Context, client *grpc.ClientConn, +) (tsopb.TSO_TsoClient, context.Context, context.CancelFunc, error) { done := make(chan struct{}) - ctx, cancel := context.WithCancel(s.ctx) - go checkStream(ctx, cancel, done) - forwardStream, err := tsopb.NewTSOClient(client).Tso(ctx) + forwardCtx, cancelForward := context.WithCancel(ctx) + go checkStream(forwardCtx, cancelForward, done) + forwardStream, err := tsopb.NewTSOClient(client).Tso(forwardCtx) done <- struct{}{} - return forwardStream, cancel, err + return forwardStream, forwardCtx, cancelForward, err } func (s *GrpcServer) createReportBucketsForwardStream(client *grpc.ClientConn) (pdpb.PD_ReportBucketsClient, context.CancelFunc, error) { diff --git a/tests/integrations/mcs/cluster.go b/tests/integrations/mcs/cluster.go index dbc9964b62b..961f26728c6 100644 --- a/tests/integrations/mcs/cluster.go +++ b/tests/integrations/mcs/cluster.go @@ -151,7 +151,8 @@ func (tc *TestTSOCluster) GetServers() map[string]*tso.Server { func (tc *TestTSOCluster) GetKeyspaceGroupMember() (members []endpoint.KeyspaceGroupMember) { for _, server := range tc.servers { members = append(members, endpoint.KeyspaceGroupMember{ - Address: server.GetAddr(), + Address: server.GetAddr(), + Priority: mcsutils.DefaultKeyspaceGroupReplicaPriority, }) } return diff --git a/tests/integrations/mcs/tso/keyspace_group_manager_test.go b/tests/integrations/mcs/tso/keyspace_group_manager_test.go index ed3bfe35280..98c6b90ca28 100644 --- a/tests/integrations/mcs/tso/keyspace_group_manager_test.go +++ b/tests/integrations/mcs/tso/keyspace_group_manager_test.go @@ -29,6 +29,7 @@ import ( "github.com/stretchr/testify/suite" pd "github.com/tikv/pd/client" "github.com/tikv/pd/pkg/election" + "github.com/tikv/pd/pkg/errs" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/storage/endpoint" @@ -263,7 +264,7 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { err error ) testutil.Eventually(re, func() bool { - ts, err = suite.requestTSO(re, 1, 222, 1) + ts, err = suite.requestTSO(re, 222, 1) return err == nil && tsoutil.CompareTimestamp(&ts, &pdpb.Timestamp{}) > 0 }) ts.Physical += time.Hour.Milliseconds() @@ -282,22 +283,22 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplit() { // Check the split TSO from keyspace group 2. 
var splitTS pdpb.Timestamp testutil.Eventually(re, func() bool { - splitTS, err = suite.requestTSO(re, 1, 222, 2) + splitTS, err = suite.requestTSO(re, 222, 2) return err == nil && tsoutil.CompareTimestamp(&splitTS, &pdpb.Timestamp{}) > 0 }) - splitTS, err = suite.requestTSO(re, 1, 222, 2) + splitTS, err = suite.requestTSO(re, 222, 2) + re.NoError(err) re.Greater(tsoutil.CompareTimestamp(&splitTS, &ts), 0) } func (suite *tsoKeyspaceGroupManagerTestSuite) requestTSO( re *require.Assertions, - count, keyspaceID, keyspaceGroupID uint32, + keyspaceID, keyspaceGroupID uint32, ) (pdpb.Timestamp, error) { primary := suite.tsoCluster.WaitForPrimaryServing(re, keyspaceID, keyspaceGroupID) kgm := primary.GetKeyspaceGroupManager() re.NotNil(kgm) - ts, _, err := kgm.HandleTSORequest(keyspaceID, keyspaceGroupID, tsopkg.GlobalDCLocation, count) - re.NoError(err) + ts, _, err := kgm.HandleTSORequest(keyspaceID, keyspaceGroupID, tsopkg.GlobalDCLocation, 1) return ts, err } @@ -357,31 +358,58 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitElection re.Equal(member1.GetLeaderListenUrls(), member2.GetLeaderListenUrls()) // Finish the split. handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2) + // Wait for the keyspace groups to finish the split. + waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{111}, []uint32{222, 333}) +} + +func waitFinishSplit( + re *require.Assertions, + server *tests.TestServer, + splitSourceID, splitTargetID uint32, + splitSourceKeyspaces, splitTargetKeyspaces []uint32, +) { + testutil.Eventually(re, func() bool { + kg := handlersutil.MustLoadKeyspaceGroupByID(re, server, splitTargetID) + re.Equal(splitTargetID, kg.ID) + re.Equal(splitTargetKeyspaces, kg.Keyspaces) + return !kg.IsSplitTarget() + }) + testutil.Eventually(re, func() bool { + kg := handlersutil.MustLoadKeyspaceGroupByID(re, server, splitSourceID) + re.Equal(splitSourceID, kg.ID) + re.Equal(splitSourceKeyspaces, kg.Keyspaces) + return !kg.IsSplitSource() + }) } func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() { re := suite.Require() - // Create the keyspace group 1 with keyspaces [111, 222, 333]. + // Enable the failpoint to slow down the system time to test whether the TSO is monotonic. + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/systemTimeSlow", `return(true)`)) + // Create the keyspace group 1 with keyspaces [444, 555, 666]. handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ KeyspaceGroups: []*endpoint.KeyspaceGroup{ { ID: 1, UserKind: endpoint.Standard.String(), Members: suite.tsoCluster.GetKeyspaceGroupMember(), - Keyspaces: []uint32{111, 222, 333}, + Keyspaces: []uint32{444, 555, 666}, }, }, }) kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) re.Equal(uint32(1), kg1.ID) - re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) + re.Equal([]uint32{444, 555, 666}, kg1.Keyspaces) re.False(kg1.IsSplitting()) - // Prepare the client for keyspace 222. - var tsoClient pd.TSOClient - tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, 222, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) + // Make sure the leader of the keyspace group 1 is elected. + member, err := suite.tsoCluster.WaitForPrimaryServing(re, 555, 1).GetMember(555, 1) + re.NoError(err) + re.NotNil(member) + // Prepare the client for keyspace 555. 
+ tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, 555, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) re.NoError(err) re.NotNil(tsoClient) - // Request the TSO for keyspace 222 concurrently. + // Request the TSO for keyspace 555 concurrently. var ( wg sync.WaitGroup ctx, cancel = context.WithCancel(suite.ctx) @@ -421,19 +449,14 @@ func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupSplitClient() // Split the keyspace group 1 to 2. handlersutil.MustSplitKeyspaceGroup(re, suite.pdLeaderServer, 1, &handlers.SplitKeyspaceGroupByIDParams{ NewID: 2, - Keyspaces: []uint32{222, 333}, + Keyspaces: []uint32{555, 666}, }) - kg2 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 2) - re.Equal(uint32(2), kg2.ID) - re.Equal([]uint32{222, 333}, kg2.Keyspaces) - re.True(kg2.IsSplitTarget()) - // Finish the split. - handlersutil.MustFinishSplitKeyspaceGroup(re, suite.pdLeaderServer, 2) - // Wait for a while to make sure the client has received the new TSO. - time.Sleep(time.Second) + // Wait for the keyspace groups to finish the split. + waitFinishSplit(re, suite.pdLeaderServer, 1, 2, []uint32{444}, []uint32{555, 666}) // Stop the client. cancel() wg.Wait() + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/systemTimeSlow")) } func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMembers() { @@ -474,7 +497,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`)) // Init api server config but not start. - tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, serverName string) { + tc, err := tests.NewTestAPICluster(ctx, 1, func(conf *config.Config, _ string) { conf.Keyspace.PreAlloc = []string{ "keyspace_a", "keyspace_b", } @@ -513,7 +536,7 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { // Wait pd clients are ready. testutil.Eventually(re, func() bool { count := 0 - clients.Range(func(key, value interface{}) bool { + clients.Range(func(_, _ interface{}) bool { count++ return true }) @@ -535,10 +558,9 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { // Trigger checkTSOSplit to ensure the split is finished. testutil.Eventually(re, func() bool { _, _, err = clientB.(pd.Client).GetTS(ctx) - re.NoError(err) - kg := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0) - return !kg.IsSplitting() + return err == nil }) + waitFinishSplit(re, leaderServer, 0, 1, []uint32{mcsutils.DefaultKeyspaceID, 1}, []uint32{2}) clientB.(pd.Client).Close() // Then split keyspace group 0 to 2 with keyspace 1. @@ -550,10 +572,9 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { // Trigger checkTSOSplit to ensure the split is finished. testutil.Eventually(re, func() bool { _, _, err = clientA.(pd.Client).GetTS(ctx) - re.NoError(err) - kg := handlersutil.MustLoadKeyspaceGroupByID(re, leaderServer, 0) - return !kg.IsSplitting() + return err == nil }) + waitFinishSplit(re, leaderServer, 0, 2, []uint32{mcsutils.DefaultKeyspaceID}, []uint32{1}) clientA.(pd.Client).Close() // Check the keyspace group 0 is split to 1 and 2. @@ -569,3 +590,139 @@ func TestTwiceSplitKeyspaceGroup(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes")) } + +func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMerge() { + re := suite.Require() + // Create the keyspace group 1 and 2 with keyspaces [111, 222] and [333]. 
+ handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ + KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: 1, + UserKind: endpoint.Standard.String(), + Members: suite.tsoCluster.GetKeyspaceGroupMember(), + Keyspaces: []uint32{111, 222}, + }, + { + ID: 2, + UserKind: endpoint.Standard.String(), + Members: suite.tsoCluster.GetKeyspaceGroupMember(), + Keyspaces: []uint32{333}, + }, + }, + }) + // Get a TSO from the keyspace group 1. + var ( + ts pdpb.Timestamp + err error + ) + testutil.Eventually(re, func() bool { + ts, err = suite.requestTSO(re, 222, 1) + return err == nil && tsoutil.CompareTimestamp(&ts, &pdpb.Timestamp{}) > 0 + }) + ts.Physical += time.Hour.Milliseconds() + // Set the TSO of the keyspace group 1 to a large value. + err = suite.tsoCluster.GetPrimaryServer(222, 1).GetHandler().ResetTS(tsoutil.GenerateTS(&ts), false, true, 1) + re.NoError(err) + // Merge the keyspace group 1 and 2 to the default keyspace group. + handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{ + MergeList: []uint32{1, 2}, + }) + // Check the keyspace group 1 and 2 are merged to the default keyspace group. + kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID) + re.Equal(mcsutils.DefaultKeyspaceGroupID, kg.ID) + for _, keyspaceID := range []uint32{111, 222, 333} { + re.Contains(kg.Keyspaces, keyspaceID) + } + re.True(kg.IsMergeTarget()) + // Check the merged TSO from the default keyspace group is greater than the TSO from the keyspace group 1. + var mergedTS pdpb.Timestamp + testutil.Eventually(re, func() bool { + mergedTS, err = suite.requestTSO(re, 333, mcsutils.DefaultKeyspaceGroupID) + if err != nil { + re.ErrorIs(err, errs.ErrKeyspaceGroupIsMerging) + } + return err == nil && tsoutil.CompareTimestamp(&mergedTS, &pdpb.Timestamp{}) > 0 + }, testutil.WithTickInterval(5*time.Second), testutil.WithWaitFor(time.Minute)) + re.Greater(tsoutil.CompareTimestamp(&mergedTS, &ts), 0) +} + +func (suite *tsoKeyspaceGroupManagerTestSuite) TestTSOKeyspaceGroupMergeClient() { + re := suite.Require() + // Create the keyspace group 1 with keyspaces [111, 222, 333]. + handlersutil.MustCreateKeyspaceGroup(re, suite.pdLeaderServer, &handlers.CreateKeyspaceGroupParams{ + KeyspaceGroups: []*endpoint.KeyspaceGroup{ + { + ID: 1, + UserKind: endpoint.Standard.String(), + Members: suite.tsoCluster.GetKeyspaceGroupMember(), + Keyspaces: []uint32{111, 222, 333}, + }, + }, + }) + kg1 := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, 1) + re.Equal(uint32(1), kg1.ID) + re.Equal([]uint32{111, 222, 333}, kg1.Keyspaces) + re.False(kg1.IsMerging()) + // Make sure the leader of the keyspace group 1 is elected. + member, err := suite.tsoCluster.WaitForPrimaryServing(re, 222, 1).GetMember(222, 1) + re.NoError(err) + re.NotNil(member) + // Prepare the client for keyspace 222. + tsoClient, err := pd.NewClientWithKeyspace(suite.ctx, 222, []string{suite.pdLeaderServer.GetAddr()}, pd.SecurityOption{}) + re.NoError(err) + re.NotNil(tsoClient) + // Request the TSO for keyspace 222 concurrently. + var ( + wg sync.WaitGroup + ctx, cancel = context.WithCancel(suite.ctx) + lastPhysical, lastLogical int64 + ) + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-ctx.Done(): + // Make sure at least one TSO request is successful. 
+				re.NotEmpty(lastPhysical)
+				return
+			default:
+			}
+			physical, logical, err := tsoClient.GetTS(ctx)
+			if err != nil {
+				errMsg := err.Error()
+				// Ignore the errors caused by the merge and context cancellation.
+				if strings.Contains(errMsg, "context canceled") ||
+					strings.Contains(errMsg, "not leader") ||
+					strings.Contains(errMsg, "not served") ||
+					strings.Contains(errMsg, "ErrKeyspaceNotAssigned") ||
+					strings.Contains(errMsg, "ErrKeyspaceGroupIsMerging") {
+					continue
+				}
+				re.FailNow(errMsg)
+			}
+			if physical == lastPhysical {
+				re.Greater(logical, lastLogical)
+			} else {
+				re.Greater(physical, lastPhysical)
+			}
+			lastPhysical, lastLogical = physical, logical
+		}
+	}()
+	// Merge keyspace group 1 into the default keyspace group.
+	handlersutil.MustMergeKeyspaceGroup(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{
+		MergeList: []uint32{1},
+	})
+	// Wait for the default keyspace group to finish the merge.
+	testutil.Eventually(re, func() bool {
+		kg := handlersutil.MustLoadKeyspaceGroupByID(re, suite.pdLeaderServer, mcsutils.DefaultKeyspaceGroupID)
+		re.Equal(mcsutils.DefaultKeyspaceGroupID, kg.ID)
+		for _, keyspaceID := range []uint32{111, 222, 333} {
+			re.Contains(kg.Keyspaces, keyspaceID)
+		}
+		return !kg.IsMergeTarget()
+	})
+	// Stop the client.
+	cancel()
+	wg.Wait()
+}
diff --git a/tests/pdctl/keyspace/keyspace_group_test.go b/tests/pdctl/keyspace/keyspace_group_test.go
index 0c44700f48e..80f9eaa8420 100644
--- a/tests/pdctl/keyspace/keyspace_group_test.go
+++ b/tests/pdctl/keyspace/keyspace_group_test.go
@@ -277,3 +277,76 @@ func TestSetNodeAndPriorityKeyspaceGroup(t *testing.T) {
 	re.NoError(err)
 	re.Contains(string(output), "Failed to parse the priority")
 }
+
+func TestMergeKeyspaceGroup(t *testing.T) {
+	re := require.New(t)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes", `return(true)`))
+	re.NoError(failpoint.Enable("github.com/tikv/pd/server/delayStartServerLoop", `return(true)`))
+	keyspaces := make([]string, 0)
+	// We test the case which exceeds the default max txn ops limit in etcd, which is 128.
+	for i := 0; i < 129; i++ {
+		keyspaces = append(keyspaces, fmt.Sprintf("keyspace_%d", i))
+	}
+	tc, err := tests.NewTestAPICluster(ctx, 3, func(conf *config.Config, _ string) {
+		conf.Keyspace.PreAlloc = keyspaces
+	})
+	re.NoError(err)
+	err = tc.RunInitialServers()
+	re.NoError(err)
+	pdAddr := tc.GetConfig().GetClientURL()
+
+	_, tsoServerCleanup1, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc())
+	defer tsoServerCleanup1()
+	re.NoError(err)
+	_, tsoServerCleanup2, err := tests.StartSingleTSOTestServer(ctx, re, pdAddr, tempurl.Alloc())
+	defer tsoServerCleanup2()
+	re.NoError(err)
+	cmd := pdctlCmd.GetRootCmd()
+
+	tc.WaitLeader()
+	leaderServer := tc.GetServer(tc.GetLeader())
+	re.NoError(leaderServer.BootstrapCluster())
+
+	// Split the keyspace group.
+	testutil.Eventually(re, func() bool {
+		args := []string{"-u", pdAddr, "keyspace-group", "split", "0", "1", "2"}
+		output, err := pdctl.ExecuteCommand(cmd, args...)
+		re.NoError(err)
+		return strings.Contains(string(output), "Success")
+	})
+
+	args := []string{"-u", pdAddr, "keyspace-group", "finish-split", "0"}
+	output, err := pdctl.ExecuteCommand(cmd, args...)
+	re.NoError(err)
+	re.Contains(string(output), "Success")
+	args = []string{"-u", pdAddr, "keyspace-group", "finish-split", "1"}
+	output, err = pdctl.ExecuteCommand(cmd, args...)
+	re.NoError(err)
+	re.Contains(string(output), "Success")
+
+	// Merge the keyspace groups.
+	testutil.Eventually(re, func() bool {
+		args := []string{"-u", pdAddr, "keyspace-group", "merge", "0", "1"}
+		output, err := pdctl.ExecuteCommand(cmd, args...)
+		re.NoError(err)
+		return strings.Contains(string(output), "Success")
+	})
+
+	args = []string{"-u", pdAddr, "keyspace-group", "finish-merge", "0"}
+	output, err = pdctl.ExecuteCommand(cmd, args...)
+	re.NoError(err)
+	re.Contains(string(output), "Success")
+	args = []string{"-u", pdAddr, "keyspace-group", "0"}
+	output, err = pdctl.ExecuteCommand(cmd, args...)
+	re.NoError(err)
+	var keyspaceGroup endpoint.KeyspaceGroup
+	err = json.Unmarshal(output, &keyspaceGroup)
+	re.NoError(err)
+	re.Len(keyspaceGroup.Keyspaces, 130)
+	re.Nil(keyspaceGroup.MergeState)
+
+	re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/keyspace/acceleratedAllocNodes"))
+	re.NoError(failpoint.Disable("github.com/tikv/pd/server/delayStartServerLoop"))
+}
diff --git a/tests/server/apiv2/handlers/testutil.go b/tests/server/apiv2/handlers/testutil.go
index b638f1bbba4..900cd84b829 100644
--- a/tests/server/apiv2/handlers/testutil.go
+++ b/tests/server/apiv2/handlers/testutil.go
@@ -205,7 +205,7 @@ func MustDeleteKeyspaceGroup(re *require.Assertions, server *tests.TestServer, i
 	re.Equal(http.StatusOK, resp.StatusCode, string(data))
 }

-// MustSplitKeyspaceGroup updates a keyspace group with HTTP API.
+// MustSplitKeyspaceGroup splits a keyspace group with HTTP API.
 func MustSplitKeyspaceGroup(re *require.Assertions, server *tests.TestServer, id uint32, request *handlers.SplitKeyspaceGroupByIDParams) {
 	data, err := json.Marshal(request)
 	re.NoError(err)
@@ -232,3 +232,18 @@ func MustFinishSplitKeyspaceGroup(re *require.Assertions, server *tests.TestServ
 	re.NoError(err)
 	re.Equal(http.StatusOK, resp.StatusCode, string(data))
 }
+
+// MustMergeKeyspaceGroup merges keyspace groups with HTTP API.
+func MustMergeKeyspaceGroup(re *require.Assertions, server *tests.TestServer, id uint32, request *handlers.MergeKeyspaceGroupsParams) {
+	data, err := json.Marshal(request)
+	re.NoError(err)
+	httpReq, err := http.NewRequest(http.MethodPost, server.GetAddr()+keyspaceGroupsPrefix+fmt.Sprintf("/%d/merge", id), bytes.NewBuffer(data))
+	re.NoError(err)
+	// Send request.
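+	// Note on the request shape (assuming keyspaceGroupsPrefix resolves to the v2 keyspace-groups
+	// endpoint): the body marshaled above follows MergeKeyspaceGroupsParams, e.g. {"merge-list":[1,2]},
+	// and is POSTed to .../{id}/merge, where {id} is the merge target keyspace group.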
+	resp, err := dialClient.Do(httpReq)
+	re.NoError(err)
+	defer resp.Body.Close()
+	data, err = io.ReadAll(resp.Body)
+	re.NoError(err)
+	re.Equal(http.StatusOK, resp.StatusCode, string(data))
+}
diff --git a/tools/pd-ctl/pdctl/command/keyspace_group_command.go b/tools/pd-ctl/pdctl/command/keyspace_group_command.go
index 662a4aa157e..3e46df39e63 100644
--- a/tools/pd-ctl/pdctl/command/keyspace_group_command.go
+++ b/tools/pd-ctl/pdctl/command/keyspace_group_command.go
@@ -33,6 +33,9 @@ func NewKeyspaceGroupCommand() *cobra.Command {
 		Run:   showKeyspaceGroupCommandFunc,
 	}
 	cmd.AddCommand(newSplitKeyspaceGroupCommand())
+	cmd.AddCommand(newFinishSplitKeyspaceGroupCommand())
+	cmd.AddCommand(newMergeKeyspaceGroupCommand())
+	cmd.AddCommand(newFinishMergeKeyspaceGroupCommand())
 	cmd.AddCommand(newSetNodesKeyspaceGroupCommand())
 	cmd.AddCommand(newSetPriorityKeyspaceGroupCommand())
 	return cmd
@@ -47,6 +50,35 @@ func newSplitKeyspaceGroupCommand() *cobra.Command {
 	return r
 }

+func newFinishSplitKeyspaceGroupCommand() *cobra.Command {
+	r := &cobra.Command{
+		Use:    "finish-split <keyspace_group_id>",
+		Short:  "finish splitting the keyspace group with the given ID",
+		Run:    finishSplitKeyspaceGroupCommandFunc,
+		Hidden: true,
+	}
+	return r
+}
+
+func newMergeKeyspaceGroupCommand() *cobra.Command {
+	r := &cobra.Command{
+		Use:   "merge <target_keyspace_group_id> [<keyspace_group_id>]...",
+		Short: "merge the keyspace groups with the given IDs into the target one",
+		Run:   mergeKeyspaceGroupCommandFunc,
+	}
+	return r
+}
+
+func newFinishMergeKeyspaceGroupCommand() *cobra.Command {
+	r := &cobra.Command{
+		Use:    "finish-merge <keyspace_group_id>",
+		Short:  "finish merging the keyspace group with the given ID",
+		Run:    finishMergeKeyspaceGroupCommandFunc,
+		Hidden: true,
+	}
+	return r
+}
+
 func newSetNodesKeyspaceGroupCommand() *cobra.Command {
 	r := &cobra.Command{
 		Use:   "set-node <keyspace_group_id> <tso_node_addr> [<tso_node_addr>...]",
@@ -108,6 +140,66 @@ func splitKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) {
 	})
 }

+func finishSplitKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) {
+	if len(args) < 1 {
+		cmd.Usage()
+		return
+	}
+	_, err := strconv.ParseUint(args[0], 10, 32)
+	if err != nil {
+		cmd.Printf("Failed to parse the keyspace group ID: %s\n", err)
+		return
+	}
+	_, err = doRequest(cmd, fmt.Sprintf("%s/%s/split", keyspaceGroupsPrefix, args[0]), http.MethodDelete, http.Header{})
+	if err != nil {
+		cmd.Println(err)
+		return
+	}
+	cmd.Println("Success!")
+}
+
+func mergeKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) {
+	if len(args) < 2 {
+		cmd.Usage()
+		return
+	}
+	_, err := strconv.ParseUint(args[0], 10, 32)
+	if err != nil {
+		cmd.Printf("Failed to parse the target keyspace group ID: %s\n", err)
+		return
+	}
+	groups := make([]uint32, 0, len(args)-1)
+	for _, arg := range args[1:] {
+		id, err := strconv.ParseUint(arg, 10, 32)
+		if err != nil {
+			cmd.Printf("Failed to parse the keyspace group ID: %s\n", err)
+			return
+		}
+		groups = append(groups, uint32(id))
+	}
+	postJSON(cmd, fmt.Sprintf("%s/%s/merge", keyspaceGroupsPrefix, args[0]), map[string]interface{}{
+		"merge-list": groups,
+	})
+}
+
+func finishMergeKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) {
+	if len(args) < 1 {
+		cmd.Usage()
+		return
+	}
+	_, err := strconv.ParseUint(args[0], 10, 32)
+	if err != nil {
+		cmd.Printf("Failed to parse the keyspace group ID: %s\n", err)
+		return
+	}
+	_, err = doRequest(cmd, fmt.Sprintf("%s/%s/merge", keyspaceGroupsPrefix, args[0]), http.MethodDelete, http.Header{})
+	if err != nil {
+		cmd.Println(err)
+		return
+	}
+	cmd.Println("Success!")
+}
+
 func setNodesKeyspaceGroupCommandFunc(cmd *cobra.Command, args []string) {
 	if len(args) < 2 {
 		cmd.Usage()
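	// For reference, a minimal sketch of the merge flow these commands drive, using only
	// helpers introduced in this patch; `re` (*require.Assertions) and `server`
	// (*tests.TestServer) are assumed to already exist in the calling test:
	//
	//	handlersutil.MustMergeKeyspaceGroup(re, server, mcsutils.DefaultKeyspaceGroupID, &handlers.MergeKeyspaceGroupsParams{
	//		MergeList: []uint32{1, 2}, // merge keyspace groups 1 and 2 into the default group
	//	})
	//	// Poll until the default group is no longer marked as the merge target.
	//	testutil.Eventually(re, func() bool {
	//		kg := handlersutil.MustLoadKeyspaceGroupByID(re, server, mcsutils.DefaultKeyspaceGroupID)
	//		return !kg.IsMergeTarget()
	//	})
	//
	// The equivalent pd-ctl session is `keyspace-group merge 0 1 2` followed by
	// `keyspace-group finish-merge 0` once the merge checker has caught up.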