From 55e96836f51fc6e87623e88e23079c40e1ec495c Mon Sep 17 00:00:00 2001
From: MyonKeminta
Date: Sun, 18 Feb 2024 17:13:02 +0800
Subject: [PATCH] Fix checkAndUpdateStoreHealthStatus panicking

Signed-off-by: MyonKeminta
---
 internal/locate/region_cache.go         | 77 ++++++++++++++++++-------
 internal/locate/region_cache_test.go    |  7 ++-
 internal/locate/region_request3_test.go | 10 +++-
 internal/locate/region_request_test.go  |  9 ++-
 4 files changed, 77 insertions(+), 26 deletions(-)

diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go
index 8bf3a52cb..c829f903a 100644
--- a/internal/locate/region_cache.go
+++ b/internal/locate/region_cache.go
@@ -667,14 +667,7 @@ func (c *RegionCache) checkAndResolve(needCheckStores []*Store, needCheck func(*
 
 // SetRegionCacheStore is used to set a store in region cache, for testing only
 func (c *RegionCache) SetRegionCacheStore(id uint64, addr string, peerAddr string, storeType tikvrpc.EndpointType, state uint64, labels []*metapb.StoreLabel) {
-	c.putStore(&Store{
-		storeID:   id,
-		storeType: storeType,
-		state:     state,
-		labels:    labels,
-		addr:      addr,
-		peerAddr:  peerAddr,
-	})
+	c.putStore(newStore(id, addr, peerAddr, "", storeType, resolveState(state), labels))
 }
 
 // SetPDClient replaces pd client,for testing only
@@ -2096,15 +2089,15 @@ func reloadTiFlashComputeStores(ctx context.Context, registry storeRegistry) (re
 	}
 	for _, s := range stores {
 		if s.GetState() == metapb.StoreState_Up && isStoreContainLabel(s.GetLabels(), tikvrpc.EngineLabelKey, tikvrpc.EngineLabelTiFlashCompute) {
-			res = append(res, &Store{
-				storeID:   s.GetId(),
-				addr:      s.GetAddress(),
-				peerAddr:  s.GetPeerAddress(),
-				saddr:     s.GetStatusAddress(),
-				storeType: tikvrpc.GetStoreTypeByMeta(s),
-				labels:    s.GetLabels(),
-				state:     uint64(resolved),
-			})
+			res = append(res, newStore(
+				s.GetId(),
+				s.GetAddress(),
+				s.GetPeerAddress(),
+				s.GetStatusAddress(),
+				tikvrpc.GetStoreTypeByMeta(s),
+				resolved,
+				s.GetLabels(),
+			))
 		}
 	}
 	return res, nil
@@ -2516,7 +2509,7 @@ type StoreHealthStatus struct {
 	// A statistic for counting the request latency to this store
 	clientSideSlowScore SlowScoreStat
 
-	tikvSideSlowScore *struct {
+	tikvSideSlowScore struct {
 		// Assuming the type `Store` is always used in heap instead of in stack
 		sync.Mutex
 
@@ -2664,7 +2657,7 @@ type Store struct {
 	livenessState    uint32
 	unreachableSince time.Time
 
-	healthStatus StoreHealthStatus
+	healthStatus *StoreHealthStatus
 	// A statistic for counting the flows of different replicas on this store
 	replicaFlowsStats [numReplicaFlowsType]uint64
 }
@@ -2710,6 +2703,37 @@ func (s resolveState) String() string {
 	}
 }
 
+func newStore(
+	id uint64,
+	addr string,
+	peerAddr string,
+	statusAddr string,
+	storeType tikvrpc.EndpointType,
+	state resolveState,
+	labels []*metapb.StoreLabel,
+) *Store {
+	return &Store{
+		storeID:   id,
+		storeType: storeType,
+		state:     uint64(state),
+		labels:    labels,
+		addr:      addr,
+		peerAddr:  peerAddr,
+		saddr:     statusAddr,
+		// Make sure healthStatus field is never null.
+		healthStatus: &StoreHealthStatus{},
+	}
+}
+
+// newUninitializedStore creates a `Store` instance with only storeID initialized.
+func newUninitializedStore(id uint64) *Store {
+	return &Store{
+		storeID: id,
+		// Make sure healthStatus field is never null.
+		healthStatus: &StoreHealthStatus{},
+	}
+}
+
 // IsTiFlash returns true if the storeType is TiFlash
 func (s *Store) IsTiFlash() bool {
 	return s.storeType == tikvrpc.TiFlash
@@ -2810,7 +2834,15 @@ func (s *Store) reResolve(c *RegionCache) (bool, error) {
 		storeType := tikvrpc.GetStoreTypeByMeta(store)
 		addr = store.GetAddress()
 		if s.addr != addr || !s.IsSameLabels(store.GetLabels()) {
-			newStore := &Store{storeID: s.storeID, addr: addr, peerAddr: store.GetPeerAddress(), saddr: store.GetStatusAddress(), storeType: storeType, labels: store.GetLabels(), state: uint64(resolved)}
+			newStore := newStore(
+				s.storeID,
+				addr,
+				store.GetPeerAddress(),
+				store.GetStatusAddress(),
+				storeType,
+				resolved,
+				store.GetLabels(),
+			)
 			newStore.livenessState = atomic.LoadUint32(&s.livenessState)
 			newStore.unreachableSince = s.unreachableSince
 			if s.addr == addr {
@@ -3196,6 +3228,9 @@ func (c *RegionCache) checkAndUpdateStoreHealthStats() {
 				zap.Any("r", r),
 				zap.Stack("stack trace"))
 		}
+		if _, err := util.EvalFailpoint("doNotRecoverStoreHealthCheckPanic"); err == nil {
+			panic(r)
+		}
 	}()
 	healthDetails := make(map[uint64]HealthStatusDetail)
 	c.forEachStore(func(store *Store) {
@@ -3345,7 +3380,7 @@ func (c *RegionCache) getStoreOrInsertDefault(id uint64) *Store {
 	c.storeMu.Lock()
 	store, exists := c.storeMu.stores[id]
 	if !exists {
-		store = &Store{storeID: id}
+		store = newUninitializedStore(id)
 		c.storeMu.stores[id] = store
 	}
 	c.storeMu.Unlock()
diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go
index 2d6d9aab3..db8382aa5 100644
--- a/internal/locate/region_cache_test.go
+++ b/internal/locate/region_cache_test.go
@@ -47,6 +47,7 @@ import (
 	"unsafe"
 
 	"github.com/gogo/protobuf/proto"
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/errorpb"
 	"github.com/pingcap/kvproto/pkg/metapb"
 	"github.com/stretchr/testify/suite"
@@ -100,6 +101,8 @@
 	pdCli := &CodecPDClient{mocktikv.NewPDClient(s.cluster), apicodec.NewCodecV1(apicodec.ModeTxn)}
 	s.cache = NewRegionCache(pdCli)
 	s.bo = retry.NewBackofferWithVars(context.Background(), 5000, nil)
+
+	s.NoError(failpoint.Enable("tikvclient/doNotRecoverStoreHealthCheckPanic", "return"))
 }
 
 func (s *testRegionCacheSuite) TearDownTest() {
@@ -108,6 +111,8 @@
 	if s.onClosed != nil {
 		s.onClosed()
 	}
+
+	s.NoError(failpoint.Disable("tikvclient/doNotRecoverStoreHealthCheckPanic"))
 }
 
 func (s *testRegionCacheSuite) storeAddr(id uint64) string {
@@ -1932,7 +1937,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestRefreshCache() {
 	region, _ := s.cache.LocateRegionByID(s.bo, s.region)
 	v2 := region.Region.confVer + 1
 	r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
-	st := &Store{storeID: s.store}
+	st := newUninitializedStore(s.store)
 	s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true)
 
 	r, _ = s.cache.scanRegionsFromCache(s.bo, []byte{}, nil, 10)
diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go
index c5cf930a1..dd6e3e8a3 100644
--- a/internal/locate/region_request3_test.go
+++ b/internal/locate/region_request3_test.go
@@ -89,6 +89,8 @@ func (s *testRegionRequestToThreeStoresSuite) SetupTest() {
 	s.bo = retry.NewNoopBackoff(context.Background())
 	client := mocktikv.NewRPCClient(s.cluster, s.mvccStore, nil)
 	s.regionRequestSender = NewRegionRequestSender(s.cache, client)
+
+	s.NoError(failpoint.Enable("tikvclient/doNotRecoverStoreHealthCheckPanic", "return"))
 }
 
 func (s *testRegionRequestToThreeStoresSuite) TearDownTest() {
@@ -97,6 +99,8 @@
 	if s.onClosed != nil {
 		s.onClosed()
 	}
+
+	s.NoError(failpoint.Disable("tikvclient/doNotRecoverStoreHealthCheckPanic"))
 }
 
 func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit() {
@@ -404,7 +408,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestLearnerReplicaSelector() {
 	tikvLearner := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: storeID, Role: metapb.PeerRole_Learner}
 	tikvLearnerAccessIdx := len(regionStore.stores)
 	regionStore.accessIndex[tiKVOnly] = append(regionStore.accessIndex[tiKVOnly], tikvLearnerAccessIdx)
-	regionStore.stores = append(regionStore.stores, &Store{storeID: tikvLearner.StoreId})
+	regionStore.stores = append(regionStore.stores, newUninitializedStore(tikvLearner.StoreId))
 	regionStore.storeEpochs = append(regionStore.storeEpochs, 0)
 
 	region = &Region{
@@ -455,7 +459,9 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
 	// Add a TiFlash peer to the region.
 	tiflash := &metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()}
 	regionStore.accessIndex[tiFlashOnly] = append(regionStore.accessIndex[tiFlashOnly], len(regionStore.stores))
-	regionStore.stores = append(regionStore.stores, &Store{storeID: tiflash.StoreId, storeType: tikvrpc.TiFlash})
+	tiflashStore := newUninitializedStore(tiflash.StoreId)
+	tiflashStore.storeType = tikvrpc.TiFlash
+	regionStore.stores = append(regionStore.stores, tiflashStore)
 	regionStore.storeEpochs = append(regionStore.storeEpochs, 0)
 
 	region = &Region{
diff --git a/internal/locate/region_request_test.go b/internal/locate/region_request_test.go
index 72cc1ac4a..d0dd6480a 100644
--- a/internal/locate/region_request_test.go
+++ b/internal/locate/region_request_test.go
@@ -45,6 +45,7 @@
 	"time"
 	"unsafe"
 
+	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/coprocessor"
 	"github.com/pingcap/kvproto/pkg/disaggregated"
 	"github.com/pingcap/kvproto/pkg/errorpb"
@@ -89,11 +90,15 @@ func (s *testRegionRequestToSingleStoreSuite) SetupTest() {
 	s.bo = retry.NewNoopBackoff(context.Background())
 	client := mocktikv.NewRPCClient(s.cluster, s.mvccStore, nil)
 	s.regionRequestSender = NewRegionRequestSender(s.cache, client)
+
+	s.NoError(failpoint.Enable("tikvclient/doNotRecoverStoreHealthCheckPanic", "return"))
 }
 
 func (s *testRegionRequestToSingleStoreSuite) TearDownTest() {
 	s.cache.Close()
 	s.mvccStore.Close()
+
+	s.NoError(failpoint.Disable("tikvclient/doNotRecoverStoreHealthCheckPanic"))
 }
 
 type fnClient struct {
@@ -620,7 +625,7 @@ func (s *testRegionRequestToSingleStoreSuite) TestGetRegionByIDFromCache() {
 	// test kv load new region with new start-key and new epoch
 	v2 := region.Region.confVer + 1
 	r2 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: region.Region.ver, ConfVer: v2}, StartKey: []byte{1}}
-	st := &Store{storeID: s.store}
+	st := newUninitializedStore(s.store)
 	s.cache.insertRegionToCache(&Region{meta: &r2, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true)
 	region, err = s.cache.LocateRegionByID(s.bo, s.region)
 	s.Nil(err)
@@ -630,7 +635,7 @@
 
 	v3 := region.Region.confVer + 1
 	r3 := metapb.Region{Id: region.Region.id, RegionEpoch: &metapb.RegionEpoch{Version: v3, ConfVer: region.Region.confVer}, StartKey: []byte{2}}
-	st = &Store{storeID: s.store}
+	st = newUninitializedStore(s.store)
 	s.cache.insertRegionToCache(&Region{meta: &r3, store: unsafe.Pointer(st), ttl: nextTTLWithoutJitter(time.Now().Unix())}, true, true)
 	region, err = s.cache.LocateRegionByID(s.bo, s.region)
 	s.Nil(err)
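
Note for reviewers: the sketch below illustrates the failure mode this patch
guards against. It is simplified stand-in code, not code from this repository:
`Store` and `StoreHealthStatus` are trimmed to the two fields that matter here,
and `markAlreadySlow` is a hypothetical stand-in for the health-check updates
that dereference `healthStatus`. Once the field became pointer-typed, a bare
`&Store{storeID: id}` literal left it nil and the background health check
panicked; routing all construction through `newStore`/`newUninitializedStore`
guarantees the field is always initialized.

    package main

    import "fmt"

    // Trimmed stand-in for the real StoreHealthStatus.
    type StoreHealthStatus struct {
            isSlow bool
    }

    // Hypothetical stand-in for the health-check updates; writing through
    // a nil receiver panics with a nil-pointer dereference.
    func (h *StoreHealthStatus) markAlreadySlow() {
            h.isSlow = true
    }

    // Trimmed stand-in for the real Store.
    type Store struct {
            storeID      uint64
            healthStatus *StoreHealthStatus
    }

    // Mirrors the constructor added by this patch: even a placeholder
    // Store carries a non-nil healthStatus.
    func newUninitializedStore(id uint64) *Store {
            return &Store{storeID: id, healthStatus: &StoreHealthStatus{}}
    }

    func main() {
            good := newUninitializedStore(1)
            good.healthStatus.markAlreadySlow() // safe: field is initialized
            fmt.Println("slow:", good.healthStatus.isSlow)

            defer func() {
                    if r := recover(); r != nil {
                            fmt.Println("recovered:", r) // nil-pointer dereference
                    }
            }()
            bad := &Store{storeID: 2} // pre-patch pattern: healthStatus stays nil
            bad.healthStatus.markAlreadySlow()
    }

The `doNotRecoverStoreHealthCheckPanic` failpoint enabled in the test suites
makes the recover handler in `checkAndUpdateStoreHealthStats` re-raise such a
panic instead of swallowing it, so any regression of this kind fails the tests
loudly.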