Fix checkAndUpdateStoreHealthStatus panicking
Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>
MyonKeminta committed Feb 18, 2024
1 parent c381ab5 commit cce241a
Showing 4 changed files with 78 additions and 26 deletions.
internal/locate/region_cache.go (78 changes: 57 additions & 21 deletions)
@@ -667,14 +667,7 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*

// SetRegionCacheStore is used to set a store in region cache, for testing only
func (c *RegionCache) SetRegionCacheStore(id uint64, addr string, peerAddr string, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
c.putStore(&Store{
storeID: id,
storeType: storeType,
state: state,
labels: labels,
addr: addr,
peerAddr: peerAddr,
})
c.putStore(newStore(id, addr, peerAddr, "", storeType, resolveState(state), labels))
}

// SetPDClient replaces pd client,for testing only
@@ -2096,15 +2089,15 @@ func reloadTiFlashComputeStores(ctx context.Context, registry storeRegistry) (re
}
for _, s := range stores {
if s.GetState() == metapb.StoreState_Up && isStoreContainLabel(s.GetLabels(), tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlashCompute) {
res = append(res, &Store{
storeID: s.GetId(),
addr: s.GetAddress(),
peerAddr: s.GetPeerAddress(),
saddr: s.GetStatusAddress(),
storeType: tikvrpc.GetStoreTypeByMeta(s),
labels: s.GetLabels(),
state: uint64(resolved),
})
res = append(res, newStore(
s.GetId(),
s.GetAddress(),
s.GetPeerAddress(),
s.GetStatusAddress(),
tikvrpc.GetStoreTypeByMeta(s),
resolved,
s.GetLabels(),
))
}
}
return res, nil
@@ -2516,7 +2509,7 @@ type StoreHealthStatus struct {
// A statistic for counting the request latency to this store
clientSideSlowScore SlowScoreStat

tikvSideSlowScore *struct {
tikvSideSlowScore struct {
// Assuming the type `Store` is always used in heap instead of in stack
sync.Mutex

@@ -2664,7 +2657,7 @@ type Store struct {
livenessState uint32
unreachableSince time.Time

healthStatus StoreHealthStatus
healthStatus *StoreHealthStatus
// A statistic for counting the flows of different replicas on this store
replicaFlowsStats [numReplicaFlowsType]uint64
}
@@ -2710,6 +2703,37 @@ func (s resolveState) String() string {
}
}

func newStore(
id uint64,
addr string,
peerAddr string,
statusAddr string,
storeType tikvrpc.EndpointType,
state resolveState,
labels []*metapb.StoreLabel,
) *Store {
return &Store{
storeID: id,
storeType: storeType,
state: uint64(state),
labels: labels,
addr: addr,
peerAddr: peerAddr,
saddr: statusAddr,
// Make sure healthStatus field is never null.
healthStatus: &StoreHealthStatus{},
}
}

// newUninitializedStore creates a `Store` instance with only storeID initialized.
func newUninitializedStore(id uint64) *Store {
return &Store{
storeID: id,
// Make sure healthStatus field is never null.
healthStatus: &StoreHealthStatus{},
}
}

// IsTiFlash returns true if the storeType is TiFlash
func (s *Store) IsTiFlash() bool {
return s.storeType == tikvrpc.TiFlash
@@ -2810,7 +2834,15 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
storeType := tikvrpc.GetStoreTypeByMeta(store)
addr = store.GetAddress()
if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
newStore := newStore(
s.storeID,
addr,
store.GetPeerAddress(),
store.GetStatusAddress(),
storeType,
resolved,
store.GetLabels(),
)
newStore.livenessState = atomic.LoadUint32(&s.livenessState)
newStore.unreachableSince = s.unreachableSince
if s.addr == addr {
@@ -3196,12 +3228,16 @@ func (c *RegionCache) checkAndUpdateStoreHealthStats() {
zap.Any("r", r),
zap.Stack("stack trace"))
}
if _, err := util.EvalFailpoint("doNotRecoverStoreHealthCheckPanic"); err == nil {
panic(r)
}
}()
healthDetails := make(map[uint64]HealthStatusDetail)
c.forEachStore(func(store *Store) {
store.healthStatus.update()
healthDetails[store.storeID] = store.healthStatus.GetHealthStatusDetail()
})
logutil.BgLogger().Info("checkAndUpdateStoreHealthStats: get health details", zap.Reflect("details", healthDetails))
for store, details := range healthDetails {
metrics.TiKVStoreSlowScoreGauge.WithLabelValues(strconv.FormatUint(store, 10)).Set(float64(details.ClientSideSlowScore))
metrics.TiKVFeedbackSlowScoreGauge.WithLabelValues(strconv.FormatUint(store, 10)).Set(float64(details.TiKVSideSlowScore))
@@ -3345,7 +3381,7 @@ func (c *RegionCache) getStoreOrInsertDefault(id uint64) *Store {
c.storeMu.Lock()
store, exists := c.storeMu.stores[id]
if !exists {
store = &Store{storeID: id}
store = newUninitializedStore(id)
c.storeMu.stores[id] = store
}
c.storeMu.Unlock()
internal/locate/region_cache_test.go (7 changes: 6 additions & 1 deletion)
@@ -47,6 +47,7 @@ import (
"unsafe"

"github.com/gogo/protobuf/proto"
"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
@@ -100,6 +101,8 @@ func (s *testRegionCacheSuite) SetupTest() {
pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)}
s.cache = NewRegionCache(pdCli)
s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil)

s.NoError(failpoint.Enable("tikvclient/doNotRecoverStoreHealthCheckPanic", "return"))
}

func (s *testRegionCacheSuite) TearDownTest() {
@@ -108,6 +111,8 @@ func (s *testRegionCacheSuite) TearDownTest() {
if s.onClosed != nil {
s.onClosed()
}

s.NoError(failpoint.Disable("tikvclient/doNotRecoverStoreHealthCheckPanic"))
}

func (s *testRegionCacheSuite) storeAddr(id uint64) string {
@@ -1932,7 +1937,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() {
region, _ := s.cache.LocateRegionByID(s.bo, s.region)
v2 := region.Region.confVer + 1
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
st := &Store{storeID: s.store}
st := newUninitializedStore(s.store)
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true)

r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
internal/locate/region_request3_test.go (10 changes: 8 additions & 2 deletions)
@@ -89,6 +89,8 @@ func (s *testRegionRequestToThreeStoresSuite) SetupTest() {
s.bo = retry.NewNoopBackoff(context.Background())
client := mocktikv.NewRPCClient(s.cluster, s.mvccStore, nil)
s.regionRequestSender = NewRegionRequestSender(s.cache, client)

s.NoError(failpoint.Enable("tikvclient/doNotRecoverStoreHealthCheckPanic", "return"))
}

func (s *testRegionRequestToThreeStoresSuite) TearDownTest() {
@@ -97,6 +99,8 @@ func (s *testRegionRequestToThreeStoresSuite) TearDownTest() {
if s.onClosed != nil {
s.onClosed()
}

s.NoError(failpoint.Disable("tikvclient/doNotRecoverStoreHealthCheckPanic"))
}

func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit() {
@@ -404,7 +408,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
tikvLearner := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: storeID, Role: metapb.PeerRole_Learner}
tikvLearnerAccessIdx := len(regionStore.stores)
regionStore.accessIndex[tiKVOnly] = append(regionStore.accessIndex[tiKVOnly], tikvLearnerAccessIdx)
regionStore.stores = append(regionStore.stores, &Store{storeID: tikvLearner.StoreId})
regionStore.stores = append(regionStore.stores, newUninitializedStore(tikvLearner.StoreId))
regionStore.storeEpochs = append(regionStore.storeEpochs, 0)

region = &Region{
@@ -455,7 +459,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// Add a TiFlash peer to the region.
tiflash := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()}
regionStore.accessIndex[tiFlashOnly] = append(regionStore.accessIndex[tiFlashOnly], len(regionStore.stores))
regionStore.stores = append(regionStore.stores, &Store{storeID: tiflash.StoreId, storeType: tikvrpc.TiFlash})
tiflashStore := newUninitializedStore(tiflash.StoreId)
tiflashStore.storeType = tikvrpc.TiFlash
regionStore.stores = append(regionStore.stores, tiflashStore)
regionStore.storeEpochs = append(regionStore.storeEpochs, 0)

region = &Region{
internal/locate/region_request_test.go (9 changes: 7 additions & 2 deletions)
@@ -45,6 +45,7 @@ import (
"time"
"unsafe"

"github.com/pingcap/failpoint"
"github.com/pingcap/kvproto/pkg/coprocessor"
"github.com/pingcap/kvproto/pkg/disaggregated"
"github.com/pingcap/kvproto/pkg/errorpb"
@@ -89,11 +90,15 @@ func (s *testRegionRequestToSingleStoreSuite) SetupTest() {
s.bo = retry.NewNoopBackoff(context.Background())
client := mocktikv.NewRPCClient(s.cluster, s.mvccStore, nil)
s.regionRequestSender = NewRegionRequestSender(s.cache, client)

s.NoError(failpoint.Enable("tikvclient/doNotRecoverStoreHealthCheckPanic", "return"))
}

func (s *testRegionRequestToSingleStoreSuite) TearDownTest() {
s.cache.Close()
s.mvccStore.Close()

s.NoError(failpoint.Disable("tikvclient/doNotRecoverStoreHealthCheckPanic"))
}

type fnClient struct {
@@ -620,7 +625,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() {
// test kv load new region with new start-key and new epoch
v2 := region.Region.confVer + 1
r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
st := &Store{storeID: s.store}
st := newUninitializedStore(s.store)
s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true)
region, err = s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
@@ -630,7 +635,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() {

v3 := region.Region.confVer + 1
r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}}
st = &Store{storeID: s.store}
st = newUninitializedStore(s.store)
s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true)
region, err = s.cache.LocateRegionByID(s.bo, s.region)
s.Nil(err)
