Skip to content

Commit

Permalink
store/tikv: remove kv.Snaphost from store/tikv (#23318)
Browse files Browse the repository at this point in the history
  • Loading branch information
AndreMouche authored Mar 15, 2021
1 parent 3641cd4 commit cad8e15
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 45 deletions.
2 changes: 1 addition & 1 deletion server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ func (ts *ConnTestSuite) TestPrefetchPointKeys(c *C) {
c.Assert(err, IsNil)
c.Assert(txn.Valid(), IsTrue)
snap := txn.GetSnapshot()
c.Assert(tikv.SnapCacheHitCount(snap), Equals, 4)
c.Assert(snap.(*tikv.KVSnapshot).SnapCacheHitCount(), Equals, 4)
tk.MustExec("commit")
tk.MustQuery("select * from prefetch").Check(testkit.Rows("1 1 2", "2 2 4", "3 3 4"))

Expand Down
5 changes: 4 additions & 1 deletion session/clustered_index_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,10 @@ func (s *testClusteredSuite) TestClusteredInsertIgnoreBatchGetKeyCount(c *C) {
tk.MustExec("insert ignore t values ('a', 1)")
txn, err := tk.Se.Txn(false)
c.Assert(err, IsNil)
snapSize := tikv.SnapCacheSize(txn.GetSnapshot())
snapSize := 0
if t, ok := txn.GetSnapshot().(*tikv.KVSnapshot); ok {
snapSize = t.SnapCacheSize()
}
c.Assert(snapSize, Equals, 1)
tk.MustExec("rollback")
}
Expand Down
5 changes: 5 additions & 0 deletions store/driver/txn/txn_driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
return txn.extractKeyErr(err)
}

// GetSnapshot returns the Snapshot binding to this transaction.
func (txn *tikvTxn) GetSnapshot() kv.Snapshot {
return txn.KVTxn.GetSnapshot()
}

func (txn *tikvTxn) extractKeyErr(err error) error {
if e, ok := errors.Cause(err).(*tikv.ErrKeyExist); ok {
return txn.extractKeyExistsErr(e.GetKey())
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (s *KVStore) beginWithExactStaleness(txnScope string, prevSec uint64) (*KVT

// GetSnapshot gets a snapshot that is able to read any data which data is <= ver.
// if ts is MaxVersion or > current max committed version, we will use current version for this snapshot.
func (s *KVStore) GetSnapshot(ts uint64) kv.Snapshot {
func (s *KVStore) GetSnapshot(ts uint64) *KVSnapshot {
snapshot := newTiKVSnapshot(s, ts, s.nextReplicaReadSeed())
return snapshot
}
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/scan.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

// Scanner support tikv scan
type Scanner struct {
snapshot *tikvSnapshot
snapshot *KVSnapshot
batchSize int
cache []*pb.KvPair
idx int
Expand All @@ -42,7 +42,7 @@ type Scanner struct {
eof bool
}

func newScanner(snapshot *tikvSnapshot, startKey []byte, endKey []byte, batchSize int, reverse bool) (*Scanner, error) {
func newScanner(snapshot *KVSnapshot, startKey []byte, endKey []byte, batchSize int, reverse bool) (*Scanner, error) {
// It must be > 1. Otherwise scanner won't skipFirst.
if batchSize <= 1 {
batchSize = scanBatchSize
Expand Down
64 changes: 26 additions & 38 deletions store/tikv/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,18 +41,14 @@ import (
"go.uber.org/zap"
)

var (
_ kv.Snapshot = (*tikvSnapshot)(nil)
)

const (
scanBatchSize = 256
batchGetSize = 5120
maxTimestamp = math.MaxUint64
)

// tikvSnapshot implements the kv.Snapshot interface.
type tikvSnapshot struct {
// KVSnapshot implements the kv.Snapshot interface.
type KVSnapshot struct {
store *KVStore
version uint64
isolationLevel kv.IsoLevel
Expand Down Expand Up @@ -88,13 +84,13 @@ type tikvSnapshot struct {
}

// newTiKVSnapshot creates a snapshot of an TiKV store.
func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *tikvSnapshot {
func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *KVSnapshot {
// Sanity check for snapshot version.
if ts >= math.MaxInt64 && ts != math.MaxUint64 {
err := errors.Errorf("try to get snapshot with a large ts %d", ts)
panic(err)
}
return &tikvSnapshot{
return &KVSnapshot{
store: store,
version: ts,
priority: pb.CommandPri_Normal,
Expand All @@ -104,7 +100,7 @@ func newTiKVSnapshot(store *KVStore, ts uint64, replicaReadSeed uint32) *tikvSna
}
}

func (s *tikvSnapshot) setSnapshotTS(ts uint64) {
func (s *KVSnapshot) setSnapshotTS(ts uint64) {
// Sanity check for snapshot version.
if ts >= math.MaxInt64 && ts != math.MaxUint64 {
err := errors.Errorf("try to get snapshot with a large ts %d", ts)
Expand All @@ -121,7 +117,7 @@ func (s *tikvSnapshot) setSnapshotTS(ts uint64) {

// BatchGet gets all the keys' value from kv-server and returns a map contains key/value pairs.
// The map will not contain nonexistent keys.
func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) {
func (s *KVSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) {
// Check the cached value first.
m := make(map[string][]byte)
s.mu.RLock()
Expand Down Expand Up @@ -222,7 +218,7 @@ func appendBatchKeysBySize(b []batchKeys, region RegionVerID, keys [][]byte, siz
return b
}

func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error {
func (s *KVSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, collectF func(k, v []byte)) error {
defer func(start time.Time) {
metrics.TxnCmdHistogramWithBatchGet.Observe(time.Since(start).Seconds())
}(time.Now())
Expand Down Expand Up @@ -264,7 +260,7 @@ func (s *tikvSnapshot) batchGetKeysByRegions(bo *Backoffer, keys [][]byte, colle
return errors.Trace(err)
}

func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collectF func(k, v []byte)) error {
func (s *KVSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, collectF func(k, v []byte)) error {
cli := NewClientHelper(s.store, s.resolvedLocks)
s.mu.RLock()
if s.mu.stats != nil {
Expand Down Expand Up @@ -372,7 +368,7 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
}

// Get gets the value for key k from snapshot.
func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
func (s *KVSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {

defer func(start time.Time) {
metrics.TxnCmdHistogramWithGet.Observe(time.Since(start).Seconds())
Expand All @@ -396,7 +392,7 @@ func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) {
return val, nil
}

func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte, error) {
func (s *KVSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte, error) {
// Check the cached values first.
s.mu.RLock()
if s.mu.cached != nil {
Expand Down Expand Up @@ -523,7 +519,7 @@ func (s *tikvSnapshot) get(ctx context.Context, bo *Backoffer, k kv.Key) ([]byte
}
}

func (s *tikvSnapshot) mergeExecDetail(detail *pb.ExecDetailsV2) {
func (s *KVSnapshot) mergeExecDetail(detail *pb.ExecDetailsV2) {
s.mu.Lock()
defer s.mu.Unlock()
if detail == nil || s.mu.stats == nil {
Expand All @@ -540,20 +536,20 @@ func (s *tikvSnapshot) mergeExecDetail(detail *pb.ExecDetailsV2) {
}

// Iter return a list of key-value pair after `k`.
func (s *tikvSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
func (s *KVSnapshot) Iter(k kv.Key, upperBound kv.Key) (kv.Iterator, error) {
scanner, err := newScanner(s, k, upperBound, scanBatchSize, false)
return scanner, errors.Trace(err)
}

// IterReverse creates a reversed Iterator positioned on the first entry which key is less than k.
func (s *tikvSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) {
func (s *KVSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) {
scanner, err := newScanner(s, nil, k, scanBatchSize, true)
return scanner, errors.Trace(err)
}

// SetOption sets an option with a value, when val is nil, uses the default
// value of this option. Only ReplicaRead is supported for snapshot
func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
func (s *KVSnapshot) SetOption(opt kv.Option, val interface{}) {
switch opt {
case kv.IsolationLevel:
s.isolationLevel = val.(kv.IsoLevel)
Expand Down Expand Up @@ -594,8 +590,8 @@ func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
}
}

// ClearFollowerRead disables follower read on current transaction
func (s *tikvSnapshot) DelOption(opt kv.Option) {
// DelOption deletes an option.
func (s *KVSnapshot) DelOption(opt kv.Option) {
switch opt {
case kv.ReplicaRead:
s.mu.Lock()
Expand All @@ -608,24 +604,16 @@ func (s *tikvSnapshot) DelOption(opt kv.Option) {
}
}

// SnapCacheHitCount gets the snapshot cache hit count.
func SnapCacheHitCount(snap kv.Snapshot) int {
tikvSnap, ok := snap.(*tikvSnapshot)
if !ok {
return 0
}
return int(atomic.LoadInt64(&tikvSnap.mu.hitCnt))
// SnapCacheHitCount gets the snapshot cache hit count. Only for test.
func (s *KVSnapshot) SnapCacheHitCount() int {
return int(atomic.LoadInt64(&s.mu.hitCnt))
}

// SnapCacheSize gets the snapshot cache size.
func SnapCacheSize(snap kv.Snapshot) int {
tikvSnap, ok := snap.(*tikvSnapshot)
if !ok {
return 0
}
tikvSnap.mu.RLock()
defer tikvSnap.mu.RLock()
return len(tikvSnap.mu.cached)
// SnapCacheSize gets the snapshot cache size. Only for test.
func (s *KVSnapshot) SnapCacheSize() int {
s.mu.RLock()
defer s.mu.RLock()
return len(s.mu.cached)
}

func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) {
Expand Down Expand Up @@ -740,7 +728,7 @@ func prettyWriteKey(buf *bytes.Buffer, key []byte) {
}
}

func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) {
func (s *KVSnapshot) recordBackoffInfo(bo *Backoffer) {
s.mu.RLock()
if s.mu.stats == nil || bo.totalSleep == 0 {
s.mu.RUnlock()
Expand All @@ -765,7 +753,7 @@ func (s *tikvSnapshot) recordBackoffInfo(bo *Backoffer) {
}
}

func (s *tikvSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats) {
func (s *KVSnapshot) mergeRegionRequestStats(stats map[tikvrpc.CmdType]*RPCRuntimeStats) {
s.mu.Lock()
defer s.mu.Unlock()
if s.mu.stats == nil {
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ type SchemaAmender interface {

// KVTxn contains methods to interact with a TiKV transaction.
type KVTxn struct {
snapshot *tikvSnapshot
snapshot *KVSnapshot
us kv.UnionStore
store *KVStore // for connection to region.
startTS uint64
Expand Down Expand Up @@ -617,6 +617,6 @@ func (txn *KVTxn) GetMemBuffer() kv.MemBuffer {
}

// GetSnapshot returns the Snapshot binding to this transaction.
func (txn *KVTxn) GetSnapshot() kv.Snapshot {
func (txn *KVTxn) GetSnapshot() *KVSnapshot {
return txn.snapshot
}

0 comments on commit cad8e15

Please sign in to comment.