From 68af25677e9b1b3a7bb6d3ef995f6d742203b22b Mon Sep 17 00:00:00 2001 From: Ewan Chou Date: Fri, 15 Nov 2019 11:08:55 +0800 Subject: [PATCH] Cherry pick follower read to TiDB (#11347) (#13464) --- distsql/request_builder.go | 3 +- distsql/request_builder_test.go | 33 +++ executor/analyze.go | 12 +- executor/analyze_test.go | 10 + executor/point_get.go | 3 + kv/kv.go | 27 +++ kv/mock.go | 3 + session/session.go | 13 +- session/session_test.go | 12 + sessionctx/variable/session.go | 10 + sessionctx/variable/sysvar.go | 1 + sessionctx/variable/sysvar_test.go | 3 + sessionctx/variable/tidb_vars.go | 3 + sessionctx/variable/varsutil.go | 7 + sessionctx/variable/varsutil_test.go | 13 ++ store/tikv/coprocessor.go | 25 +- store/tikv/kv.go | 35 +-- store/tikv/region_cache.go | 63 ++++- store/tikv/region_cache_test.go | 333 ++++++++++++++++++++++++--- store/tikv/region_request.go | 8 +- store/tikv/region_request_test.go | 11 +- store/tikv/scan.go | 2 + store/tikv/snapshot.go | 44 +++- store/tikv/tikvrpc/tikvrpc.go | 2 + store/tikv/txn.go | 6 +- util/admin/admin.go | 4 + 26 files changed, 608 insertions(+), 78 deletions(-) mode change 100644 => 100755 store/tikv/coprocessor.go mode change 100644 => 100755 store/tikv/kv.go mode change 100644 => 100755 store/tikv/region_request.go mode change 100644 => 100755 store/tikv/snapshot.go diff --git a/distsql/request_builder.go b/distsql/request_builder.go index 6818579c5a2a5..ee88b98cf8ed7 100644 --- a/distsql/request_builder.go +++ b/distsql/request_builder.go @@ -153,12 +153,13 @@ func (builder *RequestBuilder) getKVPriority(sv *variable.SessionVars) int { } // SetFromSessionVars sets the following fields for "kv.Request" from session variables: -// "Concurrency", "IsolationLevel", "NotFillCache". +// "Concurrency", "IsolationLevel", "NotFillCache", "ReplicaRead". func (builder *RequestBuilder) SetFromSessionVars(sv *variable.SessionVars) *RequestBuilder { builder.Request.Concurrency = sv.DistSQLScanConcurrency builder.Request.IsolationLevel = builder.getIsolationLevel() builder.Request.NotFillCache = sv.StmtCtx.NotFillCache builder.Request.Priority = builder.getKVPriority(sv) + builder.Request.ReplicaRead = sv.ReplicaRead return builder } diff --git a/distsql/request_builder_test.go b/distsql/request_builder_test.go index a2b472b5ad833..7b353d8f29c82 100644 --- a/distsql/request_builder_test.go +++ b/distsql/request_builder_test.go @@ -311,6 +311,7 @@ func (s *testSuite) TestRequestBuilder1(c *C) { NotFillCache: false, SyncLog: false, Streaming: false, + ReplicaRead: kv.ReplicaReadLeader, } c.Assert(actual, DeepEquals, expect) } @@ -385,6 +386,7 @@ func (s *testSuite) TestRequestBuilder2(c *C) { NotFillCache: false, SyncLog: false, Streaming: false, + ReplicaRead: kv.ReplicaReadLeader, } c.Assert(actual, DeepEquals, expect) } @@ -429,6 +431,7 @@ func (s *testSuite) TestRequestBuilder3(c *C) { NotFillCache: false, SyncLog: false, Streaming: false, + ReplicaRead: kv.ReplicaReadLeader, } c.Assert(actual, DeepEquals, expect) } @@ -474,6 +477,7 @@ func (s *testSuite) TestRequestBuilder4(c *C) { Streaming: true, NotFillCache: false, SyncLog: false, + ReplicaRead: kv.ReplicaReadLeader, } c.Assert(actual, DeepEquals, expect) } @@ -554,3 +558,32 @@ func (s *testSuite) TestRequestBuilder6(c *C) { c.Assert(actual, DeepEquals, expect) } + +func (s *testSuite) TestRequestBuilder7(c *C) { + vars := variable.NewSessionVars() + vars.ReplicaRead = kv.ReplicaReadFollower + + concurrency := 10 + + actual, err := (&RequestBuilder{}). + SetFromSessionVars(vars). 
+ SetConcurrency(concurrency). + Build() + c.Assert(err, IsNil) + + expect := &kv.Request{ + Tp: 0, + StartTs: 0x0, + KeepOrder: false, + Desc: false, + Concurrency: concurrency, + IsolationLevel: 0, + Priority: 0, + NotFillCache: false, + SyncLog: false, + Streaming: false, + ReplicaRead: kv.ReplicaReadFollower, + } + + c.Assert(actual, DeepEquals, expect) +} diff --git a/executor/analyze.go b/executor/analyze.go index 4eb25ba9c7526..a13e5daa0e581 100755 --- a/executor/analyze.go +++ b/executor/analyze.go @@ -623,7 +623,8 @@ func (e *AnalyzeFastExec) getSampRegionsRowCount(bo *tikv.Backoffer, needRebuild } var resp *tikvrpc.Response var rpcCtx *tikv.RPCContext - rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region) + // we always use the first follower when follower read is enabled + rpcCtx, *err = e.cache.GetRPCContext(bo, loc.Region, e.ctx.GetSessionVars().ReplicaRead, 0) if *err != nil { return } @@ -917,6 +918,9 @@ func (e *AnalyzeFastExec) handleScanTasks(bo *tikv.Backoffer) error { if err != nil { return err } + if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() { + snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } for _, t := range e.scanTasks { iter, err := snapshot.Iter(t.StartKey, t.EndKey) if err != nil { @@ -934,10 +938,14 @@ func (e *AnalyzeFastExec) handleSampTasks(bo *tikv.Backoffer, workID int, err *e defer e.wg.Done() var snapshot kv.Snapshot snapshot, *err = e.ctx.GetStore().(tikv.Storage).GetSnapshot(kv.MaxVersion) - rander := rand.New(rand.NewSource(e.randSeed + int64(workID))) if *err != nil { return } + if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() { + snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } + rander := rand.New(rand.NewSource(e.randSeed + int64(workID))) + for i := workID; i < len(e.sampTasks); i += e.concurrency { task := e.sampTasks[i] if task.SampSize == 0 { diff --git a/executor/analyze_test.go b/executor/analyze_test.go index 07b9ea928d26a..2f097a728b347 100644 --- a/executor/analyze_test.go +++ b/executor/analyze_test.go @@ -106,6 +106,16 @@ PARTITION BY RANGE ( a ) ( } } +func (s *testSuite1) TestAnalyzeReplicaReadFollower(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t") + tk.MustExec("create table t(a int)") + ctx := tk.Se.(sessionctx.Context) + ctx.GetSessionVars().ReplicaRead = kv.ReplicaReadFollower + tk.MustExec("analyze table t") +} + func (s *testSuite1) TestAnalyzeParameters(c *C) { tk := testkit.NewTestKit(c, s.store) tk.MustExec("use test") diff --git a/executor/point_get.go b/executor/point_get.go index 2da6cd6011e02..f9ea71751486b 100644 --- a/executor/point_get.go +++ b/executor/point_get.go @@ -93,6 +93,9 @@ func (e *PointGetExecutor) Next(ctx context.Context, req *chunk.Chunk) error { if err != nil { return err } + if e.ctx.GetSessionVars().ReplicaRead.IsFollowerRead() { + e.snapshot.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } if e.idxInfo != nil { idxKey, err1 := e.encodeIndexKey() if err1 != nil && !kv.ErrNotExist.Equal(err1) { diff --git a/kv/kv.go b/kv/kv.go index 5194f99832623..1310a58b32938 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -49,6 +49,8 @@ const ( Pessimistic // SnapshotTS is defined to set snapshot ts. SnapshotTS + // Set replica read + ReplicaRead ) // Priority value for transaction priority. @@ -68,6 +70,23 @@ const ( RC ) +// ReplicaReadType is the type of replica to read data from +type ReplicaReadType byte + +const ( + // ReplicaReadLeader stands for 'read from leader'. 
+	ReplicaReadLeader ReplicaReadType = 1 << iota
+	// ReplicaReadFollower stands for 'read from follower'.
+	ReplicaReadFollower
+	// ReplicaReadLearner stands for 'read from learner'.
+	ReplicaReadLearner
+)
+
+// IsFollowerRead checks if a follower replica is going to be used to read data.
+func (r ReplicaReadType) IsFollowerRead() bool {
+	return r == ReplicaReadFollower
+}
+
 // Those limits is enforced to make sure the transaction can be well handled by TiKV.
 var (
 	// TxnEntrySizeLimit is limit of single entry size (len(key) + len(value)).
@@ -216,6 +235,8 @@ type Request struct {
 	Streaming bool
 	// MemTracker is used to trace and control memory usage in co-processor layer.
 	MemTracker *memory.Tracker
+	// ReplicaRead is used for reading data from replicas, only follower is supported at this time.
+	ReplicaRead ReplicaReadType
 }
 
 // ResultSubset represents a result subset from a single storage unit.
@@ -247,6 +268,12 @@ type Snapshot interface {
 	BatchGet(keys []Key) (map[string][]byte, error)
 	// SetPriority snapshot set the priority
 	SetPriority(priority int)
+
+	// SetOption sets an option with a value, when val is nil, uses the default
+	// value of this option. Only ReplicaRead is supported for snapshot
+	SetOption(opt Option, val interface{})
+	// DelOption deletes an option.
+	DelOption(opt Option)
 }
 
 // Driver is the interface that must be implemented by a KV storage.
diff --git a/kv/mock.go b/kv/mock.go
index 352e3bbd29866..944156b0bc451 100644
--- a/kv/mock.go
+++ b/kv/mock.go
@@ -232,3 +232,6 @@ func (s *mockSnapshot) Iter(k Key, upperBound Key) (Iterator, error) {
 func (s *mockSnapshot) IterReverse(k Key) (Iterator, error) {
 	return s.store.IterReverse(k)
 }
+
+func (s *mockSnapshot) SetOption(opt Option, val interface{}) {}
+func (s *mockSnapshot) DelOption(opt Option) {}
diff --git a/session/session.go b/session/session.go
index 64800593704c8..dde2012d06ddf 100644
--- a/session/session.go
+++ b/session/session.go
@@ -186,6 +186,9 @@ type session struct {
 	statsCollector *handle.SessionStatsCollector
 	// ddlOwnerChecker is used in `select tidb_is_ddl_owner()` statement;
 	ddlOwnerChecker owner.DDLOwnerChecker
+
+	// shared coprocessor client per session
+	client kv.Client
 }
 
 // DDLOwnerChecker returns s.ddlOwnerChecker. 
@@ -495,7 +498,7 @@ func (s *session) RollbackTxn(ctx context.Context) { } func (s *session) GetClient() kv.Client { - return s.store.GetClient() + return s.client } func (s *session) String() string { @@ -1271,6 +1274,9 @@ func (s *session) Txn(active bool) (kv.Transaction, error) { if !s.sessionVars.IsAutocommit() { s.sessionVars.SetStatusFlag(mysql.ServerStatusInTrans, true) } + if s.sessionVars.ReplicaRead.IsFollowerRead() { + s.txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } } return &s.txn, nil } @@ -1294,6 +1300,9 @@ func (s *session) NewTxn(ctx context.Context) error { } txn.SetCap(s.getMembufCap()) txn.SetVars(s.sessionVars.KVVars) + if s.GetSessionVars().ReplicaRead.IsFollowerRead() { + txn.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower) + } s.txn.changeInvalidToValid(txn) is := domain.GetDomain(s).InfoSchema() s.sessionVars.TxnCtx = &variable.TransactionContext{ @@ -1573,6 +1582,7 @@ func createSession(store kv.Storage) (*session, error) { parser: parser.New(), sessionVars: variable.NewSessionVars(), ddlOwnerChecker: dom.DDL().OwnerManager(), + client: store.GetClient(), } if plannercore.PreparedPlanCacheEnabled() { s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity, @@ -1596,6 +1606,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er store: store, parser: parser.New(), sessionVars: variable.NewSessionVars(), + client: store.GetClient(), } if plannercore.PreparedPlanCacheEnabled() { s.preparedPlanCache = kvcache.NewSimpleLRUCache(plannercore.PreparedPlanCacheCapacity, diff --git a/session/session_test.go b/session/session_test.go index 2bce04245a38d..6b98c440e5a43 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -2714,3 +2714,15 @@ func (s *testSessionSuite) TestGrantViewRelated(c *C) { tkUser.MustQuery("select current_user();").Check(testkit.Rows("u_version29@%")) tkUser.MustExec("create view v_version29_c as select * from v_version29;") } + +func (s *testSessionSuite) TestReplicaRead(c *C) { + var err error + tk := testkit.NewTestKit(c, s.store) + tk.Se, err = session.CreateSession4Test(s.store) + c.Assert(err, IsNil) + c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader) + tk.MustExec("set @@tidb_replica_read = 'follower';") + c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadFollower) + tk.MustExec("set @@tidb_replica_read = 'leader';") + c.Assert(tk.Se.GetSessionVars().ReplicaRead, Equals, kv.ReplicaReadLeader) +} diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go index dc2389263b999..efcfde9ef6a5a 100644 --- a/sessionctx/variable/session.go +++ b/sessionctx/variable/session.go @@ -394,6 +394,9 @@ type SessionVars struct { // ConnectionInfo indicates current connection info used by current session, only be lazy assigned by plugin. ConnectionInfo *ConnectionInfo + // ReplicaRead is used for reading data from replicas, only follower is supported at this time. + ReplicaRead kv.ReplicaReadType + // StartTime is the start time of the last query. 
StartTime time.Time @@ -474,6 +477,7 @@ func NewSessionVars() *SessionVars { SlowQueryFile: config.GetGlobalConfig().Log.SlowQueryFile, WaitSplitRegionFinish: DefTiDBWaitSplitRegionFinish, WaitSplitRegionTimeout: DefWaitSplitRegionTimeout, + ReplicaRead: kv.ReplicaReadLeader, AllowRemoveAutoInc: DefTiDBAllowRemoveAutoInc, LockWaitTimeout: DefInnodbLockWaitTimeout * 1000, } @@ -847,6 +851,12 @@ func (s *SessionVars) SetSystemVar(name string, val string) error { // It's a global variable, but it also wants to be cached in server. case TiDBMaxDeltaSchemaCount: SetMaxDeltaSchemaCount(tidbOptInt64(val, DefTiDBMaxDeltaSchemaCount)) + case TiDBReplicaRead: + if strings.EqualFold(val, "follower") { + s.ReplicaRead = kv.ReplicaReadFollower + } else if strings.EqualFold(val, "leader") || len(val) == 0 { + s.ReplicaRead = kv.ReplicaReadLeader + } case TiDBStoreLimit: storeutil.StoreLimit.Store(tidbOptInt64(val, DefTiDBStoreLimit)) } diff --git a/sessionctx/variable/sysvar.go b/sessionctx/variable/sysvar.go index 3e1ccdb96edbb..fa893492bb0fb 100644 --- a/sessionctx/variable/sysvar.go +++ b/sessionctx/variable/sysvar.go @@ -713,6 +713,7 @@ var defaultSysVars = []*SysVar{ {ScopeSession, TiDBWaitSplitRegionTimeout, strconv.Itoa(DefWaitSplitRegionTimeout)}, {ScopeSession, TiDBLowResolutionTSO, "0"}, {ScopeSession, TiDBExpensiveQueryTimeThreshold, strconv.Itoa(DefTiDBExpensiveQueryTimeThreshold)}, + {ScopeSession, TiDBReplicaRead, "leader"}, {ScopeSession, TiDBAllowRemoveAutoInc, BoolToIntStr(DefTiDBAllowRemoveAutoInc)}, {ScopeGlobal | ScopeSession, TiDBEnableStmtSummary, "0"}, {ScopeGlobal | ScopeSession, TiDBStoreLimit, strconv.FormatInt(atomic.LoadInt64(&config.GetGlobalConfig().TiKVClient.StoreLimit), 10)}, diff --git a/sessionctx/variable/sysvar_test.go b/sessionctx/variable/sysvar_test.go index a084c39414014..5a23978f98f56 100644 --- a/sessionctx/variable/sysvar_test.go +++ b/sessionctx/variable/sysvar_test.go @@ -46,6 +46,9 @@ func (*testSysVarSuite) TestSysVar(c *C) { f = GetSysVar("tidb_low_resolution_tso") c.Assert(f.Value, Equals, "0") + + f = GetSysVar("tidb_replica_read") + c.Assert(f.Value, Equals, "leader") } func (*testSysVarSuite) TestTxnMode(c *C) { diff --git a/sessionctx/variable/tidb_vars.go b/sessionctx/variable/tidb_vars.go index acda23b97eadb..a967d94305a5d 100644 --- a/sessionctx/variable/tidb_vars.go +++ b/sessionctx/variable/tidb_vars.go @@ -146,6 +146,9 @@ const ( // TiDBLowResolutionTSO is used for reading data with low resolution TSO which is updated once every two seconds TiDBLowResolutionTSO = "tidb_low_resolution_tso" + // TiDBReplicaRead is used for reading data from replicas, followers for example. + TiDBReplicaRead = "tidb_replica_read" + // TiDBAllowRemoveAutoInc indicates whether a user can drop the auto_increment column attribute or not. 
TiDBAllowRemoveAutoInc = "tidb_allow_remove_auto_inc" ) diff --git a/sessionctx/variable/varsutil.go b/sessionctx/variable/varsutil.go index fc53a37bfc40c..5c8a864c3edf9 100644 --- a/sessionctx/variable/varsutil.go +++ b/sessionctx/variable/varsutil.go @@ -587,6 +587,13 @@ func ValidateSetSystemVar(vars *SessionVars, name string, value string) (string, default: return value, ErrWrongValueForVar.GenWithStackByArgs(TiDBTxnMode, value) } + case TiDBReplicaRead: + if strings.EqualFold(value, "follower") { + return "follower", nil + } else if strings.EqualFold(value, "leader") || len(value) == 0 { + return "leader", nil + } + return value, ErrWrongValueForVar.GenWithStackByArgs(name, value) case TiDBAllowRemoveAutoInc: switch { case strings.EqualFold(value, "ON") || value == "1": diff --git a/sessionctx/variable/varsutil_test.go b/sessionctx/variable/varsutil_test.go index ec97078c4d849..e15d95b7c2ffc 100644 --- a/sessionctx/variable/varsutil_test.go +++ b/sessionctx/variable/varsutil_test.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/parser/mysql" "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/testleak" ) @@ -294,6 +295,17 @@ func (s *testVarsutilSuite) TestVarsutil(c *C) { c.Assert(err, IsNil) c.Assert(val, Equals, "0") c.Assert(v.CorrelationThreshold, Equals, float64(0)) + + SetSessionSystemVar(v, TiDBReplicaRead, types.NewStringDatum("follower")) + val, err = GetSessionSystemVar(v, TiDBReplicaRead) + c.Assert(err, IsNil) + c.Assert(val, Equals, "follower") + c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadFollower) + SetSessionSystemVar(v, TiDBReplicaRead, types.NewStringDatum("leader")) + val, err = GetSessionSystemVar(v, TiDBReplicaRead) + c.Assert(err, IsNil) + c.Assert(val, Equals, "leader") + c.Assert(v.ReplicaRead, Equals, kv.ReplicaReadLeader) } func (s *testVarsutilSuite) TestValidate(c *C) { @@ -348,6 +360,7 @@ func (s *testVarsutilSuite) TestValidate(c *C) { {TiDBTxnMode, "pessimistic", false}, {TiDBTxnMode, "optimistic", false}, {TiDBTxnMode, "", false}, + {TiDBReplicaRead, "invalid", true}, } for _, t := range tests { diff --git a/store/tikv/coprocessor.go b/store/tikv/coprocessor.go old mode 100644 new mode 100755 index e87aa312ef078..559a2562cd61c --- a/store/tikv/coprocessor.go +++ b/store/tikv/coprocessor.go @@ -44,7 +44,8 @@ var tikvTxnRegionsNumHistogramWithCoprocessor = metrics.TiKVTxnRegionsNumHistogr // CopClient is coprocessor client. type CopClient struct { - store *tikvStore + store *tikvStore + replicaReadSeed uint32 } // IsRequestTypeSupported checks whether reqType is supported. @@ -93,12 +94,13 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, vars *kv.Variable return copErrorResponse{err} } it := &copIterator{ - store: c.store, - req: req, - concurrency: req.Concurrency, - finishCh: make(chan struct{}), - vars: vars, - memTracker: req.MemTracker, + store: c.store, + req: req, + concurrency: req.Concurrency, + finishCh: make(chan struct{}), + vars: vars, + memTracker: req.MemTracker, + replicaReadSeed: c.replicaReadSeed, } it.tasks = tasks if it.concurrency > len(tasks) { @@ -386,6 +388,8 @@ type copIterator struct { vars *kv.Variables memTracker *memory.Tracker + + replicaReadSeed uint32 } // copIteratorWorker receives tasks from copIteratorTaskSender, handles tasks and sends the copResponse to respChan. 
@@ -399,6 +403,8 @@ type copIteratorWorker struct { vars *kv.Variables memTracker *memory.Tracker + + replicaReadSeed uint32 } // copIteratorTaskSender sends tasks to taskCh then wait for the workers to exit. @@ -498,6 +504,8 @@ func (it *copIterator) open(ctx context.Context) { vars: it.vars, memTracker: it.memTracker, + + replicaReadSeed: it.replicaReadSeed, } go worker.run(ctx) } @@ -666,6 +674,7 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch }) sender := NewRegionRequestSender(worker.store.regionCache, worker.store.client) + req := &tikvrpc.Request{ Type: task.cmdType, Cop: &coprocessor.Request{ @@ -679,7 +688,9 @@ func (worker *copIteratorWorker) handleTaskOnce(bo *Backoffer, task *copTask, ch NotFillCache: worker.req.NotFillCache, HandleTime: true, ScanDetail: true, + ReplicaRead: worker.req.ReplicaRead.IsFollowerRead(), }, + ReplicaReadSeed: worker.replicaReadSeed, } startTime := time.Now() resp, rpcCtx, err := sender.SendReqCtx(bo, req, task.region, ReadTimeoutMedium) diff --git a/store/tikv/kv.go b/store/tikv/kv.go old mode 100644 new mode 100755 index 2c5fde9ed51ce..45b18885da39c --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -21,6 +21,7 @@ import ( "net/url" "strings" "sync" + "sync/atomic" "time" "github.com/pingcap/errors" @@ -145,6 +146,8 @@ type tikvStore struct { spTime time.Time spMutex sync.RWMutex // this is used to update safePoint and spTime closed chan struct{} // this is used to nofity when the store is closed + + replicaReadSeed uint32 // this is used to load balance followers / learners when replica read is enabled } func (s *tikvStore) UpdateSPCache(cachedSP uint64, cachedTime time.Time) { @@ -180,16 +183,17 @@ func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Clie return nil, errors.Trace(err) } store := &tikvStore{ - clusterID: pdClient.GetClusterID(context.TODO()), - uuid: uuid, - oracle: o, - client: client, - pdClient: pdClient, - regionCache: NewRegionCache(pdClient), - kv: spkv, - safePoint: 0, - spTime: time.Now(), - closed: make(chan struct{}), + clusterID: pdClient.GetClusterID(context.TODO()), + uuid: uuid, + oracle: o, + client: client, + pdClient: pdClient, + regionCache: NewRegionCache(pdClient), + kv: spkv, + safePoint: 0, + spTime: time.Now(), + closed: make(chan struct{}), + replicaReadSeed: rand.Uint32(), } store.lockResolver = newLockResolver(store) store.enableGC = enableGC @@ -263,7 +267,7 @@ func (s *tikvStore) Begin() (kv.Transaction, error) { // BeginWithStartTS begins a transaction with startTS. 
func (s *tikvStore) BeginWithStartTS(startTS uint64) (kv.Transaction, error) { - txn, err := newTikvTxnWithStartTS(s, startTS) + txn, err := newTikvTxnWithStartTS(s, startTS, s.nextReplicaReadSeed()) if err != nil { return nil, errors.Trace(err) } @@ -272,7 +276,7 @@ func (s *tikvStore) BeginWithStartTS(startTS uint64) (kv.Transaction, error) { } func (s *tikvStore) GetSnapshot(ver kv.Version) (kv.Snapshot, error) { - snapshot := newTiKVSnapshot(s, ver) + snapshot := newTiKVSnapshotWithReplicaReadSeed(s, ver, s.nextReplicaReadSeed()) metrics.TiKVSnapshotCounter.Inc() return snapshot, nil } @@ -336,9 +340,14 @@ func (s *tikvStore) getTimestampWithRetry(bo *Backoffer) (uint64, error) { } } +func (s *tikvStore) nextReplicaReadSeed() uint32 { + return atomic.AddUint32(&s.replicaReadSeed, 1) +} + func (s *tikvStore) GetClient() kv.Client { return &CopClient{ - store: s, + store: s, + replicaReadSeed: s.nextReplicaReadSeed(), } } diff --git a/store/tikv/region_cache.go b/store/tikv/region_cache.go index 060d714599789..c9ec669e6584a 100644 --- a/store/tikv/region_cache.go +++ b/store/tikv/region_cache.go @@ -28,6 +28,7 @@ import ( "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/pd/client" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/util/logutil" atomic2 "go.uber.org/atomic" @@ -88,6 +89,26 @@ func (r *RegionStore) clone() *RegionStore { } } +// return next follower store's index +func (r *RegionStore) follower(seed uint32) int32 { + l := uint32(len(r.stores)) + if l <= 1 { + return r.workStoreIdx + } + + for retry := l - 1; retry > 0; retry-- { + followerIdx := int32(seed % (l - 1)) + if followerIdx >= r.workStoreIdx { + followerIdx++ + } + if r.storeFails[followerIdx] == atomic.LoadUint32(&r.stores[followerIdx].fail) { + return followerIdx + } + seed++ + } + return r.workStoreIdx +} + // init initializes region after constructed. func (r *Region) init(c *RegionCache) { // region store pull used store from global store map @@ -259,7 +280,7 @@ func (c *RPCContext) String() string { // GetRPCContext returns RPCContext for a region. If it returns nil, the region // must be out of date and already dropped from cache. 
-func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, error) { +func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID, replicaRead kv.ReplicaReadType, followerStoreSeed uint32) (*RPCContext, error) { ts := time.Now().Unix() cachedRegion := c.getCachedRegionWithRLock(id) @@ -272,7 +293,15 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, } regionStore := cachedRegion.getStore() - store, peer, storeIdx := cachedRegion.WorkStorePeer(regionStore) + var store *Store + var peer *metapb.Peer + var storeIdx int + switch replicaRead { + case kv.ReplicaReadFollower: + store, peer, storeIdx = cachedRegion.FollowerStorePeer(regionStore, followerStoreSeed) + default: + store, peer, storeIdx = cachedRegion.WorkStorePeer(regionStore) + } addr, err := c.getStoreAddr(bo, cachedRegion, store, storeIdx) if err != nil { return nil, err @@ -290,7 +319,7 @@ func (c *RegionCache) GetRPCContext(bo *Backoffer, id RegionVerID) (*RPCContext, } storeFailEpoch := atomic.LoadUint32(&store.fail) - if storeFailEpoch != regionStore.storeFails[regionStore.workStoreIdx] { + if storeFailEpoch != regionStore.storeFails[storeIdx] { cachedRegion.invalidate() logutil.Logger(context.Background()).Info("invalidate current region, because others failed on same store", zap.Uint64("region", id.GetID()), @@ -842,12 +871,21 @@ func (r *Region) GetLeaderStoreID() uint64 { return r.meta.Peers[int(r.getStore().workStoreIdx)].StoreId } +func (r *Region) getStorePeer(rs *RegionStore, pidx int32) (store *Store, peer *metapb.Peer, idx int) { + store = rs.stores[pidx] + peer = r.meta.Peers[pidx] + idx = int(pidx) + return +} + // WorkStorePeer returns current work store with work peer. func (r *Region) WorkStorePeer(rs *RegionStore) (store *Store, peer *metapb.Peer, idx int) { - idx = int(rs.workStoreIdx) - store = rs.stores[rs.workStoreIdx] - peer = r.meta.Peers[rs.workStoreIdx] - return + return r.getStorePeer(rs, rs.workStoreIdx) +} + +// FollowerStorePeer returns a follower store with follower peer. +func (r *Region) FollowerStorePeer(rs *RegionStore, followerStoreSeed uint32) (store *Store, peer *metapb.Peer, idx int) { + return r.getStorePeer(rs, rs.follower(followerStoreSeed)) } // RegionVerID is a unique ID that can identify a Region at a specific version. @@ -891,13 +929,10 @@ func (c *RegionCache) switchToPeer(r *Region, targetStoreID uint64) (found bool) func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int, err error) { rs := r.getStore() - if int(rs.workStoreIdx) != currentPeerIdx { - return - } if err != nil { // TODO: refine err, only do this for some errors. 
- s := rs.stores[rs.workStoreIdx] - epoch := rs.storeFails[rs.workStoreIdx] + s := rs.stores[currentPeerIdx] + epoch := rs.storeFails[currentPeerIdx] if atomic.CompareAndSwapUint32(&s.fail, epoch, epoch+1) { logutil.Logger(context.Background()).Info("mark store's regions need be refill", zap.String("store", s.addr)) tikvRegionCacheCounterWithInvalidateStoreRegionsOK.Inc() @@ -905,6 +940,10 @@ func (c *RegionCache) switchNextPeer(r *Region, currentPeerIdx int, err error) { s.markNeedCheck(c.notifyCheckCh) } + if int(rs.workStoreIdx) != currentPeerIdx { + return + } + nextIdx := (currentPeerIdx + 1) % len(rs.stores) newRegionStore := rs.clone() newRegionStore.workStoreIdx = int32(nextIdx) diff --git a/store/tikv/region_cache_test.go b/store/tikv/region_cache_test.go index 7546f3646988c..cd4f2213ba435 100644 --- a/store/tikv/region_cache_test.go +++ b/store/tikv/region_cache_test.go @@ -17,22 +17,24 @@ import ( "context" "errors" "fmt" + "math/rand" "testing" "time" "github.com/google/btree" . "github.com/pingcap/check" "github.com/pingcap/kvproto/pkg/metapb" + "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/store/mockstore/mocktikv" ) type testRegionCacheSuite struct { OneByOneSuite cluster *mocktikv.Cluster - store1 uint64 - store2 uint64 - peer1 uint64 - peer2 uint64 + store1 uint64 // store1 is leader + store2 uint64 // store2 is follower + peer1 uint64 // peer1 is leader + peer2 uint64 // peer2 is follower region1 uint64 cache *RegionCache bo *Backoffer @@ -105,10 +107,10 @@ func (s *testRegionCacheSuite) getRegionWithEndKey(c *C, key []byte) *Region { return r } -func (s *testRegionCacheSuite) getAddr(c *C, key []byte) string { +func (s *testRegionCacheSuite) getAddr(c *C, key []byte, replicaRead kv.ReplicaReadType, seed uint32) string { loc, err := s.cache.LocateKey(s.bo, key) c.Assert(err, IsNil) - ctx, err := s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region, replicaRead, seed) c.Assert(err, IsNil) if ctx == nil { return "" @@ -117,10 +119,12 @@ func (s *testRegionCacheSuite) getAddr(c *C, key []byte) string { } func (s *testRegionCacheSuite) TestSimple(c *C) { + seed := rand.Uint32() r := s.getRegion(c, []byte("a")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store2)) s.checkCache(c, 1) c.Assert(r.GetMeta(), DeepEquals, r.meta) c.Assert(r.GetLeaderID(), Equals, r.meta.Peers[r.getStore().workStoreIdx].Id) @@ -134,7 +138,10 @@ func (s *testRegionCacheSuite) TestDropStore(c *C) { s.cluster.RemoveStore(s.store1) loc, err := s.cache.LocateKey(bo, []byte("a")) c.Assert(err, IsNil) - ctx, err := s.cache.GetRPCContext(bo, loc.Region) + ctx, err := s.cache.GetRPCContext(bo, loc.Region, kv.ReplicaReadLeader, 0) + c.Assert(err, IsNil) + c.Assert(ctx, IsNil) + ctx, err = s.cache.GetRPCContext(bo, loc.Region, kv.ReplicaReadFollower, rand.Uint32()) c.Assert(err, IsNil) c.Assert(ctx, IsNil) s.checkCache(c, 0) @@ -155,6 +162,7 @@ func (s *testRegionCacheSuite) TestDropStoreRetry(c *C) { } func (s *testRegionCacheSuite) TestUpdateLeader(c *C) { + seed := rand.Uint32() loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) // tikv-server reports `NotLeader` @@ -163,15 +171,18 @@ func (s *testRegionCacheSuite) TestUpdateLeader(c *C) { r := s.getRegion(c, []byte("a")) 
c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store2)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store2)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store1)) r = s.getRegionWithEndKey(c, []byte("z")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("z")), Equals, s.storeAddr(s.store2)) + c.Assert(s.getAddr(c, []byte("z"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store2)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store1)) } func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { + seed := rand.Uint32() loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) // new store3 becomes leader @@ -186,7 +197,20 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { r := s.getRegion(c, []byte("a")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store1)) + follower := s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed) + if seed%2 == 0 { + c.Assert(follower, Equals, s.storeAddr(s.store2)) + } else { + c.Assert(follower, Equals, s.storeAddr(store3)) + } + follower2 := s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed+1) + if (seed+1)%2 == 0 { + c.Assert(follower2, Equals, s.storeAddr(s.store2)) + } else { + c.Assert(follower2, Equals, s.storeAddr(store3)) + } + c.Assert(follower, Not(Equals), follower2) // tikv-server notifies new leader to pd-server. s.cluster.ChangeLeader(s.region1, peer3) @@ -195,10 +219,24 @@ func (s *testRegionCacheSuite) TestUpdateLeader2(c *C) { r = s.getRegion(c, []byte("a")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(store3)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(store3)) + follower = s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed) + if seed%2 == 0 { + c.Assert(follower, Equals, s.storeAddr(s.store1)) + } else { + c.Assert(follower, Equals, s.storeAddr(s.store2)) + } + follower2 = s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed+1) + if (seed+1)%2 == 0 { + c.Assert(follower2, Equals, s.storeAddr(s.store1)) + } else { + c.Assert(follower2, Equals, s.storeAddr(s.store2)) + } + c.Assert(follower, Not(Equals), follower2) } func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) { + seed := rand.Uint32() loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) // store2 becomes leader @@ -219,11 +257,16 @@ func (s *testRegionCacheSuite) TestUpdateLeader3(c *C) { c.Assert(err, IsNil) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - addr := s.getAddr(c, []byte("a")) + addr := s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0) c.Assert(addr, Equals, "") s.getRegion(c, []byte("a")) // pd-server should return the new leader. 
- c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(store3)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(store3)) + addr = s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed) + c.Assert(addr == s.storeAddr(s.store1) || len(addr) == 0, IsTrue) + addr2 := s.getAddr(c, []byte("a"), kv.ReplicaReadFollower, seed+1) + c.Assert(addr2 == s.storeAddr(s.store1) || len(addr2) == 0, IsTrue) + c.Assert((len(addr2) == 0 && len(addr) == 0) || addr != addr2, IsTrue) } func (s *testRegionCacheSuite) TestSendFailedButLeaderNotChange(c *C) { @@ -236,22 +279,74 @@ func (s *testRegionCacheSuite) TestSendFailedButLeaderNotChange(c *C) { loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) - ctx, err := s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer1) c.Assert(len(ctx.Meta.Peers), Equals, 3) + // verify follower to be one of store2 and store3 + seed := rand.Uint32() + ctxFollower1, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id) + // send fail leader switch to 2 s.cache.OnSendFail(s.bo, ctx, false, nil) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer2) + // verify follower to be one of store1 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) + // access 1 it will return NotLeader, leader back to 2 again s.cache.UpdateLeader(loc.Region, s.store2, ctx.PeerIdx) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer2) + + // verify follower to be one of store1 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) } func (s *testRegionCacheSuite) TestSendFailedInHibernateRegion(c *C) { @@ -264,30 +359,100 @@ func (s 
*testRegionCacheSuite) TestSendFailedInHibernateRegion(c *C) { loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) - ctx, err := s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer1) c.Assert(len(ctx.Meta.Peers), Equals, 3) + // verify follower to be one of store2 and store3 + seed := rand.Uint32() + ctxFollower1, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id) + // send fail leader switch to 2 s.cache.OnSendFail(s.bo, ctx, false, nil) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer2) + // verify follower to be one of store1 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id == s.peer1 || ctxFollower1.Peer.Id == peer3, IsTrue) + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) + // access 2, it's in hibernate and return 0 leader, so switch to 3 s.cache.UpdateLeader(loc.Region, 0, ctx.PeerIdx) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, peer3) + // verify follower to be one of store1 and store2 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } + c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id) + // again peer back to 1 - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) s.cache.UpdateLeader(loc.Region, 0, ctx.PeerIdx) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer1) + + // verify follower to be one of store2 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } 
else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) } func (s *testRegionCacheSuite) TestSendFailInvalidateRegionsInSameStore(c *C) { @@ -305,12 +470,12 @@ func (s *testRegionCacheSuite) TestSendFailInvalidateRegionsInSameStore(c *C) { c.Assert(loc2.Region.id, Equals, region2) // Send fail on region1 - ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region) + ctx, _ := s.cache.GetRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0) s.checkCache(c, 2) s.cache.OnSendFail(s.bo, ctx, false, errors.New("test error")) // Get region2 cache will get nil then reload. - ctx2, err := s.cache.GetRPCContext(s.bo, loc2.Region) + ctx2, err := s.cache.GetRPCContext(s.bo, loc2.Region, kv.ReplicaReadLeader, 0) c.Assert(ctx2, IsNil) c.Assert(err, IsNil) } @@ -325,34 +490,106 @@ func (s *testRegionCacheSuite) TestSendFailedInMultipleNode(c *C) { loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) - ctx, err := s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer1) c.Assert(len(ctx.Meta.Peers), Equals, 3) + // verify follower to be one of store2 and store3 + seed := rand.Uint32() + ctxFollower1, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id) + // send fail leader switch to 2 s.cache.OnSendFail(s.bo, ctx, false, nil) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer2) + // verify follower to be one of store1 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) + // send 2 fail leader switch to 3 s.cache.OnSendFail(s.bo, ctx, false, nil) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, peer3) + // verify follower to be one of store1 and store2 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + 
} + c.Assert(ctxFollower1.Peer.Id == s.peer1 || ctxFollower1.Peer.Id == s.peer2, IsTrue) + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer1) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } + c.Assert(ctxFollower1.Peer.Id, Equals, ctxFollower2.Peer.Id) + // 3 can be access, so switch to 1 s.cache.UpdateLeader(loc.Region, s.store1, ctx.PeerIdx) - ctx, err = s.cache.GetRPCContext(s.bo, loc.Region) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) c.Assert(err, IsNil) c.Assert(ctx.Peer.Id, Equals, s.peer1) + + // verify follower to be one of store2 and store3 + ctxFollower1, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed) + c.Assert(err, IsNil) + if seed%2 == 0 { + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower1.Peer.Id, Equals, peer3) + } + ctxFollower2, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, seed+1) + c.Assert(err, IsNil) + if (seed+1)%2 == 0 { + c.Assert(ctxFollower2.Peer.Id, Equals, s.peer2) + } else { + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + } + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) } func (s *testRegionCacheSuite) TestSplit(c *C) { + seed := rand.Uint32() r := s.getRegion(c, []byte("x")) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("x")), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("x"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("x"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store2)) // split to ['' - 'm' - 'z'] region2 := s.cluster.AllocID() @@ -365,7 +602,8 @@ func (s *testRegionCacheSuite) TestSplit(c *C) { r = s.getRegion(c, []byte("x")) c.Assert(r.GetID(), Equals, region2) - c.Assert(s.getAddr(c, []byte("x")), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("x"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("x"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store2)) s.checkCache(c, 1) r = s.getRegionWithEndKey(c, []byte("m")) @@ -397,6 +635,7 @@ func (s *testRegionCacheSuite) TestMerge(c *C) { } func (s *testRegionCacheSuite) TestReconnect(c *C) { + seed := rand.Uint32() loc, err := s.cache.LocateKey(s.bo, []byte("a")) c.Assert(err, IsNil) @@ -406,7 +645,8 @@ func (s *testRegionCacheSuite) TestReconnect(c *C) { r := s.getRegion(c, []byte("a")) c.Assert(r, NotNil) c.Assert(r.GetID(), Equals, s.region1) - c.Assert(s.getAddr(c, []byte("a")), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("a"), kv.ReplicaReadLeader, 0), Equals, s.storeAddr(s.store1)) + c.Assert(s.getAddr(c, []byte("x"), kv.ReplicaReadFollower, seed), Equals, s.storeAddr(s.store2)) s.checkCache(c, 1) } @@ -573,6 +813,37 @@ func (s *testRegionCacheSuite) TestListRegionIDsInCache(c *C) { c.Assert(regionIDs, DeepEquals, []uint64{s.region1, region2}) } +func (s *testRegionCacheSuite) TestFollowerReadFallback(c *C) { + // 3 nodes and no.1 is leader. 
+ store3 := s.cluster.AllocID() + peer3 := s.cluster.AllocID() + s.cluster.AddStore(store3, s.storeAddr(store3)) + s.cluster.AddPeer(s.region1, store3, peer3) + s.cluster.ChangeLeader(s.region1, s.peer1) + + loc, err := s.cache.LocateKey(s.bo, []byte("a")) + c.Assert(err, IsNil) + ctx, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadLeader, 0) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, s.peer1) + c.Assert(len(ctx.Meta.Peers), Equals, 3) + + // verify follower to be store2 and store3 + ctxFollower1, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, 0) + c.Assert(err, IsNil) + c.Assert(ctxFollower1.Peer.Id, Equals, s.peer2) + ctxFollower2, err := s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, 1) + c.Assert(err, IsNil) + c.Assert(ctxFollower2.Peer.Id, Equals, peer3) + c.Assert(ctxFollower1.Peer.Id, Not(Equals), ctxFollower2.Peer.Id) + + // send fail on store2, next follower read is going to fallback to store3 + s.cache.OnSendFail(s.bo, ctxFollower1, false, errors.New("test error")) + ctx, err = s.cache.GetRPCContext(s.bo, loc.Region, kv.ReplicaReadFollower, 0) + c.Assert(err, IsNil) + c.Assert(ctx.Peer.Id, Equals, peer3) +} + func createSampleRegion(startKey, endKey []byte) *Region { return &Region{ meta: &metapb.Region{ diff --git a/store/tikv/region_request.go b/store/tikv/region_request.go old mode 100644 new mode 100755 index 306a6b96aa085..cbfcfc6658100 --- a/store/tikv/region_request.go +++ b/store/tikv/region_request.go @@ -98,8 +98,14 @@ func (s *RegionRequestSender) SendReqCtx(bo *Backoffer, req *tikvrpc.Request, re } }) + var replicaRead kv.ReplicaReadType + if req.ReplicaRead { + replicaRead = kv.ReplicaReadFollower + } else { + replicaRead = kv.ReplicaReadLeader + } for { - ctx, err := s.regionCache.GetRPCContext(bo, regionID) + ctx, err := s.regionCache.GetRPCContext(bo, regionID, replicaRead, req.ReplicaReadSeed) if err != nil { return nil, nil, errors.Trace(err) } diff --git a/store/tikv/region_request_test.go b/store/tikv/region_request_test.go index c087b44bccf80..9a890eb6e2b09 100644 --- a/store/tikv/region_request_test.go +++ b/store/tikv/region_request_test.go @@ -189,6 +189,11 @@ func (s *testRegionRequestSuite) TestSendReqCtx(c *C) { c.Assert(err, IsNil) c.Assert(resp.RawPut, NotNil) c.Assert(ctx, NotNil) + req.ReplicaRead = true + resp, ctx, err = s.regionRequestSender.SendReqCtx(s.bo, req, region.Region, time.Second) + c.Assert(err, IsNil) + c.Assert(resp.RawPut, NotNil) + c.Assert(ctx, NotNil) } func (s *testRegionRequestSuite) TestOnSendFailedWithCancelled(c *C) { @@ -298,6 +303,9 @@ func (s *mockTikvGrpcServer) KvPessimisticLock(context.Context, *kvrpcpb.Pessimi func (s *mockTikvGrpcServer) KVPessimisticRollback(context.Context, *kvrpcpb.PessimisticRollbackRequest) (*kvrpcpb.PessimisticRollbackResponse, error) { return nil, errors.New("unreachable") } +func (s *mockTikvGrpcServer) KvTxnHeartBeat(ctx context.Context, in *kvrpcpb.TxnHeartBeatRequest) (*kvrpcpb.TxnHeartBeatResponse, error) { + return nil, errors.New("unreachable") +} func (s *mockTikvGrpcServer) KvGC(context.Context, *kvrpcpb.GCRequest) (*kvrpcpb.GCResponse, error) { return nil, errors.New("unreachable") } @@ -364,9 +372,6 @@ func (s *mockTikvGrpcServer) BatchCommands(tikvpb.Tikv_BatchCommandsServer) erro func (s *mockTikvGrpcServer) ReadIndex(context.Context, *kvrpcpb.ReadIndexRequest) (*kvrpcpb.ReadIndexResponse, error) { return nil, errors.New("unreachable") } -func (s *mockTikvGrpcServer) KvTxnHeartBeat(ctx context.Context, in 
*kvrpcpb.TxnHeartBeatRequest) (*kvrpcpb.TxnHeartBeatResponse, error) {
-	return nil, errors.New("unreachable")
-}
 
 func (s *testRegionRequestSuite) TestNoReloadRegionForGrpcWhenCtxCanceled(c *C) {
 	// prepare a mock tikv grpc server
diff --git a/store/tikv/scan.go b/store/tikv/scan.go
index 4ce5979270c02..0ef220941d77e 100644
--- a/store/tikv/scan.go
+++ b/store/tikv/scan.go
@@ -197,7 +197,9 @@ func (s *Scanner) getData(bo *Backoffer) error {
 		Context: pb.Context{
 			Priority: s.snapshot.priority,
 			NotFillCache: s.snapshot.notFillCache,
+			ReplicaRead: s.snapshot.replicaRead.IsFollowerRead(),
 		},
+		ReplicaReadSeed: s.snapshot.replicaReadSeed,
 	}
 	if s.reverse {
 		req.Scan.StartKey = s.nextEndKey
diff --git a/store/tikv/snapshot.go b/store/tikv/snapshot.go
old mode 100644
new mode 100755
index 3f86eb3cf6000..83222d75e2d7a
--- a/store/tikv/snapshot.go
+++ b/store/tikv/snapshot.go
@@ -57,6 +57,9 @@ type tikvSnapshot struct {
 	keyOnly bool
 	vars *kv.Variables
 
+	replicaRead kv.ReplicaReadType
+	replicaReadSeed uint32
+
 	// Cache the result of BatchGet.
 	// The invariance is that calling BatchGet multiple times using the same start ts,
 	// the result should not change.
@@ -70,10 +73,22 @@ type tikvSnapshot struct {
 // newTiKVSnapshot creates a snapshot of an TiKV store.
 func newTiKVSnapshot(store *tikvStore, ver kv.Version) *tikvSnapshot {
 	return &tikvSnapshot{
-		store: store,
-		version: ver,
-		priority: pb.CommandPri_Normal,
-		vars: kv.DefaultVars,
+		store: store,
+		version: ver,
+		priority: pb.CommandPri_Normal,
+		vars: kv.DefaultVars,
+		replicaReadSeed: 0,
+	}
+}
+
+// newTiKVSnapshotWithReplicaReadSeed creates a snapshot of a TiKV store with the given replicaReadSeed.
+func newTiKVSnapshotWithReplicaReadSeed(store *tikvStore, ver kv.Version, replicaReadSeed uint32) *tikvSnapshot {
+	return &tikvSnapshot{
+		store: store,
+		version: ver,
+		priority: pb.CommandPri_Normal,
+		vars: kv.DefaultVars,
+		replicaReadSeed: replicaReadSeed,
 	}
 }
 
@@ -202,7 +217,9 @@ func (s *tikvSnapshot) batchGetSingleRegion(bo *Backoffer, batch batchKeys, coll
 		Context: pb.Context{
 			Priority: s.priority,
 			NotFillCache: s.notFillCache,
+			ReplicaRead: s.replicaRead.IsFollowerRead(),
 		},
+		ReplicaReadSeed: s.replicaReadSeed,
 	}
 	resp, err := sender.SendReq(bo, req, batch.region, ReadTimeoutMedium)
 	if err != nil {
@@ -295,7 +312,9 @@ func (s *tikvSnapshot) get(bo *Backoffer, k kv.Key) ([]byte, error) {
 		Context: pb.Context{
 			Priority: s.priority,
 			NotFillCache: s.notFillCache,
+			ReplicaRead: s.replicaRead.IsFollowerRead(),
 		},
+		ReplicaReadSeed: s.replicaReadSeed,
 	}
 	for {
 		loc, err := s.store.regionCache.LocateKey(bo, k)
@@ -355,6 +374,23 @@ func (s *tikvSnapshot) IterReverse(k kv.Key) (kv.Iterator, error) {
 	return scanner, errors.Trace(err)
 }
 
+// SetOption sets an option with a value, when val is nil, uses the default
+// value of this option. 
Only ReplicaRead is supported for snapshot
+func (s *tikvSnapshot) SetOption(opt kv.Option, val interface{}) {
+	switch opt {
+	case kv.ReplicaRead:
+		s.replicaRead = val.(kv.ReplicaReadType)
+	}
+}
+
+// DelOption deletes an option. Only ReplicaRead is supported for snapshot
+func (s *tikvSnapshot) DelOption(opt kv.Option) {
+	switch opt {
+	case kv.ReplicaRead:
+		s.replicaRead = kv.ReplicaReadLeader
+	}
+}
+
 func extractLockFromKeyErr(keyErr *pb.KeyError) (*Lock, error) {
 	if locked := keyErr.GetLocked(); locked != nil {
 		return NewLock(locked), nil
diff --git a/store/tikv/tikvrpc/tikvrpc.go b/store/tikv/tikvrpc/tikvrpc.go
index 1d88e7b25bcb3..5fc34e5e4ff8e 100644
--- a/store/tikv/tikvrpc/tikvrpc.go
+++ b/store/tikv/tikvrpc/tikvrpc.go
@@ -169,6 +169,8 @@ type Request struct {
 
 	DebugGetRegionProperties *debugpb.GetRegionPropertiesRequest
 
+	ReplicaReadSeed uint32
+
 	Empty *tikvpb.BatchCommandsEmptyRequest
 	TxnHeartBeat *kvrpcpb.TxnHeartBeatRequest
 }
diff --git a/store/tikv/txn.go b/store/tikv/txn.go
index 357b81b191969..9996d8b3251bc 100644
--- a/store/tikv/txn.go
+++ b/store/tikv/txn.go
@@ -78,13 +78,13 @@ func newTiKVTxn(store *tikvStore) (*tikvTxn, error) {
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
-	return newTikvTxnWithStartTS(store, startTS)
+	return newTikvTxnWithStartTS(store, startTS, store.nextReplicaReadSeed())
 }
 
 // newTikvTxnWithStartTS creates a txn with startTS.
-func newTikvTxnWithStartTS(store *tikvStore, startTS uint64) (*tikvTxn, error) {
+func newTikvTxnWithStartTS(store *tikvStore, startTS uint64, replicaReadSeed uint32) (*tikvTxn, error) {
 	ver := kv.NewVersion(startTS)
-	snapshot := newTiKVSnapshot(store, ver)
+	snapshot := newTiKVSnapshotWithReplicaReadSeed(store, ver, replicaReadSeed)
 	return &tikvTxn{
 		snapshot: snapshot,
 		us: kv.NewUnionStore(snapshot),
diff --git a/util/admin/admin.go b/util/admin/admin.go
index 3e471360bbca6..2ea4125427312 100644
--- a/util/admin/admin.go
+++ b/util/admin/admin.go
@@ -560,6 +560,10 @@ func ScanSnapshotTableRecord(sessCtx sessionctx.Context, store kv.Storage, ver k
 		return nil, 0, errors.Trace(err)
 	}
 
+	if sessCtx.GetSessionVars().ReplicaRead.IsFollowerRead() {
+		snap.SetOption(kv.ReplicaRead, kv.ReplicaReadFollower)
+	}
+
 	records, nextHandle, err := ScanTableRecord(sessCtx, snap, t, startHandle, limit)
 	return records, nextHandle, errors.Trace(err)
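
// For reference only (not part of the patch): a minimal standalone Go sketch of the
// seed-based follower selection that RegionStore.follower implements above. The
// store/regionStore types and the main driver are simplified stand-ins for the real
// TiKV client structures, kept only to make the rotation and the fail-epoch fallback
// runnable in isolation.
package main

import (
	"fmt"
	"sync/atomic"
)

// store keeps only the fail epoch, which is bumped whenever a request to it fails.
type store struct {
	fail uint32
}

// regionStore mirrors the fields of RegionStore that follower() relies on.
type regionStore struct {
	workStoreIdx int32    // index of the current leader store
	stores       []*store // all replica stores of the region
	storeFails   []uint32 // fail epochs observed when this snapshot was built
}

// follower maps a seed onto a follower index, skipping the leader slot and any store
// whose fail epoch has advanced, and falls back to the leader when no healthy
// follower remains.
func (r *regionStore) follower(seed uint32) int32 {
	l := uint32(len(r.stores))
	if l <= 1 {
		return r.workStoreIdx
	}
	for retry := l - 1; retry > 0; retry-- {
		followerIdx := int32(seed % (l - 1))
		if followerIdx >= r.workStoreIdx {
			followerIdx++ // skip over the leader
		}
		if r.storeFails[followerIdx] == atomic.LoadUint32(&r.stores[followerIdx].fail) {
			return followerIdx
		}
		seed++
	}
	return r.workStoreIdx
}

func main() {
	rs := &regionStore{
		workStoreIdx: 0,
		stores:       []*store{{}, {}, {}},
		storeFails:   []uint32{0, 0, 0},
	}
	// Consecutive seeds rotate reads across the two followers (indexes 1 and 2).
	fmt.Println(rs.follower(0), rs.follower(1)) // 1 2
	// After follower 1 fails, seed 0 falls over to follower 2.
	atomic.AddUint32(&rs.stores[1].fail, 1)
	fmt.Println(rs.follower(0)) // 2
}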