From a7d7e9bc12722cce911c16554d4d155d45f01d00 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Tue, 22 Oct 2019 17:06:31 +0800 Subject: [PATCH 1/3] store,kv: snapshot doesn't cache the non-exists kv entries lead to poor 'insert ignore' performance Snapshot should cache all the BatchGet() kv entries, including the non-exist ones. This commit fix a bug that non-exist kv entries are not cached, causing `insert ignore` to access TiKV everytime and the performance is poor. --- kv/kv.go | 2 ++ store/mockstore/mocktikv/rpc.go | 7 +++++++ store/tikv/snapshot.go | 25 ++++++++++++++++++++++--- 3 files changed, 31 insertions(+), 3 deletions(-) diff --git a/kv/kv.go b/kv/kv.go index 8413d86f684f9..652229871d625 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -187,6 +187,8 @@ type Transaction interface { // SetVars sets variables to the transaction. SetVars(vars *Variables) // BatchGet gets kv from the memory buffer of statement and transaction, and the kv storage. + // Do not use len(value) == 0 or value == nil to represent non-exist. + // If a key doesn't exist, there shouldn't be any corresponding entry in the result map. BatchGet(ctx context.Context, keys []Key) (map[string][]byte, error) IsPessimistic() bool } diff --git a/store/mockstore/mocktikv/rpc.go b/store/mockstore/mocktikv/rpc.go index 1361ffdd30d95..3dc2b10edcb90 100644 --- a/store/mockstore/mocktikv/rpc.go +++ b/store/mockstore/mocktikv/rpc.go @@ -23,6 +23,7 @@ import ( "time" "github.com/golang/protobuf/proto" + "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/coprocessor" @@ -708,6 +709,12 @@ func (c *RPCClient) checkArgs(ctx context.Context, addr string) (*rpcHandler, er // SendRequest sends a request to mock cluster. func (c *RPCClient) SendRequest(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (*tikvrpc.Response, error) { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("RPCClient.SendRequest", opentracing.ChildOf(span.Context())) + defer span1.Finish() + ctx = opentracing.ContextWithSpan(ctx, span1) + } + failpoint.Inject("rpcServerBusy", func(val failpoint.Value) { if val.(bool) { failpoint.Return(tikvrpc.GenRegionErrorResp(req, &errorpb.Error{ServerIsBusy: &errorpb.ServerIsBusy{}})) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 96a30073ca4d1..6c23b6fcbb362 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -21,6 +21,7 @@ import ( "time" "unsafe" + "github.com/opentracing/opentracing-go" "github.com/pingcap/errors" "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" @@ -62,6 +63,10 @@ type tikvSnapshot struct { // Cache the result of BatchGet. // The invariance is that calling BatchGet multiple times using the same start ts, // the result should not change. + // NOTE: This representation here is different from the BatchGet API. + // cached use len(value)=0 to represent a key-value entry doesn't exist (a reliable truth from TiKV). + // In the BatchGet API, it use no key-value entry to represent non-exist. + // It's OK as long as there are no zero-byte values in the protocol. cached map[string][]byte } @@ -95,7 +100,9 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string] tmp := keys[:0] for _, key := range keys { if val, ok := s.cached[string(key)]; ok { - m[string(key)] = val + if len(val) > 0 { + m[string(key)] = val + } } else { tmp = append(tmp, key) } @@ -121,6 +128,7 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string] if len(v) == 0 { return } + mu.Lock() m[string(k)] = v mu.Unlock() @@ -138,8 +146,13 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string] if s.cached == nil { s.cached = make(map[string][]byte, len(m)) } - for key, value := range m { - s.cached[key] = value + for _, key := range keys { + // Updating cache using the reliable truth from TiKV, we set cache[key] = nil to mean non-exist. + if value, ok := m[string(key)]; ok { + s.cached[string(key)] = value + } else { + s.cached[string(key)] = nil + } } return m, nil @@ -253,6 +266,12 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll // Get gets the value for key k from snapshot. func (s *tikvSnapshot) Get(ctx context.Context, k kv.Key) ([]byte, error) { + if span := opentracing.SpanFromContext(ctx); span != nil && span.Tracer() != nil { + span1 := span.Tracer().StartSpan("tikvSnapshot.get", opentracing.ChildOf(span.Context())) + defer span1.Finish() + ctx = opentracing.ContextWithSpan(ctx, span1) + } + ctx = context.WithValue(ctx, txnStartKey, s.version.Ver) val, err := s.get(NewBackoffer(ctx, getMaxBackoff), k) if err != nil { From b593624838f8bbec095a994e5ef3f65a1de4c53a Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 23 Oct 2019 11:23:27 +0800 Subject: [PATCH 2/3] address comment --- store/tikv/snapshot.go | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 6c23b6fcbb362..03aa3f60ec963 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -147,12 +147,7 @@ func (s *tikvSnapshot) BatchGet(ctx context.Context, keys []kv.Key) (map[string] s.cached = make(map[string][]byte, len(m)) } for _, key := range keys { - // Updating cache using the reliable truth from TiKV, we set cache[key] = nil to mean non-exist. - if value, ok := m[string(key)]; ok { - s.cached[string(key)] = value - } else { - s.cached[string(key)] = nil - } + s.cached[string(key)] = m[string(key)] } return m, nil From e85de23d8aa862dde7c1bb667db6b91d0c8c08a3 Mon Sep 17 00:00:00 2001 From: tiancaiamao Date: Wed, 23 Oct 2019 14:59:02 +0800 Subject: [PATCH 3/3] address comment --- store/tikv/snapshot.go | 4 ++++ store/tikv/snapshot_test.go | 22 ++++++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go index 03aa3f60ec963..584232e3e780f 100644 --- a/store/tikv/snapshot.go +++ b/store/tikv/snapshot.go @@ -286,6 +286,10 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) { } } + failpoint.Inject("snapshot-get-cache-fail", func(_ failpoint.Value) { + panic("cache miss") + }) + sender := NewRegionRequestSender(s.store.regionCache, s.store.client) req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, diff --git a/store/tikv/snapshot_test.go b/store/tikv/snapshot_test.go index 7ea31aebe801e..d8ea5792975c2 100644 --- a/store/tikv/snapshot_test.go +++ b/store/tikv/snapshot_test.go @@ -19,6 +19,7 @@ import ( "time" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" pb "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util/logutil" @@ -117,6 +118,27 @@ func (s *testSnapshotSuite) TestBatchGet(c *C) { } } +func (s *testSnapshotSuite) TestSnapshotCache(c *C) { + txn := s.beginTxn(c) + c.Assert(txn.Set(kv.Key("x"), []byte("x")), IsNil) + c.Assert(txn.Commit(context.Background()), IsNil) + + txn = s.beginTxn(c) + snapshot := newTiKVSnapshot(s.store, kv.Version{Ver: txn.StartTS()}, 0) + _, err := snapshot.BatchGet(context.Background(), []kv.Key{kv.Key("x"), kv.Key("y")}) + c.Assert(err, IsNil) + + c.Assert(failpoint.Enable("github.com/pingcap/tidb/store/tikv/snapshot-get-cache-fail", `return(true)`), IsNil) + ctx := context.WithValue(context.Background(), "TestSnapshotCache", true) + _, err = snapshot.Get(ctx, kv.Key("x")) + c.Assert(err, IsNil) + + _, err = snapshot.Get(ctx, kv.Key("y")) + c.Assert(kv.IsErrNotFound(err), IsTrue) + + c.Assert(failpoint.Disable("github.com/pingcap/tidb/store/tikv/snapshot-get-cache-fail"), IsNil) +} + func (s *testSnapshotSuite) TestBatchGetNotExist(c *C) { for _, rowNum := range s.rowNums { logutil.BgLogger().Debug("test BatchGetNotExist",