Skip to content

Commit

Permalink
cherry pick tikv#1377 to tidb-7.5
Browse files Browse the repository at this point in the history
Signed-off-by: you06 <you1474600@gmail.com>
  • Loading branch information
you06 committed Jul 8, 2024
1 parent 7a74511 commit 3de8cca
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 3 deletions.
2 changes: 1 addition & 1 deletion integration_tests/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ require (
github.com/stretchr/testify v1.8.4
github.com/tidwall/gjson v1.14.1
github.com/tikv/client-go/v2 v2.0.8-0.20231025022411-cad314220659
github.com/tikv/pd/client v0.0.0-20231116062916-ef6ba8551e52
github.com/tikv/pd/client v0.0.0-20240202093025-6978558f4e97
go.uber.org/goleak v1.2.1
)

Expand Down
4 changes: 2 additions & 2 deletions integration_tests/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -507,8 +507,8 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
github.com/tikv/pd/client v0.0.0-20231116062916-ef6ba8551e52 h1:wucAo/ks8INgayRVfbrzZ+BSWEwRLETj0XfngDcrZ4k=
github.com/tikv/pd/client v0.0.0-20231116062916-ef6ba8551e52/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/tikv/pd/client v0.0.0-20240202093025-6978558f4e97 h1:LlsMm3/IIRRXrip19M0yYb4bZ7BvdQgzF77csNnGK1Y=
github.com/tikv/pd/client v0.0.0-20240202093025-6978558f4e97/go.mod h1:AwjTSpM7CgAynYwB6qTG5R5fVC9/eXlQXiTO6zDL1HI=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/go-sysconf v0.3.11 h1:89WgdJhk5SNwJfu+GKyYveZ4IaJ7xAkecBo+KdJV0CM=
github.com/tklauser/go-sysconf v0.3.11/go.mod h1:GqXfhXY3kiPa0nAXPDIQIWzJbMCB7AmcWpGR8lSZfqI=
Expand Down
49 changes: 49 additions & 0 deletions internal/locate/region_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -1829,6 +1829,15 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
util.HexRegionKeyStr(startKey), util.HexRegionKeyStr(endKey), limit,
util.HexRegionKeyStr(c.codec.EncodeRegionKey(startKey)), util.HexRegionKeyStr(c.codec.EncodeRegionKey(endKey)),
)
continue
}

if regionsHaveGapInRanges(startKey, endKey, regionsInfo, limit) {
backoffErr = errors.Errorf(
"PD returned regions have gaps, startKey: %q, endKey: %q, limit: %d",
startKey, endKey, limit,
)
continue
}
regions := make([]*Region, 0, len(regionsInfo))
for _, r := range regionsInfo {
Expand All @@ -1853,6 +1862,46 @@ func (c *RegionCache) scanRegions(bo *retry.Backoffer, startKey, endKey []byte,
}
}

// regionsHaveGapInRanges checks if the loaded regions can fully cover the key ranges.
// If there are any gaps between the regions, it returns true, then the requests might be retried.
// TODO: remove this function after PD client supports gap detection and handling it.
func regionsHaveGapInRanges(start, end []byte, regionsInfo []*pd.Region, limit int) bool {
if len(regionsInfo) == 0 {
return true
}
var lastEndKey []byte
for i, r := range regionsInfo {
if r.Meta == nil {
return true
}
if i == 0 {
if bytes.Compare(r.Meta.StartKey, start) > 0 {
// there is a gap between first returned region's start_key and start key.
return true
}
}
if i > 0 && bytes.Compare(r.Meta.StartKey, lastEndKey) > 0 {
// there is a gap between two regions.
return true
}
if len(r.Meta.EndKey) == 0 {
// the current region contains all the rest ranges.
return false
}
// note lastEndKey never be empty.
lastEndKey = r.Meta.EndKey
}
if limit > 0 && len(regionsInfo) == limit {
// the regionsInfo is limited by the limit, so there may be some ranges not covered.
// But the previous regions are continuous, so we just need to check the rest ranges.
return false
}
if len(end) == 0 {
return len(lastEndKey) != 0
}
return bytes.Compare(lastEndKey, end) < 0
}

// GetCachedRegionWithRLock returns region with lock.
func (c *RegionCache) GetCachedRegionWithRLock(regionID RegionVerID) (r *Region) {
c.mu.RLock()
Expand Down
105 changes: 105 additions & 0 deletions internal/locate/region_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1876,3 +1876,108 @@ func (s *testRegionCacheSuite) TestHealthCheckWithStoreReplace() {
return newStore1.getResolveState() == resolved && newStore1.getLivenessState() == reachable
}, 3*time.Second, time.Second)
}

func (s *testRegionCacheSuite) TestRangesAreCoveredCheck() {
check := func(ranges []string, regions []string, limit int, expect bool) {
s.Len(ranges, 2)
rgs := make([]*pd.Region, 0, len(regions))
for i := 0; i < len(regions); i += 2 {
rgs = append(rgs, &pd.Region{Meta: &metapb.Region{
StartKey: []byte(regions[i]),
EndKey: []byte(regions[i+1]),
}})
}
s.Equal(expect, regionsHaveGapInRanges([]byte(ranges[0]), []byte(ranges[1]), rgs, limit))
}

boundCase := []string{"a", "c"}
// positive
check(boundCase, []string{"a", "c"}, -1, false)
check(boundCase, []string{"a", ""}, -1, false)
check(boundCase, []string{"", "c"}, -1, false)
// negative
check(boundCase, []string{"a", "b"}, -1, true)
check(boundCase, []string{"b", "c"}, -1, true)
check(boundCase, []string{"b", ""}, -1, true)
check(boundCase, []string{"", "b"}, -1, true)
// positive
check(boundCase, []string{"a", "b", "b", "c"}, -1, false)
check(boundCase, []string{"", "b", "b", "c"}, -1, false)
check(boundCase, []string{"a", "b", "b", ""}, -1, false)
check(boundCase, []string{"", "b", "b", ""}, -1, false)
// negative
check(boundCase, []string{"a", "b", "b1", "c"}, -1, true)
check(boundCase, []string{"", "b", "b1", "c"}, -1, true)
check(boundCase, []string{"a", "b", "b1", ""}, -1, true)
check(boundCase, []string{"", "b", "b1", ""}, -1, true)
check(boundCase, []string{}, -1, true)

unboundCase := []string{"", ""}
// positive
check(unboundCase, []string{"", ""}, -1, false)
// negative
check(unboundCase, []string{"a", "c"}, -1, true)
check(unboundCase, []string{"a", ""}, -1, true)
check(unboundCase, []string{"", "c"}, -1, true)
// positive
check(unboundCase, []string{"", "b", "b", ""}, -1, false)
// negative
check(unboundCase, []string{"", "b", "b1", ""}, -1, true)
check(unboundCase, []string{"a", "b", "b", ""}, -1, true)
check(unboundCase, []string{"", "b", "b", "c"}, -1, true)
check(unboundCase, []string{}, -1, true)

// test half bounded ranges
check([]string{"", "b"}, []string{"", "a"}, -1, true)
check([]string{"", "b"}, []string{"", "a"}, 1, false) // it's just limitation reached
check([]string{"", "b"}, []string{"", "a"}, 2, true)
check([]string{"a", ""}, []string{"b", ""}, -1, true)
check([]string{"a", ""}, []string{"b", ""}, 1, true)
check([]string{"a", ""}, []string{"b", "c"}, 1, true)
check([]string{"a", ""}, []string{"a", ""}, -1, false)
}

func (s *testRegionCacheSuite) TestScanRegionsWithGaps() {
// Split at "a", "c", "e"
// nil --- 'a' --- 'c' --- 'e' --- nil
// <- 0 -> <- 1 -> <- 2 -> <- 3 -->
regions := s.cluster.AllocIDs(3)
regions = append([]uint64{s.region1}, regions...)

peers := [][]uint64{{s.peer1, s.peer2}}
for i := 0; i < 3; i++ {
peers = append(peers, s.cluster.AllocIDs(2))
}

for i := 0; i < 3; i++ {
s.cluster.Split(regions[i], regions[i+1], []byte{'a' + 2*byte(i)}, peers[i+1], peers[i+1][0])
}

// the last region is not reported to PD yet
getRegionIDsWithInject := func(fn func() ([]*Region, error)) []uint64 {
s.cache.clear()
err := failpoint.Enable("tikvclient/mockSplitRegionNotReportToPD", fmt.Sprintf(`return(%d)`, regions[2]))
s.Nil(err)
resCh := make(chan []*Region)
errCh := make(chan error)
go func() {
rs, err := fn()
errCh <- err
resCh <- rs
}()
time.Sleep(time.Second)
failpoint.Disable("tikvclient/mockSplitRegionNotReportToPD")
s.Nil(<-errCh)
rs := <-resCh
regionIDs := make([]uint64, 0, len(rs))
for _, r := range rs {
regionIDs = append(regionIDs, r.GetID())
}
return regionIDs
}

scanRegionRes := getRegionIDsWithInject(func() ([]*Region, error) {
return s.cache.BatchLoadRegionsWithKeyRange(s.bo, []byte(""), []byte(""), 10)
})
s.Equal(scanRegionRes, regions)
}
10 changes: 10 additions & 0 deletions internal/mockstore/mocktikv/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/tikv/client-go/v2/internal/mockstore/cluster"
"github.com/tikv/client-go/v2/util"
pd "github.com/tikv/pd/client"
)

Expand Down Expand Up @@ -377,6 +378,15 @@ func (c *Cluster) ScanRegions(startKey, endKey []byte, limit int) []*pd.Region {
regions = regions[:endPos]
}
}
if rid, err := util.EvalFailpoint("mockSplitRegionNotReportToPD"); err == nil {
notReportRegionID := uint64(rid.(int))
for i, r := range regions {
if r.Meta.Id == notReportRegionID {
regions = append(regions[:i], regions[i+1:]...)
break
}
}
}
if limit > 0 && len(regions) > limit {
regions = regions[:limit]
}
Expand Down

0 comments on commit 3de8cca

Please sign in to comment.