Skip to content

Commit

Permalink
Merge branch 'cache_in_hot_region' of github.com:bufferflies/pd into …
Browse files Browse the repository at this point in the history
…cache_in_hot_region
  • Loading branch information
bufferflies committed Apr 24, 2023
2 parents b38d88e + c2bcf91 commit d9176f9
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 19 deletions.
14 changes: 8 additions & 6 deletions pkg/core/storelimit/limit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"container/list"
"context"
"math/rand"
"sync/atomic"
"testing"
"time"

Expand Down Expand Up @@ -125,21 +126,22 @@ func TestFeedback(t *testing.T) {
}
// region size is 10GB, snapshot write limit is 100MB/s and the snapshot concurrency is 3.
// the best strategy is that the tikv executing queue equals the wait.
regionSize, limit, wait := int64(10000), int64(100), int64(4)
iter := 100
const regionSize, limit, wait = int64(10000), int64(100), int64(4)
var iter atomic.Int32
iter.Store(100)
ops := make(chan int64, 10)
ctx, cancel := context.WithCancel(context.Background())

// generate the operator
go func() {
for {
if s.Available(regionSize, SendSnapshot, constant.Low) && iter > 0 {
iter--
if s.Available(regionSize, SendSnapshot, constant.Low) && iter.Load() > 0 {
iter.Add(-1)
size := regionSize - rand.Int63n(regionSize/10)
s.Take(size, SendSnapshot, constant.Low)
ops <- size
}
if iter == 0 {
if iter.Load() == 0 {
cancel()
return
}
Expand Down Expand Up @@ -185,7 +187,7 @@ func TestFeedback(t *testing.T) {
err := exec*wait - cost
queue.Remove(first)
s.Feedback(float64(err))
if iter < 5 {
if iter.Load() < 5 {
re.Greater(float64(s.GetCap()), float64(regionSize*(wait-2)))
re.Less(float64(s.GetCap()), float64(regionSize*wait))
}
Expand Down
18 changes: 16 additions & 2 deletions pkg/statistics/region_collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,22 @@ func (r *RegionStatistics) Collect() {

// Reset resets the metrics of the regions' status.
func (r *RegionStatistics) Reset() {
regionStatusGauge.Reset()
offlineRegionStatusGauge.Reset()
regionMissPeerRegionCounter.Set(0)
regionExtraPeerRegionCounter.Set(0)
regionDownPeerRegionCounter.Set(0)
regionPendingPeerRegionCounter.Set(0)
regionLearnerPeerRegionCounter.Set(0)
regionEmptyRegionCounter.Set(0)
regionOversizedRegionCounter.Set(0)
regionUndersizedRegionCounter.Set(0)
regionWitnesssLeaderRegionCounter.Set(0)

offlineMissPeerRegionCounter.Set(0)
offlineExtraPeerRegionCounter.Set(0)
offlineDownPeerRegionCounter.Set(0)
offlinePendingPeerRegionCounter.Set(0)
offlineLearnerPeerRegionCounter.Set(0)
offlineOfflinePeerRegionCounter.Set(0)
}

// LabelStatistics is the statistics of the level of labels.
Expand Down
26 changes: 15 additions & 11 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"strconv"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/coreos/go-semver/semver"
Expand Down Expand Up @@ -134,7 +133,7 @@ type RaftCluster struct {
etcdClient *clientv3.Client
httpClient *http.Client

running atomic.Bool
running bool
meta *metapb.Cluster
storeConfigManager *config.StoreConfigManager
storage storage.Storage
Expand Down Expand Up @@ -258,14 +257,14 @@ func (c *RaftCluster) InitCluster(

// Start starts a cluster.
func (c *RaftCluster) Start(s Server) error {
if c.IsRunning() {
c.Lock()
defer c.Unlock()

if c.running {
log.Warn("raft cluster has already been started")
return nil
}

c.Lock()
defer c.Unlock()

c.InitCluster(s.GetAllocator(), s.GetPersistOptions(), s.GetStorage(), s.GetBasicCluster(), s.GetKeyspaceGroupManager())
cluster, err := c.LoadClusterInfo()
if err != nil {
Expand Down Expand Up @@ -317,7 +316,7 @@ func (c *RaftCluster) Start(s Server) error {
go c.runUpdateStoreStats()
go c.startGCTuner()

c.running.Store(true)
c.running = true
return nil
}

Expand Down Expand Up @@ -605,26 +604,31 @@ func (c *RaftCluster) runReplicationMode() {
// Stop stops the cluster.
func (c *RaftCluster) Stop() {
c.Lock()
if !c.running.CompareAndSwap(true, false) {
if !c.running {
c.Unlock()
return
}

c.running = false
c.coordinator.stop()
c.cancel()
c.Unlock()

c.wg.Wait()
log.Info("raftcluster is stopped")
}

// IsRunning return if the cluster is running.
func (c *RaftCluster) IsRunning() bool {
return c.running.Load()
c.RLock()
defer c.RUnlock()
return c.running
}

// Context returns the context of RaftCluster.
func (c *RaftCluster) Context() context.Context {
if c.running.Load() {
c.RLock()
defer c.RUnlock()
if c.running {
return c.ctx
}
return nil
Expand Down
3 changes: 3 additions & 0 deletions server/config/store_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,9 @@ func (c *StoreConfig) IsEnableRegionBucket() bool {

// IsRaftKV2 returns true if the raft kv is v2.
func (c *StoreConfig) IsRaftKV2() bool {
if c == nil {
return false
}
return c.Storage.Engine == raftStoreV2
}

Expand Down

0 comments on commit d9176f9

Please sign in to comment.