Skip to content

Commit

Permalink
core: batch get region size (#7252) (#7693)
Browse files Browse the repository at this point in the history
close #7248

Signed-off-by: ti-chi-bot <ti-community-prow-bot@tidb.io>
Signed-off-by: Ryan Leung <rleungx@gmail.com>

Co-authored-by: Ryan Leung <rleungx@gmail.com>
Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Feb 28, 2024
1 parent 63bf970 commit a2f526d
Show file tree
Hide file tree
Showing 5 changed files with 145 additions and 20 deletions.
2 changes: 1 addition & 1 deletion server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -1453,8 +1453,8 @@ func (c *RaftCluster) checkStores() {
}
} else if c.IsPrepared() {
threshold := c.getThreshold(stores, store)
log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold))
regionSize := float64(store.GetRegionSize())
log.Debug("store serving threshold", zap.Uint64("store-id", storeID), zap.Float64("threshold", threshold), zap.Float64("region-size", regionSize))
if regionSize >= threshold {
if err := c.ReadyToServe(storeID); err != nil {
log.Error("change store to serving failed",
Expand Down
31 changes: 28 additions & 3 deletions server/core/basic_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"go.uber.org/zap"
)

var scanRegionLimit = 1000

// BasicCluster provides basic data member and interface for a tikv cluster.
type BasicCluster struct {
Stores struct {
Expand Down Expand Up @@ -384,9 +386,32 @@ func (bc *BasicCluster) GetOverlaps(region *RegionInfo) []*RegionInfo {

// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range.
func (bc *BasicCluster) GetRegionSizeByRange(startKey, endKey []byte) int64 {
bc.Regions.mu.RLock()
defer bc.Regions.mu.RUnlock()
return bc.Regions.GetRegionSizeByRange(startKey, endKey)
var size int64
for {
bc.Regions.mu.RLock()
var cnt int
bc.Regions.tree.scanRange(startKey, func(region *RegionInfo) bool {
if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 {
return false
}
if cnt >= scanRegionLimit {
return false
}
cnt++
startKey = region.GetEndKey()
size += region.GetApproximateSize()
return true
})
bc.Regions.mu.RUnlock()
if cnt == 0 {
break
}
if len(startKey) == 0 {
break
}
}

return size
}

func (bc *BasicCluster) getWriteRate(
Expand Down
13 changes: 0 additions & 13 deletions server/core/region.go
Original file line number Diff line number Diff line change
Expand Up @@ -1191,19 +1191,6 @@ func (r *RegionsInfo) ScanRangeWithIterator(startKey []byte, iterator func(regio
r.tree.scanRange(startKey, iterator)
}

// GetRegionSizeByRange scans regions intersecting [start key, end key), returns the total region size of this range.
func (r *RegionsInfo) GetRegionSizeByRange(startKey, endKey []byte) int64 {
var size int64
r.tree.scanRange(startKey, func(region *RegionInfo) bool {
if len(endKey) > 0 && bytes.Compare(region.GetStartKey(), endKey) >= 0 {
return false
}
size += region.GetApproximateSize()
return true
})
return size
}

// GetAdjacentRegions returns region's info that is adjacent with specific region
func (r *RegionsInfo) GetAdjacentRegions(region *RegionInfo) (*RegionInfo, *RegionInfo) {
p, n := r.tree.getAdjacentRegions(region)
Expand Down
113 changes: 113 additions & 0 deletions server/core/region_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@ import (
"strconv"
"strings"
"testing"
"time"

. "github.com/pingcap/check"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/kvproto/pkg/pdpb"
"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/server/id"
)
Expand Down Expand Up @@ -662,6 +664,117 @@ func BenchmarkRandomRegion(b *testing.B) {
}
}

func BenchmarkRandomSetRegion(b *testing.B) {
cluster := NewBasicCluster()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
cluster.Regions.SetRegion(region)
items = append(items, region)
}
b.ResetTimer()
for i := 0; i < b.N; i++ {
item := items[i%len(items)]
item.approximateKeys = int64(200000)
item.approximateSize = int64(20)
cluster.Regions.SetRegion(item)
}
}

func TestGetRegionSizeByRange(t *testing.T) {
cluster := NewBasicCluster()
nums := 1000010
for i := 0; i < nums; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
endKey := []byte(fmt.Sprintf("%20d", i+1))
if i == nums-1 {
endKey = []byte("")
}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: endKey,
}, peer, SetApproximateSize(10))
cluster.Regions.SetRegion(region)
}
totalSize := cluster.GetRegionSizeByRange([]byte(""), []byte(""))
require.Equal(t, int64(nums*10), totalSize)
for i := 1; i < 10; i++ {
verifyNum := nums / i
endKey := fmt.Sprintf("%20d", verifyNum)
totalSize := cluster.GetRegionSizeByRange([]byte(""), []byte(endKey))
require.Equal(t, int64(verifyNum*10), totalSize)
}
}

func BenchmarkRandomSetRegionWithGetRegionSizeByRange(b *testing.B) {
cluster := NewBasicCluster()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer, SetApproximateSize(10))
cluster.Regions.SetRegion(region)
items = append(items, region)
}
b.ResetTimer()
go func() {
for {
cluster.GetRegionSizeByRange([]byte(""), []byte(""))
time.Sleep(time.Millisecond)
}
}()
for i := 0; i < b.N; i++ {
item := items[i%len(items)]
item.approximateKeys = int64(200000)
cluster.Regions.SetRegion(item)
}
}

func BenchmarkRandomSetRegionWithGetRegionSizeByRangeParallel(b *testing.B) {
cluster := NewBasicCluster()
var items []*RegionInfo
for i := 0; i < 1000000; i++ {
peer := &metapb.Peer{StoreId: 1, Id: uint64(i + 1)}
region := NewRegionInfo(&metapb.Region{
Id: uint64(i + 1),
Peers: []*metapb.Peer{peer},
StartKey: []byte(fmt.Sprintf("%20d", i)),
EndKey: []byte(fmt.Sprintf("%20d", i+1)),
}, peer)
cluster.Regions.SetRegion(region)
items = append(items, region)
}
b.ResetTimer()
go func() {
for {
cluster.GetRegionSizeByRange([]byte(""), []byte(""))
time.Sleep(time.Millisecond)
}
}()

b.RunParallel(
func(pb *testing.PB) {
for pb.Next() {
item := items[rand.Intn(len(items))]
n := item.Clone(SetApproximateSize(20))
cluster.Regions.SetRegion(n)
}
},
)
}

const keyLength = 100

func randomBytes(n int) []byte {
Expand Down
6 changes: 3 additions & 3 deletions tests/server/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -686,7 +686,7 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) {
pdctl.MustPutStore(c, leader.GetServer(), store)
}
for i := 0; i < 100; i++ {
pdctl.MustPutRegion(c, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("p%d", i)), []byte(fmt.Sprintf("%d", i+1)), core.SetApproximateSize(10))
pdctl.MustPutRegion(c, cluster, uint64(i+1), uint64(i)%3+1, []byte(fmt.Sprintf("%20d", i)), []byte(fmt.Sprintf("%20d", i+1)), core.SetApproximateSize(10))
}
// no store preparing
output := sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusNotFound)
Expand All @@ -713,8 +713,8 @@ func (s *testProgressSuite) TestPreparingProgress(c *C) {
c.Assert(p.LeftSeconds, Equals, math.MaxFloat64)

// update size
pdctl.MustPutRegion(c, cluster, 1000, 4, []byte(fmt.Sprintf("%d", 1000)), []byte(fmt.Sprintf("%d", 1001)), core.SetApproximateSize(10))
pdctl.MustPutRegion(c, cluster, 1001, 5, []byte(fmt.Sprintf("%d", 1001)), []byte(fmt.Sprintf("%d", 1002)), core.SetApproximateSize(40))
pdctl.MustPutRegion(c, cluster, 1000, 4, []byte(fmt.Sprintf("%20d", 1000)), []byte(fmt.Sprintf("%20d", 1001)), core.SetApproximateSize(10))
pdctl.MustPutRegion(c, cluster, 1001, 5, []byte(fmt.Sprintf("%20d", 1001)), []byte(fmt.Sprintf("%20d", 1002)), core.SetApproximateSize(40))
time.Sleep(2 * time.Second)
output = sendRequest(c, leader.GetAddr()+"/pd/api/v1/stores/progress?action=preparing", http.MethodGet, http.StatusOK)
c.Assert(json.Unmarshal(output, &p), IsNil)
Expand Down

0 comments on commit a2f526d

Please sign in to comment.