diff --git a/pkg/core/storelimit/limit_test.go b/pkg/core/storelimit/limit_test.go index 2b418952bb6..6f57c01eccb 100644 --- a/pkg/core/storelimit/limit_test.go +++ b/pkg/core/storelimit/limit_test.go @@ -18,6 +18,7 @@ import ( "container/list" "context" "math/rand" + "sync/atomic" "testing" "time" @@ -125,21 +126,22 @@ func TestFeedback(t *testing.T) { } // region size is 10GB, snapshot write limit is 100MB/s and the snapshot concurrency is 3. // the best strategy is that the tikv executing queue equals the wait. - regionSize, limit, wait := int64(10000), int64(100), int64(4) - iter := 100 + const regionSize, limit, wait = int64(10000), int64(100), int64(4) + var iter atomic.Int32 + iter.Store(100) ops := make(chan int64, 10) ctx, cancel := context.WithCancel(context.Background()) // generate the operator go func() { for { - if s.Available(regionSize, SendSnapshot, constant.Low) && iter > 0 { - iter-- + if s.Available(regionSize, SendSnapshot, constant.Low) && iter.Load() > 0 { + iter.Add(-1) size := regionSize - rand.Int63n(regionSize/10) s.Take(size, SendSnapshot, constant.Low) ops <- size } - if iter == 0 { + if iter.Load() == 0 { cancel() return } @@ -185,7 +187,7 @@ func TestFeedback(t *testing.T) { err := exec*wait - cost queue.Remove(first) s.Feedback(float64(err)) - if iter < 5 { + if iter.Load() < 5 { re.Greater(float64(s.GetCap()), float64(regionSize*(wait-2))) re.Less(float64(s.GetCap()), float64(regionSize*wait)) } diff --git a/pkg/statistics/region_collection.go b/pkg/statistics/region_collection.go index 72c2bc1bb91..23791a14514 100644 --- a/pkg/statistics/region_collection.go +++ b/pkg/statistics/region_collection.go @@ -318,8 +318,22 @@ func (r *RegionStatistics) Collect() { // Reset resets the metrics of the regions' status. func (r *RegionStatistics) Reset() { - regionStatusGauge.Reset() - offlineRegionStatusGauge.Reset() + regionMissPeerRegionCounter.Set(0) + regionExtraPeerRegionCounter.Set(0) + regionDownPeerRegionCounter.Set(0) + regionPendingPeerRegionCounter.Set(0) + regionLearnerPeerRegionCounter.Set(0) + regionEmptyRegionCounter.Set(0) + regionOversizedRegionCounter.Set(0) + regionUndersizedRegionCounter.Set(0) + regionWitnesssLeaderRegionCounter.Set(0) + + offlineMissPeerRegionCounter.Set(0) + offlineExtraPeerRegionCounter.Set(0) + offlineDownPeerRegionCounter.Set(0) + offlinePendingPeerRegionCounter.Set(0) + offlineLearnerPeerRegionCounter.Set(0) + offlineOfflinePeerRegionCounter.Set(0) } // LabelStatistics is the statistics of the level of labels. diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index c2bc7a908b0..69710be506c 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -22,7 +22,6 @@ import ( "strconv" "strings" "sync" - "sync/atomic" "time" "github.com/coreos/go-semver/semver" @@ -134,7 +133,7 @@ type RaftCluster struct { etcdClient *clientv3.Client httpClient *http.Client - running atomic.Bool + running bool meta *metapb.Cluster storeConfigManager *config.StoreConfigManager storage storage.Storage @@ -258,14 +257,14 @@ func (c *RaftCluster) InitCluster( // Start starts a cluster. func (c *RaftCluster) Start(s Server) error { - if c.IsRunning() { + c.Lock() + defer c.Unlock() + + if c.running { log.Warn("raft cluster has already been started") return nil } - c.Lock() - defer c.Unlock() - c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetKeyspaceGroupManager()) cluster, err := c.LoadClusterInfo() if err != nil { @@ -317,7 +316,7 @@ func (c *RaftCluster) Start(s Server) error { go c.runUpdateStoreStats() go c.startGCTuner() - c.running.Store(true) + c.running = true return nil } @@ -605,26 +604,31 @@ func (c *RaftCluster) runReplicationMode() { // Stop stops the cluster. func (c *RaftCluster) Stop() { c.Lock() - if !c.running.CompareAndSwap(true, false) { + if !c.running { c.Unlock() return } - + c.running = false c.coordinator.stop() c.cancel() c.Unlock() + c.wg.Wait() log.Info("raftcluster is stopped") } // IsRunning return if the cluster is running. func (c *RaftCluster) IsRunning() bool { - return c.running.Load() + c.RLock() + defer c.RUnlock() + return c.running } // Context returns the context of RaftCluster. func (c *RaftCluster) Context() context.Context { - if c.running.Load() { + c.RLock() + defer c.RUnlock() + if c.running { return c.ctx } return nil diff --git a/server/config/store_config.go b/server/config/store_config.go index 9df390b8e7c..a468ebae0ae 100644 --- a/server/config/store_config.go +++ b/server/config/store_config.go @@ -130,6 +130,9 @@ func (c *StoreConfig) IsEnableRegionBucket() bool { // IsRaftKV2 returns true if the raft kv is v2. func (c *StoreConfig) IsRaftKV2() bool { + if c == nil { + return false + } return c.Storage.Engine == raftStoreV2 }