Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
Signed-off-by: Ryan Leung <rleungx@gmail.com>
  • Loading branch information
rleungx committed Jul 16, 2024
1 parent cc5160f commit 5941965
Show file tree
Hide file tree
Showing 4 changed files with 23 additions and 25 deletions.
12 changes: 6 additions & 6 deletions pkg/mcs/scheduling/server/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,9 @@ func NewCluster(parentCtx context.Context, persistConfig *config.PersistConfig,
clusterID: clusterID,
checkMembershipCh: checkMembershipCh,

heartbeatRunner: ratelimit.NewConcurrentRunner(ctx, heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(ctx, miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(ctx, logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
}
c.coordinator = schedule.NewCoordinator(ctx, c, hbStreams)
err = c.ruleManager.Initialize(persistConfig.GetMaxReplicas(), persistConfig.GetLocationLabels(), persistConfig.GetIsolationLevel())
Expand Down Expand Up @@ -549,9 +549,9 @@ func (c *Cluster) StartBackgroundJobs() {
go c.runUpdateStoreStats()
go c.runCoordinator()
go c.runMetricsCollectionJob()
c.heartbeatRunner.Start()
c.miscRunner.Start()
c.logRunner.Start()
c.heartbeatRunner.Start(c.ctx)
c.miscRunner.Start(c.ctx)
c.logRunner.Start(c.ctx)
c.running.Store(true)
}

Expand Down
12 changes: 5 additions & 7 deletions pkg/ratelimit/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const (
// Runner is the interface for running tasks.
type Runner interface {
RunTask(id uint64, name string, f func(), opts ...TaskOption) error
Start()
Start(ctx context.Context)
Stop()
}

Expand Down Expand Up @@ -81,11 +81,8 @@ type ConcurrentRunner struct {
}

// NewConcurrentRunner creates a new ConcurrentRunner.
func NewConcurrentRunner(ctx context.Context, name string, limiter *ConcurrencyLimiter, maxPendingDuration time.Duration) *ConcurrentRunner {
ctx, cancel := context.WithCancel(ctx)
func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDuration time.Duration) *ConcurrentRunner {
s := &ConcurrentRunner{
ctx: ctx,
cancel: cancel,
name: name,
limiter: limiter,
maxPendingDuration: maxPendingDuration,
Expand All @@ -107,7 +104,8 @@ func WithRetained(retained bool) TaskOption {
}

// Start starts the runner.
func (cr *ConcurrentRunner) Start() {
func (cr *ConcurrentRunner) Start(ctx context.Context) {
cr.ctx, cr.cancel = context.WithCancel(ctx)
cr.wg.Add(1)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
Expand Down Expand Up @@ -246,7 +244,7 @@ func (*SyncRunner) RunTask(_ uint64, _ string, f func(), _ ...TaskOption) error
}

// Start starts the runner.
func (*SyncRunner) Start() {}
func (*SyncRunner) Start(context.Context) {}

// Stop stops the runner.
func (*SyncRunner) Stop() {}
12 changes: 6 additions & 6 deletions pkg/ratelimit/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ import (

func TestConcurrentRunner(t *testing.T) {
t.Run("RunTask", func(t *testing.T) {
runner := NewConcurrentRunner(context.TODO(), "test", NewConcurrencyLimiter(1), time.Second)
runner.Start()
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Second)
runner.Start(context.TODO())
defer runner.Stop()

var wg sync.WaitGroup
Expand All @@ -47,8 +47,8 @@ func TestConcurrentRunner(t *testing.T) {
})

t.Run("MaxPendingDuration", func(t *testing.T) {
runner := NewConcurrentRunner(context.TODO(), "test", NewConcurrencyLimiter(1), 2*time.Millisecond)
runner.Start()
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), 2*time.Millisecond)
runner.Start(context.TODO())
defer runner.Stop()
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
Expand Down Expand Up @@ -76,8 +76,8 @@ func TestConcurrentRunner(t *testing.T) {
})

t.Run("DuplicatedTask", func(t *testing.T) {
runner := NewConcurrentRunner(context.TODO(), "test", NewConcurrencyLimiter(1), time.Minute)
runner.Start()
runner := NewConcurrentRunner("test", NewConcurrencyLimiter(1), time.Minute)
runner.Start(context.TODO())
defer runner.Stop()
for i := 1; i < 11; i++ {
regionID := uint64(i)
Expand Down
12 changes: 6 additions & 6 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,9 +204,9 @@ func NewRaftCluster(ctx context.Context, clusterID uint64, basicCluster *core.Ba
etcdClient: etcdClient,
BasicCluster: basicCluster,
storage: storage,
heartbeatRunner: ratelimit.NewConcurrentRunner(ctx, heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(ctx, miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(ctx, logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
heartbeatRunner: ratelimit.NewConcurrentRunner(heartbeatTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
miscRunner: ratelimit.NewConcurrentRunner(miscTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
logRunner: ratelimit.NewConcurrentRunner(logTaskRunner, ratelimit.NewConcurrencyLimiter(uint64(runtime.NumCPU()*2)), time.Minute),
}
}

Expand Down Expand Up @@ -364,9 +364,9 @@ func (c *RaftCluster) Start(s Server) error {
go c.startGCTuner()

c.running = true
c.heartbeatRunner.Start()
c.miscRunner.Start()
c.logRunner.Start()
c.heartbeatRunner.Start(c.ctx)
c.miscRunner.Start(c.ctx)
c.logRunner.Start(c.ctx)
return nil
}

Expand Down

0 comments on commit 5941965

Please sign in to comment.