Merge remote-tracking branch 'upstream/master' into ignore_ru_for_background
HuSharp committed Jul 13, 2023
2 parents 3936a77 + 51633ad commit 42b1ac2
Showing 2 changed files with 31 additions and 10 deletions.
4 changes: 4 additions & 0 deletions config/client.go
@@ -81,6 +81,9 @@ type TiKVClient struct {
    // StoreLivenessTimeout is the timeout for store liveness check request.
    StoreLivenessTimeout string `toml:"store-liveness-timeout" json:"store-liveness-timeout"`
    CoprCache CoprocessorCache `toml:"copr-cache" json:"copr-cache"`
+   // CoprReqTimeout is the timeout for a single coprocessor request.
+   // Note: this is a transitional modification, and it will be removed once its dynamically configurable version is ready.
+   CoprReqTimeout time.Duration `toml:"copr-req-timeout" json:"copr-req-timeout"`
    // TTLRefreshedTxnSize controls whether a transaction should update its TTL or not.
    TTLRefreshedTxnSize int64 `toml:"ttl-refreshed-txn-size" json:"ttl-refreshed-txn-size"`
    ResolveLockLiteThreshold uint64 `toml:"resolve-lock-lite-threshold" json:"resolve-lock-lite-threshold"`
@@ -150,6 +153,7 @@ func DefaultTiKVClient() TiKVClient {
            AdmissionMaxResultMB: 10,
            AdmissionMinProcessMs: 5,
        },
+       CoprReqTimeout: 60 * time.Second,

        ResolveLockLiteThreshold: 16,
    }
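
The new `copr-req-timeout` option gives each individual coprocessor request a deadline, 60 seconds by default. A minimal sketch of how a caller could apply such a per-request deadline using only the standard library; `sendCoprRequest` and its task/response strings are hypothetical placeholders, not part of this commit:

package main

import (
    "context"
    "fmt"
    "time"
)

// coprReqTimeout mirrors the default added by this commit (copr-req-timeout = 60s).
const coprReqTimeout = 60 * time.Second

// sendCoprRequest stands in for the real coprocessor send path; it only
// exists to show how the timeout would cut a slow request short.
func sendCoprRequest(ctx context.Context, task string) (string, error) {
    select {
    case <-time.After(10 * time.Millisecond): // pretend the store answers quickly
        return "response for " + task, nil
    case <-ctx.Done():
        return "", ctx.Err() // the request ran past copr-req-timeout
    }
}

func main() {
    // Bound a single coprocessor request by the configured timeout.
    ctx, cancel := context.WithTimeout(context.Background(), coprReqTimeout)
    defer cancel()

    if resp, err := sendCoprRequest(ctx, "copr-task-1"); err != nil {
        fmt.Println("coprocessor request failed:", err)
    } else {
        fmt.Println(resp)
    }
}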
37 changes: 27 additions & 10 deletions internal/locate/region_cache.go
@@ -177,6 +177,9 @@ type regionStore struct {
    // buckets is not accurate and it can change even if the region is not changed.
    // It can be stale and buckets keys can be out of the region range.
    buckets *metapb.Buckets
+   // record all storeIDs on which pending peers reside.
+   // key is storeID, val is peerID.
+   pendingTiFlashPeerStores map[uint64]uint64
}

func (r *regionStore) accessStore(mode accessMode, idx AccessIndex) (int, *Store) {
@@ -269,11 +272,12 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
    // regionStore pull used store from global store map
    // to avoid acquire storeMu in later access.
    rs := &regionStore{
-       workTiKVIdx: 0,
-       proxyTiKVIdx: -1,
-       stores: make([]*Store, 0, len(r.meta.Peers)),
-       storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
-       buckets: pdRegion.Buckets,
+       workTiKVIdx: 0,
+       proxyTiKVIdx: -1,
+       stores: make([]*Store, 0, len(r.meta.Peers)),
+       pendingTiFlashPeerStores: map[uint64]uint64{},
+       storeEpochs: make([]uint32, 0, len(r.meta.Peers)),
+       buckets: pdRegion.Buckets,
    }

    leader := pdRegion.Leader
@@ -314,6 +318,11 @@ func newRegion(bo *retry.Backoffer, c *RegionCache, pdRegion *pd.Region) (*Regio
        }
        rs.stores = append(rs.stores, store)
        rs.storeEpochs = append(rs.storeEpochs, atomic.LoadUint32(&store.epoch))
+       for _, pendingPeer := range pdRegion.PendingPeers {
+           if pendingPeer.Id == p.Id {
+               rs.pendingTiFlashPeerStores[store.storeID] = p.Id
+           }
+       }
    }
    // TODO(youjiali1995): It's possible the region info in PD is stale for now but it can recover.
    // Maybe we need backoff here.
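
The loop added above scans PD's pending-peer list for each peer and, on a match, records the hosting store in the new map. A self-contained sketch of the same matching idea, using a pared-down Peer type in place of the metapb and pd types the real code relies on:

package main

import "fmt"

// Peer is a simplified stand-in for metapb.Peer.
type Peer struct {
    Id      uint64
    StoreId uint64
}

// pendingPeerStores maps storeID -> peerID for every peer that is still pending,
// mirroring what regionStore.pendingTiFlashPeerStores holds in the region cache.
func pendingPeerStores(peers, pendingPeers []Peer) map[uint64]uint64 {
    m := make(map[uint64]uint64, len(pendingPeers))
    for _, p := range peers {
        for _, pending := range pendingPeers {
            if pending.Id == p.Id {
                m[p.StoreId] = p.Id
            }
        }
    }
    return m
}

func main() {
    peers := []Peer{{Id: 1, StoreId: 10}, {Id: 2, StoreId: 20}, {Id: 3, StoreId: 30}}
    pending := []Peer{{Id: 3, StoreId: 30}}
    fmt.Println(pendingPeerStores(peers, pending)) // map[30:3]
}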
@@ -780,23 +789,26 @@ func (c *RegionCache) GetTiKVRPCContext(bo *retry.Backoffer, id RegionVerID, rep
}

// GetAllValidTiFlashStores returns the store ids of all valid TiFlash stores, the store id of currentStore is always the first one
-func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store, labelFilter LabelFilter) []uint64 {
+// Callers may try `nonPendingStores` first, which avoids tasks having to wait for TiFlash replicas to sync from TiKV.
+// But if all TiFlash peers are pending (len(nonPendingStores) == 0), falling back to `allStores` is also fine.
+func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Store, labelFilter LabelFilter) ([]uint64, []uint64) {
    // set the cap to 2 because usually, TiFlash table will have 2 replicas
    allStores := make([]uint64, 0, 2)
+   nonPendingStores := make([]uint64, 0, 2)
    // make sure currentStore id is always the first in allStores
    allStores = append(allStores, currentStore.storeID)
    ts := time.Now().Unix()
    cachedRegion := c.GetCachedRegionWithRLock(id)
    if cachedRegion == nil {
-       return allStores
+       return allStores, nonPendingStores
    }
    if !cachedRegion.checkRegionCacheTTL(ts) {
-       return allStores
+       return allStores, nonPendingStores
    }
    regionStore := cachedRegion.getStore()
    currentIndex := regionStore.getAccessIndex(tiFlashOnly, currentStore)
    if currentIndex == -1 {
-       return allStores
+       return allStores, nonPendingStores
    }
    for startOffset := 1; startOffset < regionStore.accessStoreNum(tiFlashOnly); startOffset++ {
        accessIdx := AccessIndex((int(currentIndex) + startOffset) % regionStore.accessStoreNum(tiFlashOnly))
@@ -813,7 +825,12 @@ func (c *RegionCache) GetAllValidTiFlashStores(id RegionVerID, currentStore *Sto
        }
        allStores = append(allStores, store.storeID)
    }
-   return allStores
+   for _, storeID := range allStores {
+       if _, ok := regionStore.pendingTiFlashPeerStores[storeID]; !ok {
+           nonPendingStores = append(nonPendingStores, storeID)
+       }
+   }
+   return allStores, nonPendingStores
}

// GetTiFlashRPCContext returns RPCContext for a region must access flash store. If it returns nil, the region
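
Following the new doc comment, callers are expected to try the non-pending stores first and fall back to the full list only when every TiFlash peer is pending. A hedged caller-side sketch of that selection; pickTiFlashStores is a hypothetical helper, not something this commit adds:

package main

import "fmt"

// pickTiFlashStores prefers stores whose TiFlash peer is not pending, so tasks
// do not have to wait for replicas that are still syncing from TiKV. If every
// peer is pending, it falls back to the full store list.
func pickTiFlashStores(allStores, nonPendingStores []uint64) []uint64 {
    if len(nonPendingStores) > 0 {
        return nonPendingStores
    }
    return allStores
}

func main() {
    all := []uint64{1, 2, 3}
    nonPending := []uint64{1, 3} // store 2 still has a pending peer
    fmt.Println(pickTiFlashStores(all, nonPending)) // [1 3]
    fmt.Println(pickTiFlashStores(all, nil))        // [1 2 3]
}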
