Skip to content

Commit

Permalink
mcs: pick some bug fix (tikv#129)
Browse files Browse the repository at this point in the history
* tools: support get all groups (tikv#6714)

ref tikv#5895, ref tikv#6706

Signed-off-by: Ryan Leung <rleungx@gmail.com>

* Fix data race between read APIs and finshiSplit/finishMerge in keyspace group manager (tikv#6723)

close tikv#6721

checkTSOMerge and checkTSOSplit will read from kgm.getKeyspaceGroupMeta

finishMergeKeyspaceGroup and finishSplitKeyspaceGroup will update kgm

so just return a copy to avoid data race

Signed-off-by: Bin Shi <binshi.bing@gmail.com>

* tso: fix memory leak introduced by timer.After (tikv#6730)

close tikv#6719, ref tikv#6720

Signed-off-by: lhy1024 <admin@liudos.us>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>

* mcs, tso: fix split and split-range command bugs. (tikv#6732)

close tikv#6687, close tikv#6731

Fix split and split-range command bugs.

Signed-off-by: Bin Shi <binshi.bing@gmail.com>

---------

Signed-off-by: Ryan Leung <rleungx@gmail.com>
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
Co-authored-by: Ryan Leung <rleungx@gmail.com>
Co-authored-by: Bin Shi <39923490+binshi-bing@users.noreply.github.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
4 people authored Jul 1, 2023
1 parent 3b9ca3f commit 2d22ba6
Show file tree
Hide file tree
Showing 10 changed files with 269 additions and 57 deletions.
16 changes: 14 additions & 2 deletions pkg/keyspace/tso_keyspace_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -622,7 +622,17 @@ func buildSplitKeyspaces(
oldSplit = append(oldSplit, keyspace)
}
}
return oldSplit, new, nil
// If newNum != len(newKeyspaceMap), it means the provided new keyspace list contains
// duplicate keyspaces, and we need to dedup them (https://github.com/tikv/pd/issues/6687);
// otherwise, we can just return the old split and new keyspace list.
if newNum == len(newKeyspaceMap) {
return oldSplit, new, nil
}
newSplit := make([]uint32, 0, len(newKeyspaceMap))
for keyspace := range newKeyspaceMap {
newSplit = append(newSplit, keyspace)
}
return oldSplit, newSplit, nil
}
// Split according to the start and end keyspace ID.
if startKeyspaceID == 0 && endKeyspaceID == 0 {
Expand All @@ -634,7 +644,9 @@ func buildSplitKeyspaces(
)
for _, keyspace := range old {
if keyspace == utils.DefaultKeyspaceID {
return nil, nil, ErrModifyDefaultKeyspace
// The source keyspace group must be the default keyspace group and we always keep the default
// keyspace in the default keyspace group.
continue
}
if startKeyspaceID <= keyspace && keyspace <= endKeyspaceID {
newSplit = append(newSplit, keyspace)
Expand Down
41 changes: 41 additions & 0 deletions pkg/keyspace/tso_keyspace_group_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,27 +483,68 @@ func TestBuildSplitKeyspaces(t *testing.T) {
new: []uint32{6},
err: ErrKeyspaceNotInKeyspaceGroup,
},
{
old: []uint32{1, 2},
new: []uint32{2, 2},
expectedOld: []uint32{1},
expectedNew: []uint32{2},
},
{
old: []uint32{0, 1, 2, 3, 4, 5},
startKeyspaceID: 2,
endKeyspaceID: 4,
expectedOld: []uint32{0, 1, 5},
expectedNew: []uint32{2, 3, 4},
},
{
old: []uint32{0, 1, 2, 3, 4, 5},
startKeyspaceID: 0,
endKeyspaceID: 4,
expectedOld: []uint32{0, 5},
expectedNew: []uint32{1, 2, 3, 4},
},
{
old: []uint32{1, 2, 3, 4, 5},
startKeyspaceID: 2,
endKeyspaceID: 4,
expectedOld: []uint32{1, 5},
expectedNew: []uint32{2, 3, 4},
},
{
old: []uint32{1, 2, 3, 4, 5},
startKeyspaceID: 5,
endKeyspaceID: 6,
expectedOld: []uint32{1, 2, 3, 4},
expectedNew: []uint32{5},
},
{
old: []uint32{1, 2, 3, 4, 5},
startKeyspaceID: 2,
endKeyspaceID: 6,
expectedOld: []uint32{1},
expectedNew: []uint32{2, 3, 4, 5},
},
{
old: []uint32{1, 2, 3, 4, 5},
startKeyspaceID: 1,
endKeyspaceID: 1,
expectedOld: []uint32{2, 3, 4, 5},
expectedNew: []uint32{1},
},
{
old: []uint32{1, 2, 3, 4, 5},
startKeyspaceID: 0,
endKeyspaceID: 6,
expectedOld: []uint32{},
expectedNew: []uint32{1, 2, 3, 4, 5},
},
{
old: []uint32{1, 2, 3, 4, 5},
startKeyspaceID: 7,
endKeyspaceID: 10,
expectedOld: []uint32{1, 2, 3, 4, 5},
expectedNew: []uint32{},
},
{
old: []uint32{1, 2, 3, 4, 5},
err: ErrKeyspaceNotInKeyspaceGroup,
Expand Down
43 changes: 43 additions & 0 deletions pkg/timerpool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133

package timerpool

import (
"sync"
"time"
)

// GlobalTimerPool is a global pool for reusing *time.Timer.
var GlobalTimerPool TimerPool

// TimerPool is a wrapper of sync.Pool which caches *time.Timer for reuse.
type TimerPool struct {
pool sync.Pool
}

// Get returns a timer with a given duration.
func (tp *TimerPool) Get(d time.Duration) *time.Timer {
if v := tp.pool.Get(); v != nil {
timer := v.(*time.Timer)
timer.Reset(d)
return timer
}
return time.NewTimer(d)
}

// Put tries to call timer.Stop() before putting it back into pool,
// if the timer.Stop() returns false (it has either already expired or been stopped),
// have a shot at draining the channel with residual time if there is one.
func (tp *TimerPool) Put(timer *time.Timer) {
if !timer.Stop() {
select {
case <-timer.C:
default:
}
}
tp.pool.Put(timer)
}
70 changes: 70 additions & 0 deletions pkg/timerpool/pool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright 2020 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Note: This file is copied from https://go-review.googlesource.com/c/go/+/276133

package timerpool

import (
"testing"
"time"
)

func TestTimerPool(t *testing.T) {
var tp TimerPool

for i := 0; i < 100; i++ {
timer := tp.Get(20 * time.Millisecond)

select {
case <-timer.C:
t.Errorf("timer expired too early")
continue
default:
}

select {
case <-time.After(100 * time.Millisecond):
t.Errorf("timer didn't expire on time")
case <-timer.C:
}

tp.Put(timer)
}
}

const timeout = 10 * time.Millisecond

func BenchmarkTimerUtilization(b *testing.B) {
b.Run("TimerWithPool", func(b *testing.B) {
for i := 0; i < b.N; i++ {
t := GlobalTimerPool.Get(timeout)
GlobalTimerPool.Put(t)
}
})
b.Run("TimerWithoutPool", func(b *testing.B) {
for i := 0; i < b.N; i++ {
t := time.NewTimer(timeout)
t.Stop()
}
})
}

func BenchmarkTimerPoolParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
t := GlobalTimerPool.Get(timeout)
GlobalTimerPool.Put(t)
}
})
}

func BenchmarkTimerNativeParallel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
t := time.NewTimer(timeout)
t.Stop()
}
})
}
98 changes: 60 additions & 38 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,39 @@ func (s *state) getKeyspaceGroupMeta(
return s.ams[groupID], s.kgs[groupID]
}

func (s *state) checkTSOSplit(
targetGroupID uint32,
) (splitTargetAM, splitSourceAM *AllocatorManager, err error) {
s.RLock()
defer s.RUnlock()
splitTargetAM, splitTargetGroup := s.ams[targetGroupID], s.kgs[targetGroupID]
// Only the split target keyspace group needs to check the TSO split.
if !splitTargetGroup.IsSplitTarget() {
return nil, nil, nil // it isn't in the split state
}
sourceGroupID := splitTargetGroup.SplitSource()
splitSourceAM, splitSourceGroup := s.ams[sourceGroupID], s.kgs[sourceGroupID]
if splitSourceAM == nil || splitSourceGroup == nil {
log.Error("the split source keyspace group is not initialized",
zap.Uint32("source", sourceGroupID))
return nil, nil, errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(sourceGroupID)
}
return splitTargetAM, splitSourceAM, nil
}

// Reject any request if the keyspace group is in merging state,
// we need to wait for the merging checker to finish the TSO merging.
func (s *state) checkTSOMerge(
groupID uint32,
) error {
s.RLock()
defer s.RUnlock()
if s.kgs[groupID] == nil || !s.kgs[groupID].IsMerging() {
return nil
}
return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(groupID)
}

// getKeyspaceGroupMetaWithCheck returns the keyspace group meta of the given keyspace.
// It also checks if the keyspace is served by the given keyspace group. If not, it returns the meta
// of the keyspace group to which the keyspace currently belongs and returns NotServed (by the given
Expand Down Expand Up @@ -957,7 +990,7 @@ func (kgm *KeyspaceGroupManager) HandleTSORequest(
if err != nil {
return pdpb.Timestamp{}, curKeyspaceGroupID, err
}
err = kgm.checkTSOMerge(curKeyspaceGroupID)
err = kgm.state.checkTSOMerge(curKeyspaceGroupID)
if err != nil {
return pdpb.Timestamp{}, curKeyspaceGroupID, err
}
Expand Down Expand Up @@ -1032,27 +1065,19 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
keyspaceGroupID uint32,
dcLocation string,
) error {
splitAM, splitGroup := kgm.getKeyspaceGroupMeta(keyspaceGroupID)
// Only the split target keyspace group needs to check the TSO split.
if !splitGroup.IsSplitTarget() {
return nil
}
splitSource := splitGroup.SplitSource()
splitSourceAM, splitSourceGroup := kgm.getKeyspaceGroupMeta(splitSource)
if splitSourceAM == nil || splitSourceGroup == nil {
log.Error("the split source keyspace group is not initialized",
zap.Uint32("source", splitSource))
return errs.ErrKeyspaceGroupNotInitialized.FastGenByArgs(splitSource)
splitTargetAM, splitSourceAM, err := kgm.state.checkTSOSplit(keyspaceGroupID)
if err != nil || splitTargetAM == nil {
return err
}
splitAllocator, err := splitAM.GetAllocator(dcLocation)
splitTargetAllocator, err := splitTargetAM.GetAllocator(dcLocation)
if err != nil {
return err
}
splitSourceAllocator, err := splitSourceAM.GetAllocator(dcLocation)
if err != nil {
return err
}
splitTSO, err := splitAllocator.GenerateTSO(1)
splitTargetTSO, err := splitTargetAllocator.GenerateTSO(1)
if err != nil {
return err
}
Expand All @@ -1061,19 +1086,19 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
return err
}
// If the split source TSO is not greater than the newly split TSO, we don't need to do anything.
if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTSO) <= 0 {
if tsoutil.CompareTimestamp(&splitSourceTSO, &splitTargetTSO) <= 0 {
log.Info("the split source tso is less than the newly split tso",
zap.Int64("split-source-tso-physical", splitSourceTSO.Physical),
zap.Int64("split-source-tso-logical", splitSourceTSO.Logical),
zap.Int64("split-tso-physical", splitTSO.Physical),
zap.Int64("split-tso-logical", splitTSO.Logical))
zap.Int64("split-tso-physical", splitTargetTSO.Physical),
zap.Int64("split-tso-logical", splitTargetTSO.Logical))
// Finish the split state directly.
return kgm.finishSplitKeyspaceGroup(keyspaceGroupID)
}
// If the split source TSO is greater than the newly split TSO, we need to update the split
// TSO to make sure the following TSO will be greater than the split keyspaces ever had
// in the past.
err = splitAllocator.SetTSO(tsoutil.GenerateTS(&pdpb.Timestamp{
err = splitTargetAllocator.SetTSO(tsoutil.GenerateTS(&pdpb.Timestamp{
Physical: splitSourceTSO.Physical + 1,
Logical: splitSourceTSO.Logical,
}), true, true)
Expand All @@ -1083,8 +1108,8 @@ func (kgm *KeyspaceGroupManager) checkTSOSplit(
log.Info("the split source tso is greater than the newly split tso",
zap.Int64("split-source-tso-physical", splitSourceTSO.Physical),
zap.Int64("split-source-tso-logical", splitSourceTSO.Logical),
zap.Int64("split-tso-physical", splitTSO.Physical),
zap.Int64("split-tso-logical", splitTSO.Logical))
zap.Int64("split-tso-physical", splitTargetTSO.Physical),
zap.Int64("split-tso-logical", splitTargetTSO.Logical))
// Finish the split state.
return kgm.finishSplitKeyspaceGroup(keyspaceGroupID)
}
Expand Down Expand Up @@ -1116,9 +1141,13 @@ func (kgm *KeyspaceGroupManager) finishSplitKeyspaceGroup(id uint32) error {
zap.Int("status-code", statusCode))
return errs.ErrSendRequest.FastGenByArgs()
}
// Pre-update the split keyspace group split state in memory.
splitGroup.SplitState = nil
kgm.kgs[id] = splitGroup
// Pre-update the split keyspace group's split state in memory.
// Note: to avoid data race with state read APIs, we always replace the group in memory as a whole.
// For now, we only have scenarios to update split state/merge state, and the other fields are always
// loaded from etcd without any modification, so we can simply copy the group and replace the state.
newSplitGroup := *splitGroup
newSplitGroup.SplitState = nil
kgm.kgs[id] = &newSplitGroup
return nil
}

Expand Down Expand Up @@ -1146,9 +1175,14 @@ func (kgm *KeyspaceGroupManager) finishMergeKeyspaceGroup(id uint32) error {
zap.Int("status-code", statusCode))
return errs.ErrSendRequest.FastGenByArgs()
}
// Pre-update the split keyspace group split state in memory.
mergeTarget.MergeState = nil
kgm.kgs[id] = mergeTarget

// Pre-update the merge target keyspace group's merge state in memory.
// Note: to avoid data race with state read APIs, we always replace the group in memory as a whole.
// For now, we only have scenarios to update split state/merge state, and the other fields are always
// loaded from etcd without any modification, so we can simply copy the group and replace the state.
newTargetGroup := *mergeTarget
newTargetGroup.MergeState = nil
kgm.kgs[id] = &newTargetGroup
return nil
}

Expand Down Expand Up @@ -1286,15 +1320,3 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
return
}
}

// Reject any request if the keyspace group is in merging state,
// we need to wait for the merging checker to finish the TSO merging.
func (kgm *KeyspaceGroupManager) checkTSOMerge(
keyspaceGroupID uint32,
) error {
_, group := kgm.getKeyspaceGroupMeta(keyspaceGroupID)
if !group.IsMerging() {
return nil
}
return errs.ErrKeyspaceGroupIsMerging.FastGenByArgs(keyspaceGroupID)
}
Loading

0 comments on commit 2d22ba6

Please sign in to comment.