Skip to content

Commit

Permalink
Support learner replica read (#643)
Browse files Browse the repository at this point in the history
Signed-off-by: Yang Zhang <yang.zhang@pingcap.com>
  • Loading branch information
v01dstar authored Jan 11, 2023
1 parent c598334 commit 1b1a805
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 30 deletions.
12 changes: 8 additions & 4 deletions internal/locate/region_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,8 +264,9 @@ type replicaSelector struct {
// selectorState is the interface of states of the replicaSelector.
// Here is the main state transition diagram:
//
// exceeding maxReplicaAttempt
// +-------------------+ || RPC failure && unreachable && no forwarding
// exceeding maxReplicaAttempt
// +-------------------+ || RPC failure && unreachable && no forwarding
//
// +-------->+ accessKnownLeader +----------------+
// | +------+------------+ |
// | | |
Expand All @@ -282,7 +283,8 @@ type replicaSelector struct {
// | leader becomes v +---+---+
// | reachable +-----+-----+ all proxies are tried ^
// +------------+tryNewProxy+-------------------------+
// +-----------+
//
// +-----------+
type selectorState interface {
next(*retry.Backoffer, *replicaSelector) (*RPCContext, error)
onSendSuccess(*replicaSelector)
Expand Down Expand Up @@ -520,6 +522,7 @@ type accessFollower struct {
option storeSelectorOp
leaderIdx AccessIndex
lastIdx AccessIndex
learnerOnly bool
}

func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
Expand Down Expand Up @@ -589,7 +592,7 @@ func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool
// The request can only be sent to the leader.
((state.option.leaderOnly && idx == state.leaderIdx) ||
// Choose a replica with matched labels.
(!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels)))
(!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels) && (!state.learnerOnly || replica.peer.Role == metapb.PeerRole_Learner)))
}

type invalidStore struct {
Expand Down Expand Up @@ -647,6 +650,7 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tik
option: option,
leaderIdx: regionStore.workTiKVIdx,
lastIdx: -1,
learnerOnly: req.ReplicaReadType == kv.ReplicaReadLearner,
}
}

Expand Down
104 changes: 78 additions & 26 deletions internal/locate/region_request3_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ package locate

import (
"context"
"fmt"
"sync/atomic"
"testing"
"time"
Expand Down Expand Up @@ -276,6 +277,69 @@ func refreshEpochs(regionStore *regionStore) {
}
}

func AssertRPCCtxEqual(s *testRegionRequestToThreeStoresSuite, rpcCtx *RPCContext, target *replica, proxy *replica) {
s.Equal(rpcCtx.Store, target.store)
s.Equal(rpcCtx.Peer, target.peer)
s.Equal(rpcCtx.Addr, target.store.addr)
s.Equal(rpcCtx.AccessMode, tiKVOnly)
if proxy != nil {
s.Equal(rpcCtx.ProxyStore, proxy.store)
s.Equal(rpcCtx.ProxyAddr, proxy.store.addr)
}
}

func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
s.NotNil(regionLoc)
region := s.cache.GetCachedRegionWithRLock(regionLoc.Region)
regionStore := region.getStore()
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kvrpcpb.Context{})

// Create a fake region and change its leader to the last peer.
regionStore = regionStore.clone()
regionStore.workTiKVIdx = AccessIndex(len(regionStore.stores) - 1)
sidx, _ := regionStore.accessStore(tiKVOnly, regionStore.workTiKVIdx)
regionStore.stores[sidx].epoch++

// Add a TiKV learner peer to the region.
storeID := s.cluster.AllocID()
s.cluster.AddStore(storeID, fmt.Sprintf("store%d", storeID))
tikvLearner := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: storeID, Role: metapb.PeerRole_Learner}
tikvLearnerAccessIdx := len(regionStore.stores)
regionStore.accessIndex[tiKVOnly] = append(regionStore.accessIndex[tiKVOnly], tikvLearnerAccessIdx)
regionStore.stores = append(regionStore.stores, &Store{storeID: tikvLearner.StoreId})
regionStore.storeEpochs = append(regionStore.storeEpochs, 0)

region = &Region{
meta: region.GetMeta(),
}
region.lastAccess = time.Now().Unix()
region.meta.Peers = append(region.meta.Peers, tikvLearner)
atomic.StorePointer(&region.store, unsafe.Pointer(regionStore))

cache := NewRegionCache(s.cache.pdClient)
defer cache.Close()
cache.insertRegionToCache(region)

// Test accessFollower state with kv.ReplicaReadLearner request type.
region.lastAccess = time.Now().Unix()
refreshEpochs(regionStore)
req.ReplicaReadType = kv.ReplicaReadLearner
replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)

accessLearner, _ := replicaSelector.state.(*accessFollower)
// Invalidate the region if the leader is not in the region.
region.lastAccess = time.Now().Unix()
rpcCtx, err := replicaSelector.next(s.bo)
s.Nil(err)
// Should swith to the next follower.
s.Equal(AccessIndex(tikvLearnerAccessIdx), accessLearner.lastIdx)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
}

func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
s.Nil(err)
Expand All @@ -291,16 +355,16 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
regionStore.stores[sidx].epoch++
regionStore.storeEpochs[sidx]++
// Add a TiFlash peer to the region.
peer := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()}
tiflash := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()}
regionStore.accessIndex[tiFlashOnly] = append(regionStore.accessIndex[tiFlashOnly], len(regionStore.stores))
regionStore.stores = append(regionStore.stores, &Store{storeID: peer.StoreId, storeType: tikvrpc.TiFlash})
regionStore.stores = append(regionStore.stores, &Store{storeID: tiflash.StoreId, storeType: tikvrpc.TiFlash})
regionStore.storeEpochs = append(regionStore.storeEpochs, 0)

region = &Region{
meta: region.GetMeta(),
}
region.lastAccess = time.Now().Unix()
region.meta.Peers = append(region.meta.Peers, peer)
region.meta.Peers = append(region.meta.Peers, tiflash)
atomic.StorePointer(&region.store, unsafe.Pointer(regionStore))

cache := NewRegionCache(s.cache.pdClient)
Expand Down Expand Up @@ -330,24 +394,13 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
}
}

assertRPCCtxEqual := func(rpcCtx *RPCContext, target *replica, proxy *replica) {
s.Equal(rpcCtx.Store, target.store)
s.Equal(rpcCtx.Peer, target.peer)
s.Equal(rpcCtx.Addr, target.store.addr)
s.Equal(rpcCtx.AccessMode, tiKVOnly)
if proxy != nil {
s.Equal(rpcCtx.ProxyStore, proxy.store)
s.Equal(rpcCtx.ProxyAddr, proxy.store.addr)
}
}

// Test accessKnownLeader state
s.IsType(&accessKnownLeader{}, replicaSelector.state)
// Try the leader for maxReplicaAttempt times
for i := 1; i <= maxReplicaAttempt; i++ {
rpcCtx, err := replicaSelector.next(s.bo)
s.Nil(err)
assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
s.IsType(&accessKnownLeader{}, replicaSelector.state)
s.Equal(replicaSelector.replicas[regionStore.workTiKVIdx].attempts, i)
}
Expand All @@ -361,7 +414,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Equal(regionStore.workTiKVIdx, state.leaderIdx)
s.NotEqual(state.lastIdx, regionStore.workTiKVIdx)
s.Equal(replicaSelector.targetIdx, state.lastIdx)
assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
s.Equal(replicaSelector.targetReplica().attempts, 1)
}
// In tryFollower state, if all replicas are tried, nil RPCContext should be returned
Expand All @@ -388,7 +441,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Nil(err)
s.IsType(&tryFollower{}, replicaSelector.state)
s.NotEqual(replicaSelector.targetIdx, regionStore.workTiKVIdx)
assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), nil)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), nil)
s.Equal(replicaSelector.targetReplica().attempts, 1)
// If the NotLeader errors provides an unreachable leader, do not switch to it.
replicaSelector.onNotLeader(s.bo, rpcCtx, &errorpb.NotLeader{
Expand Down Expand Up @@ -454,7 +507,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Equal(regionStore.workTiKVIdx, state.leaderIdx)
s.Equal(AccessIndex(2), replicaSelector.targetIdx)
s.NotEqual(AccessIndex(2), replicaSelector.proxyIdx)
assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
s.Equal(replicaSelector.targetReplica().attempts, 1)
s.Equal(replicaSelector.proxyReplica().attempts, 1)

Expand Down Expand Up @@ -488,14 +541,14 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Equal(regionStore.workTiKVIdx, state2.leaderIdx)
_, err = replicaSelector.next(s.bo)
s.Nil(err)
assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())

// Switch to tryNewProxy if the current proxy is not available
replicaSelector.onSendFailure(s.bo, nil)
s.IsType(&tryNewProxy{}, replicaSelector.state)
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(err)
assertRPCCtxEqual(rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.targetReplica(), replicaSelector.proxyReplica())
s.Equal(regionStore.workTiKVIdx, state2.leaderIdx)
s.Equal(AccessIndex(2), replicaSelector.targetIdx)
s.NotEqual(regionStore.proxyTiKVIdx, replicaSelector.proxyIdx)
Expand All @@ -510,7 +563,6 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.NotNil(replicaSelector)
state3, ok := replicaSelector.state.(*accessFollower)
s.True(ok)
s.False(state3.tryLeader)
s.Equal(regionStore.workTiKVIdx, state3.leaderIdx)
s.Equal(state3.lastIdx, AccessIndex(-1))

Expand All @@ -523,15 +575,15 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// Shouldn't access the leader if followers aren't exhausted.
s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
lastIdx = state3.lastIdx
}
// Fallback to the leader for 1 time
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(err)
s.Equal(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
// All replicas are exhausted.
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(rpcCtx)
Expand All @@ -554,7 +606,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Nil(err)
s.Equal(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)

// Test accessFollower state filtering label-not-match stores.
region.lastAccess = time.Now().Unix()
Expand All @@ -575,7 +627,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Nil(err)
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(err)
assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[accessIdx], nil)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[accessIdx], nil)
}

// Test accessFollower state with leaderOnly option
Expand All @@ -588,7 +640,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(err)
// Should always access the leader.
assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
AssertRPCCtxEqual(s, rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
}

// Test accessFollower state with kv.ReplicaReadMixed request type.
Expand Down
2 changes: 2 additions & 0 deletions kv/store_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ const (
ReplicaReadFollower
// ReplicaReadMixed stands for 'read from leader and follower and learner'.
ReplicaReadMixed
// ReplicaReadLearner stands for 'read from learner'.
ReplicaReadLearner
)

// IsFollowerRead checks if follower is going to be used to read data.
Expand Down

0 comments on commit 1b1a805

Please sign in to comment.