Skip to content

Commit

Permalink
unistore: get/batchGet/scan support read-through-lock (#29898)
Browse files Browse the repository at this point in the history
  • Loading branch information
youjiali1995 authored Nov 29, 2021
1 parent 10d390d commit df113a1
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 83 deletions.
24 changes: 16 additions & 8 deletions br/pkg/lightning/restore/check_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"github.com/pingcap/tidb/br/pkg/lightning/mydump"
"github.com/pingcap/tidb/br/pkg/lightning/verification"
"github.com/pingcap/tidb/br/pkg/storage"
"github.com/pingcap/tidb/br/pkg/version"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/table"
Expand Down Expand Up @@ -179,6 +178,15 @@ func (rc *Controller) ClusterIsAvailable(ctx context.Context) error {
return nil
}

func isTiFlash(store *api.MetaStore) bool {
for _, label := range store.Labels {
if label.Key == "engine" && label.Value == "tiflash" {
return true
}
}
return false
}

func (rc *Controller) checkEmptyRegion(ctx context.Context) error {
passed := true
message := "Cluster doesn't have too many empty regions"
Expand Down Expand Up @@ -206,7 +214,7 @@ func (rc *Controller) checkEmptyRegion(ctx context.Context) error {
}
}
for _, store := range storeInfo.Stores {
stores[store.Store.Id] = store
stores[store.Store.StoreID] = store
}
tableCount := 0
for _, db := range rc.dbMetas {
Expand All @@ -224,10 +232,10 @@ func (rc *Controller) checkEmptyRegion(ctx context.Context) error {
)
for storeID, regionCnt := range regions {
if store, ok := stores[storeID]; ok {
if store.Store.State != metapb.StoreState_Up {
if metapb.StoreState(metapb.StoreState_value[store.Store.StateName]) != metapb.StoreState_Up {
continue
}
if version.IsTiFlash(store.Store.Store) {
if isTiFlash(store.Store) {
continue
}
if regionCnt > errorThrehold {
Expand Down Expand Up @@ -269,10 +277,10 @@ func (rc *Controller) checkRegionDistribution(ctx context.Context) error {
}
stores := make([]*api.StoreInfo, 0, len(result.Stores))
for _, store := range result.Stores {
if store.Store.State != metapb.StoreState_Up {
if metapb.StoreState(metapb.StoreState_value[store.Store.StateName]) != metapb.StoreState_Up {
continue
}
if version.IsTiFlash(store.Store.Store) {
if isTiFlash(store.Store) {
continue
}
stores = append(stores, store)
Expand Down Expand Up @@ -302,11 +310,11 @@ func (rc *Controller) checkRegionDistribution(ctx context.Context) error {
passed = false
message = fmt.Sprintf("Region distribution is unbalanced, the ratio of the regions count of the store(%v) "+
"with least regions(%v) to the store(%v) with most regions(%v) is %v, but we expect it must not be less than %v",
minStore.Store.Id, minStore.Status.RegionCount, maxStore.Store.Id, maxStore.Status.RegionCount, ratio, errorRegionCntMinMaxRatio)
minStore.Store.StoreID, minStore.Status.RegionCount, maxStore.Store.StoreID, maxStore.Status.RegionCount, ratio, errorRegionCntMinMaxRatio)
} else if ratio < warnRegionCntMinMaxRatio {
message = fmt.Sprintf("Region distribution is unbalanced, the ratio of the regions count of the store(%v) "+
"with least regions(%v) to the store(%v) with most regions(%v) is %v, but we expect it should not be less than %v",
minStore.Store.Id, minStore.Status.RegionCount, maxStore.Store.Id, maxStore.Status.RegionCount, ratio, warnRegionCntMinMaxRatio)
minStore.Store.StoreID, minStore.Status.RegionCount, maxStore.Store.StoreID, maxStore.Status.RegionCount, ratio, warnRegionCntMinMaxRatio)
}
return nil
}
Expand Down
20 changes: 10 additions & 10 deletions br/pkg/lightning/restore/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1979,7 +1979,7 @@ func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) {
testCases := []testCase{
{
stores: api.StoresInfo{Stores: []*api.StoreInfo{
{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 200}},
{Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 200}},
}},
emptyRegions: api.RegionsInfo{
Regions: append([]api.RegionInfo(nil), makeRegions(100, 1)...),
Expand All @@ -1990,9 +1990,9 @@ func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) {
},
{
stores: api.StoresInfo{Stores: []*api.StoreInfo{
{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 2000}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 3100}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}},
{Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 2000}},
{Store: &api.MetaStore{StoreID: 2}, Status: &api.StoreStatus{RegionCount: 3100}},
{Store: &api.MetaStore{StoreID: 3}, Status: &api.StoreStatus{RegionCount: 2500}},
}},
emptyRegions: api.RegionsInfo{
Regions: append(append(append([]api.RegionInfo(nil),
Expand All @@ -2010,19 +2010,19 @@ func (s *tableRestoreSuite) TestCheckClusterRegion(c *C) {
},
{
stores: api.StoresInfo{Stores: []*api.StoreInfo{
{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 1200}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 3000}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}},
{Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 1200}},
{Store: &api.MetaStore{StoreID: 2}, Status: &api.StoreStatus{RegionCount: 3000}},
{Store: &api.MetaStore{StoreID: 3}, Status: &api.StoreStatus{RegionCount: 2500}},
}},
expectMsgs: []string{".*Region distribution is unbalanced.*but we expect it must not be less than 0.5.*"},
expectResult: false,
expectErrorCnt: 1,
},
{
stores: api.StoresInfo{Stores: []*api.StoreInfo{
{Store: &api.MetaStore{Store: &metapb.Store{Id: 1}}, Status: &api.StoreStatus{RegionCount: 0}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 2}}, Status: &api.StoreStatus{RegionCount: 2800}},
{Store: &api.MetaStore{Store: &metapb.Store{Id: 3}}, Status: &api.StoreStatus{RegionCount: 2500}},
{Store: &api.MetaStore{StoreID: 1}, Status: &api.StoreStatus{RegionCount: 0}},
{Store: &api.MetaStore{StoreID: 2}, Status: &api.StoreStatus{RegionCount: 2800}},
{Store: &api.MetaStore{StoreID: 3}, Status: &api.StoreStatus{RegionCount: 2500}},
}},
expectMsgs: []string{".*Region distribution is unbalanced.*but we expect it must not be less than 0.5.*"},
expectResult: false,
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ require (
github.com/pingcap/errors v0.11.5-0.20211009033009-93128226aaa3
github.com/pingcap/failpoint v0.0.0-20210316064728-7acb0f0a3dfd
github.com/pingcap/fn v0.0.0-20200306044125-d5540d389059
github.com/pingcap/kvproto v0.0.0-20211029081837-3c7bd947cf9b
github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f
github.com/pingcap/log v0.0.0-20210906054005-afc726e70354
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible
Expand All @@ -66,7 +66,7 @@ require (
github.com/stretchr/testify v1.7.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211118154139-b11da6307c6f
github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee
github.com/twmb/murmur3 v1.1.3
github.com/uber/jaeger-client-go v2.22.1+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
Expand All @@ -76,7 +76,7 @@ require (
go.etcd.io/etcd v0.5.0-alpha.5.0.20210512015243-d19fbe541bf9
go.uber.org/atomic v1.9.0
go.uber.org/automaxprocs v1.4.0
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723
go.uber.org/goleak v1.1.12
go.uber.org/multierr v1.7.0
go.uber.org/zap v1.19.1
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420
Expand Down
14 changes: 8 additions & 6 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -582,8 +582,9 @@ github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO
github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210819164333-bd5706b9d9f2/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20210915062418-0f5764a128ad/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20211029081837-3c7bd947cf9b h1:/aj6ITlHSJZmsm4hIMOgJAAZti+Dmq11tCyKedA6Dcs=
github.com/pingcap/kvproto v0.0.0-20211029081837-3c7bd947cf9b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20211109071446-a8b4d34474bc/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f h1:hjInxK1Ie6CYx7Jy2pYnBdEnWI8jIfr423l9Yh6LRy8=
github.com/pingcap/kvproto v0.0.0-20211122024046-03abd340988f/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI=
github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
github.com/pingcap/log v0.0.0-20210317133921-96f4fcab92a4/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8=
Expand All @@ -594,7 +595,7 @@ github.com/pingcap/sysutil v0.0.0-20210315073920-cc0985d983a3/go.mod h1:tckvA041
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5 h1:7rvAtZe/ZUzOKzgriNPQoBNvleJXBk4z7L3Z47+tS98=
github.com/pingcap/sysutil v0.0.0-20210730114356-fcd8a63f68c5/go.mod h1:XsOaV712rUk63aOEKYP9PhXTIE3FMNHmC2r1wX5wElY=
github.com/pingcap/tidb-dashboard v0.0.0-20211008050453-a25c25809529/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
github.com/pingcap/tidb-dashboard v0.0.0-20211031170437-08e58c069a2a/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
github.com/pingcap/tidb-dashboard v0.0.0-20211107164327-80363dfbe884/go.mod h1:OCXbZTBTIMRcIt0jFsuCakZP+goYRv6IjawKbwLS2TQ=
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible h1:c7+izmker91NkjkZ6FgTlmD4k1A5FLOAq+li6Ki2/GY=
github.com/pingcap/tidb-tools v5.2.2-0.20211019062242-37a8bef2fa17+incompatible/go.mod h1:XGdcy9+yqlDSEMTpOXnwf3hiTeqrV6MN/u1se9N8yIM=
github.com/pingcap/tipb v0.0.0-20211116093845-e9b045a0bdf8 h1:Vu/6oq8EFNWgyXRHiclNzTKIu+YKHPCSI/Ba5oVrLtM=
Expand Down Expand Up @@ -713,8 +714,8 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211118154139-b11da6307c6f h1:UyJjp3wGIjf1edGiQiIdAtL5QFqaqR4+s3LDwUZU7NY=
github.com/tikv/client-go/v2 v2.0.0-alpha.0.20211118154139-b11da6307c6f/go.mod h1:BEAS0vXm5BorlF/HTndqGwcGDvaiwe7B7BkfgwwZMJ4=
github.com/tikv/pd v1.1.0-beta.0.20211029083450-e65f0c55b6ae/go.mod h1:varH0IE0jJ9E9WN2Ei/N6pajMlPkcXdDEf7f5mmsUVQ=
github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379 h1:nFm1jQDz1iRktoyV2SyM5zVk6+PJHQNunJZ7ZJcqzAo=
github.com/tikv/pd v1.1.0-beta.0.20211104095303-69c86d05d379/go.mod h1:y+09hAUXJbrd4c0nktL74zXDDuD7atGtfOKxL90PCOE=
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee h1:rAAdvQ8Hh36syHr92g0VmZEpkH+40RGQBpFL2121xMs=
github.com/tikv/pd v1.1.0-beta.0.20211118054146-02848d2660ee/go.mod h1:lRbwxBAhnTQR5vqbTzeI/Bj62bD2OvYYuFezo2vrmeI=
github.com/tklauser/go-sysconf v0.3.4 h1:HT8SVixZd3IzLdfs/xlpq0jeSfTX57g1v6wB1EuzV7M=
github.com/tklauser/go-sysconf v0.3.4/go.mod h1:Cl2c8ZRWfHD5IrfHo9VN+FX9kCFjIOyVklgXycLB6ek=
github.com/tklauser/numcpus v0.2.1 h1:ct88eFm+Q7m2ZfXJdan1xYoXKlmwsfP+k88q05KvlZc=
Expand Down Expand Up @@ -804,8 +805,9 @@ go.uber.org/dig v1.8.0/go.mod h1:X34SnWGr8Fyla9zQNO2GSO2D+TIuqB14OS8JhYocIyw=
go.uber.org/fx v1.10.0/go.mod h1:vLRicqpG/qQEzno4SYU86iCwfT95EZza+Eba0ItuxqY=
go.uber.org/goleak v0.10.0/go.mod h1:VCZuO8V8mFPlL0F5J5GK1rtHV3DrFcQ1R8ryq7FK0aI=
go.uber.org/goleak v1.1.10/go.mod h1:8a7PlsEVH3e/a/GLqe5IIrQx6GzcnRmZEufDUTk4A7A=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723 h1:sHOAIxRGBp443oHZIPB+HsUGaksVCXVQENPxwTfQdH4=
go.uber.org/goleak v1.1.11-0.20210813005559-691160354723/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA=
go.uber.org/goleak v1.1.12/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
Expand Down
109 changes: 75 additions & 34 deletions store/mockstore/unistore/tikv/mvcc.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (store *MVCCStore) PessimisticLock(reqCtx *requestCtx, req *kvrpcpb.Pessimi
for _, m := range mutations {
lock, err := store.checkConflictInLockStore(reqCtx, m, startTS)
if err != nil {
var resourceGroupTag []byte = nil
var resourceGroupTag []byte
if req.Context != nil {
resourceGroupTag = req.Context.ResourceGroupTag
}
Expand Down Expand Up @@ -1098,34 +1098,56 @@ func (store *MVCCStore) checkCommitted(reader *dbreader.DBReader, key []byte, st
return 0, nil
}

func checkLock(lock mvcc.Lock, key []byte, startTS uint64, resolved []uint64) error {
if isResolved(lock.StartTS, resolved) {
return nil
// LockPair contains a pair of key and lock. It's used for reading through locks.
type LockPair struct {
key []byte
lock *mvcc.Lock
}

func getValueFromLock(lock *mvcc.Lock) []byte {
if lock.Op == byte(kvrpcpb.Op_Put) {
// lock owns the value so needn't to safeCopy it.
return lock.Value
}
return nil
}

// *LockPair is not nil if the lock in the committed timestamp set. Read operations can get value from it without deep copy.
func checkLock(lock mvcc.Lock, key []byte, startTS uint64, resolved []uint64, committed []uint64) (*LockPair, error) {
if inTSSet(lock.StartTS, resolved) {
return nil, nil
}
lockVisible := lock.StartTS <= startTS
isWriteLock := lock.Op == uint8(kvrpcpb.Op_Put) || lock.Op == uint8(kvrpcpb.Op_Del)
isPrimaryGet := startTS == maxSystemTS && bytes.Equal(lock.Primary, key) && !lock.UseAsyncCommit
if lockVisible && isWriteLock && !isPrimaryGet {
return BuildLockErr(safeCopy(key), &lock)
if inTSSet(lock.StartTS, committed) {
return &LockPair{safeCopy(key), &lock}, nil
}
return nil, BuildLockErr(safeCopy(key), &lock)
}
return nil
return nil, nil
}

// CheckKeysLock implements the MVCCStore interface.
func (store *MVCCStore) CheckKeysLock(startTS uint64, resolved []uint64, keys ...[]byte) error {
func (store *MVCCStore) CheckKeysLock(startTS uint64, resolved, committed []uint64, keys ...[]byte) ([]*LockPair, error) {
var buf []byte
var lockPairs []*LockPair
for _, key := range keys {
buf = store.lockStore.Get(key, buf)
if len(buf) == 0 {
continue
}
lock := mvcc.DecodeLock(buf)
err := checkLock(lock, key, startTS, resolved)
lockPair, err := checkLock(lock, key, startTS, resolved, committed)
if lockPair != nil {
lockPairs = append(lockPairs, lockPair)
}
if err != nil {
return err
return nil, err
}
}
return nil
return lockPairs, nil
}

// CheckRangeLock implements the MVCCStore interface.
Expand All @@ -1136,7 +1158,7 @@ func (store *MVCCStore) CheckRangeLock(startTS uint64, startKey, endKey []byte,
break
}
lock := mvcc.DecodeLock(it.Value())
err := checkLock(lock, it.Key(), startTS, resolved)
_, err := checkLock(lock, it.Key(), startTS, resolved, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -1386,14 +1408,32 @@ func (store *MVCCStore) DeleteFileInRange(start, end []byte) {
store.db.DeleteFilesInRange(start, end)
}

// Get implements the MVCCStore interface.
func (store *MVCCStore) Get(reqCtx *requestCtx, key []byte, version uint64) ([]byte, error) {
lockPairs, err := store.CheckKeysLock(version, reqCtx.rpcCtx.ResolvedLocks, reqCtx.rpcCtx.CommittedLocks, key)
if err != nil {
return nil, err
}
if len(lockPairs) != 0 {
return getValueFromLock(lockPairs[0].lock), nil
}
val, err := reqCtx.getDBReader().Get(key, version)
return safeCopy(val), err
}

// BatchGet implements the MVCCStore interface.
func (store *MVCCStore) BatchGet(reqCtx *requestCtx, keys [][]byte, version uint64) []*kvrpcpb.KvPair {
pairs := make([]*kvrpcpb.KvPair, 0, len(keys))
remain := make([][]byte, 0, len(keys))
for _, key := range keys {
err := store.CheckKeysLock(version, reqCtx.rpcCtx.ResolvedLocks, key)
lockPairs, err := store.CheckKeysLock(version, reqCtx.rpcCtx.ResolvedLocks, reqCtx.rpcCtx.CommittedLocks, key)
if err != nil {
pairs = append(pairs, &kvrpcpb.KvPair{Key: key, Error: convertToKeyError(err)})
} else if len(lockPairs) != 0 {
value := getValueFromLock(lockPairs[0].lock)
if value != nil {
pairs = append(pairs, &kvrpcpb.KvPair{Key: key, Value: value})
}
} else {
remain = append(remain, key)
}
Expand All @@ -1411,16 +1451,22 @@ func (store *MVCCStore) BatchGet(reqCtx *requestCtx, keys [][]byte, version uint
return pairs
}

func (store *MVCCStore) collectRangeLock(startTS uint64, startKey, endKey []byte, resolved []uint64) []*kvrpcpb.KvPair {
func (store *MVCCStore) collectRangeLock(startTS uint64, startKey, endKey []byte, resolved, committed []uint64) []*kvrpcpb.KvPair {
var pairs []*kvrpcpb.KvPair
it := store.lockStore.NewIterator()
for it.Seek(startKey); it.Valid(); it.Next() {
if exceedEndKey(it.Key(), endKey) {
break
}
lock := mvcc.DecodeLock(it.Value())
err := checkLock(lock, it.Key(), startTS, resolved)
if err != nil {
lockPair, err := checkLock(lock, it.Key(), startTS, resolved, committed)
if lockPair != nil {
pairs = append(pairs, &kvrpcpb.KvPair{
Key: lockPair.key,
// deleted key's value is nil
Value: getValueFromLock(lockPair.lock),
})
} else if err != nil {
pairs = append(pairs, &kvrpcpb.KvPair{
Error: convertToKeyError(err),
Key: safeCopy(it.Key()),
Expand All @@ -1430,8 +1476,8 @@ func (store *MVCCStore) collectRangeLock(startTS uint64, startKey, endKey []byte
return pairs
}

func isResolved(startTS uint64, resolved []uint64) bool {
for _, v := range resolved {
func inTSSet(startTS uint64, tsSet []uint64) bool {
for _, v := range tsSet {
if startTS == v {
return true
}
Expand Down Expand Up @@ -1486,7 +1532,7 @@ func (store *MVCCStore) Scan(reqCtx *requestCtx, req *kvrpcpb.ScanRequest) []*kv
var lockPairs []*kvrpcpb.KvPair
limit := req.GetLimit()
if req.SampleStep == 0 {
lockPairs = store.collectRangeLock(req.GetVersion(), startKey, endKey, req.Context.ResolvedLocks)
lockPairs = store.collectRangeLock(req.GetVersion(), startKey, endKey, reqCtx.rpcCtx.ResolvedLocks, reqCtx.rpcCtx.CommittedLocks)
} else {
limit = req.SampleStep * limit
}
Expand All @@ -1506,31 +1552,26 @@ func (store *MVCCStore) Scan(reqCtx *requestCtx, req *kvrpcpb.ScanRequest) []*kv
})
return scanProc.pairs
}
pairs := append(scanProc.pairs, lockPairs...)
sort.Slice(pairs, func(i, j int) bool {
pairs := append(lockPairs, scanProc.pairs...)
sort.SliceStable(pairs, func(i, j int) bool {
cmp := bytes.Compare(pairs[i].Key, pairs[j].Key)
if req.Reverse {
cmp = -cmp
}
if cmp < 0 {
return true
} else if cmp > 0 {
return false
}
return pairs[i].Error != nil
return cmp < 0
})
validPairs := pairs[:0]
var prevErr *kvrpcpb.KvPair
var prev *kvrpcpb.KvPair
for _, pair := range pairs {
if prevErr != nil && bytes.Equal(prevErr.Key, pair.Key) {
if prev != nil && bytes.Equal(prev.Key, pair.Key) {
continue
}
if pair.Error != nil {
prevErr = pair
}
validPairs = append(validPairs, pair)
if len(validPairs) >= int(limit) {
break
prev = pair
if pair.Error != nil || len(pair.Value) != 0 {
validPairs = append(validPairs, pair)
if len(validPairs) >= int(limit) {
break
}
}
}
return validPairs
Expand Down
Loading

0 comments on commit df113a1

Please sign in to comment.