
Commit

remove old duplicated task
Signed-off-by: Ryan Leung <rleungx@gmail.com>
rleungx committed May 31, 2024
1 parent c498063 commit 2d43787
Showing 5 changed files with 53 additions and 20 deletions.
2 changes: 2 additions & 0 deletions pkg/core/region.go
@@ -755,6 +755,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
debug = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
region.GetID(),
"DebugLog",
func(_ context.Context) {
d(msg, fields...)
@@ -764,6 +765,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
info = func(msg string, fields ...zap.Field) {
logRunner.RunTask(
ctx.Context,
region.GetID(),
"InfoLog",
func(_ context.Context) {
i(msg, fields...)
7 changes: 6 additions & 1 deletion pkg/mcs/scheduling/server/cluster.go
@@ -607,13 +607,14 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
// Save to storage if meta is updated, except for flashback.
// Save to cache if meta or leader is updated, or contains any down/pending peer.
_, saveCache, _, retained := core.GenerateRegionGuideFunc(true)(ctx, region, origin)

regionID := region.GetID()
if !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
@@ -626,6 +627,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
@@ -650,6 +652,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
}
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
@@ -659,6 +662,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
tracer.OnUpdateSubTreeFinished()
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.HandleOverlaps,
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
@@ -669,6 +673,7 @@ func (c *Cluster) processRegionHeartbeat(ctx *core.MetaProcessContext, region *c
// handle region stats
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
cluster.Collect(c, region, hasRegionStats)
55 changes: 36 additions & 19 deletions pkg/ratelimit/runner.go
@@ -15,8 +15,10 @@
package ratelimit

import (
"container/list"
"context"
"errors"
"fmt"
"sync"
"time"

@@ -42,14 +44,15 @@ const (

// Runner is the interface for running tasks.
type Runner interface {
RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error
RunTask(ctx context.Context, regionID uint64, name string, f func(context.Context), opts ...TaskOption) error
Start()
Stop()
}

// Task is a task to be run.
type Task struct {
ctx context.Context
regionID uint64
submittedAt time.Time
f func(context.Context)
name string
@@ -66,11 +69,12 @@ type ConcurrentRunner struct {
limiter *ConcurrencyLimiter
maxPendingDuration time.Duration
taskChan chan *Task
pendingTasks []*Task
pendingMu sync.Mutex
stopChan chan struct{}
wg sync.WaitGroup
pendingTaskCount map[string]int64
pendingTasks *list.List
pendingRegionTasks map[string]*list.Element
maxWaitingDuration prometheus.Gauge
}

@@ -81,8 +85,9 @@ func NewConcurrentRunner(name string, limiter *ConcurrencyLimiter, maxPendingDur
limiter: limiter,
maxPendingDuration: maxPendingDuration,
taskChan: make(chan *Task),
pendingTasks: make([]*Task, 0, initialCapacity),
pendingTasks: list.New(),
pendingTaskCount: make(map[string]int64),
pendingRegionTasks: make(map[string]*list.Element),
maxWaitingDuration: RunnerTaskMaxWaitingDuration.WithLabelValues(name),
}
return s
@@ -101,6 +106,7 @@ func (cr *ConcurrentRunner) Start() {
cr.stopChan = make(chan struct{})
cr.wg.Add(1)
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
go func() {
defer cr.wg.Done()
for {
@@ -117,15 +123,15 @@
}
case <-cr.stopChan:
cr.pendingMu.Lock()
cr.pendingTasks = make([]*Task, 0, initialCapacity)
cr.pendingTasks = list.New()
cr.pendingMu.Unlock()
log.Info("stopping async task runner", zap.String("name", cr.name))
return
case <-ticker.C:
maxDuration := time.Duration(0)
cr.pendingMu.Lock()
if len(cr.pendingTasks) > 0 {
maxDuration = time.Since(cr.pendingTasks[0].submittedAt)
if cr.pendingTasks.Len() > 0 {
maxDuration = time.Since(cr.pendingTasks.Front().Value.(*Task).submittedAt)
}
for taskName, cnt := range cr.pendingTaskCount {
RunnerPendingTasks.WithLabelValues(cr.name, taskName).Set(float64(cnt))
@@ -151,12 +157,13 @@ func (cr *ConcurrentRunner) run(task *Task, token *TaskToken) {
func (cr *ConcurrentRunner) processPendingTasks() {
cr.pendingMu.Lock()
defer cr.pendingMu.Unlock()
if len(cr.pendingTasks) > 0 {
task := cr.pendingTasks[0]
if cr.pendingTasks.Len() > 0 {
task := cr.pendingTasks.Front().Value.(*Task)
select {
case cr.taskChan <- task:
cr.pendingTasks = cr.pendingTasks[1:]
cr.pendingTasks.Remove(cr.pendingTasks.Front())
cr.pendingTaskCount[task.name]--
delete(cr.pendingRegionTasks, fmt.Sprintf("%d-%s", task.regionID, task.name))
default:
}
return
@@ -170,11 +177,13 @@ func (cr *ConcurrentRunner) Stop() {
}

// RunTask runs the task asynchronously.
func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(context.Context), opts ...TaskOption) error {
func (cr *ConcurrentRunner) RunTask(ctx context.Context, regionID uint64, name string, f func(context.Context), opts ...TaskOption) error {
task := &Task{
ctx: ctx,
name: name,
f: f,
ctx: ctx,
regionID: regionID,
name: name,
f: f,
submittedAt: time.Now(),
}
for _, opt := range opts {
opt(task)
@@ -186,24 +195,32 @@ func (cr *ConcurrentRunner) RunTask(ctx context.Context, name string, f func(con
cr.processPendingTasks()
}()

pendingTaskNum := len(cr.pendingTasks)
pendingTaskNum := cr.pendingTasks.Len()
taskID := fmt.Sprintf("%d-%s", regionID, name)
if pendingTaskNum > 0 {
if element, ok := cr.pendingRegionTasks[taskID]; ok {
// Update the task in pendingTasks
element.Value = task
// Update the task in pendingRegionTasks
cr.pendingRegionTasks[taskID] = element
return nil
}
if !task.retained {
maxWait := time.Since(cr.pendingTasks[0].submittedAt)
maxWait := time.Since(cr.pendingTasks.Front().Value.(*Task).submittedAt)
if maxWait > cr.maxPendingDuration {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded
}
}
// We use the max task number to limit the memory usage.
// It occupies around 1.5GB memory when there is 20000000 pending task.
if len(cr.pendingTasks) > maxPendingTaskNum {
if pendingTaskNum > maxPendingTaskNum {
RunnerFailedTasks.WithLabelValues(cr.name, task.name).Inc()
return ErrMaxWaitingTasksExceeded
}
}
task.submittedAt = time.Now()
cr.pendingTasks = append(cr.pendingTasks, task)
element := cr.pendingTasks.PushBack(task)
cr.pendingRegionTasks[taskID] = element
cr.pendingTaskCount[task.name]++
return nil
}
@@ -217,7 +234,7 @@ func NewSyncRunner() *SyncRunner {
}

// RunTask runs the task synchronously.
func (*SyncRunner) RunTask(ctx context.Context, _ string, f func(context.Context), _ ...TaskOption) error {
func (*SyncRunner) RunTask(ctx context.Context, _ uint64, _ string, f func(context.Context), _ ...TaskOption) error {
f(ctx)
return nil
}
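The pkg/ratelimit/runner.go change above replaces the pending-task slice with a container/list plus a map keyed by "<regionID>-<name>", so a newly submitted task overwrites an older pending task for the same region and name instead of queueing a duplicate. Below is a minimal, self-contained sketch of that coalescing idea; the type and function names (task, queue, push, pop) are illustrative only and not part of the pd codebase, while the key format mirrors the fmt.Sprintf("%d-%s", regionID, name) used in the diff.

package main

import (
    "container/list"
    "fmt"
)

// task keeps only the fields relevant to coalescing; the real Task in
// pkg/ratelimit also carries a context, submission time, and options.
type task struct {
    regionID uint64
    name     string
    run      func()
}

// queue holds pending tasks in FIFO order and tracks the newest pending
// task per "<regionID>-<name>" key.
type queue struct {
    pending  *list.List
    byRegion map[string]*list.Element
}

func newQueue() *queue {
    return &queue{pending: list.New(), byRegion: make(map[string]*list.Element)}
}

// push enqueues t, or replaces an already-pending task that has the same
// region ID and name, keeping its position in the queue.
func (q *queue) push(t *task) {
    key := fmt.Sprintf("%d-%s", t.regionID, t.name)
    if e, ok := q.byRegion[key]; ok {
        e.Value = t // drop the stale task body, keep the queue slot
        return
    }
    q.byRegion[key] = q.pending.PushBack(t)
}

// pop removes the oldest pending task and clears its coalescing entry,
// mirroring what processPendingTasks does after handing a task off.
func (q *queue) pop() *task {
    front := q.pending.Front()
    if front == nil {
        return nil
    }
    t := q.pending.Remove(front).(*task)
    delete(q.byRegion, fmt.Sprintf("%d-%s", t.regionID, t.name))
    return t
}

func main() {
    q := newQueue()
    q.push(&task{regionID: 1, name: "UpdateSubTree", run: func() { fmt.Println("stale update for region 1") }})
    q.push(&task{regionID: 1, name: "UpdateSubTree", run: func() { fmt.Println("fresh update for region 1") }})
    q.push(&task{regionID: 2, name: "UpdateSubTree", run: func() { fmt.Println("update for region 2") }})
    for t := q.pop(); t != nil; t = q.pop() {
        t.run() // prints the fresh region-1 update, then region 2: the duplicate was coalesced
    }
}
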
2 changes: 2 additions & 0 deletions pkg/ratelimit/runner_test.go
@@ -35,6 +35,7 @@ func TestConcurrentRunner(t *testing.T) {
wg.Add(1)
err := runner.RunTask(
context.Background(),
uint64(i),
"test1",
func(context.Context) {
defer wg.Done()
@@ -55,6 +56,7 @@ func TestConcurrentRunner(t *testing.T) {
wg.Add(1)
err := runner.RunTask(
context.Background(),
uint64(i),
"test2",
func(context.Context) {
defer wg.Done()
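To show the caller-facing effect of the new signature, here is a hedged usage sketch in the spirit of the test above. Only RunTask's signature is taken directly from this diff; the import path and the NewConcurrentRunner and NewConcurrencyLimiter constructors are assumptions based on how the package is used elsewhere and may not match the repository exactly.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/tikv/pd/pkg/ratelimit" // assumed import path
)

func main() {
    // Assumed constructors; the limiter bounds how many tasks run concurrently.
    runner := ratelimit.NewConcurrentRunner("example", ratelimit.NewConcurrencyLimiter(1), time.Minute)
    runner.Start()
    defer runner.Stop()

    // Two submissions for the same region ID and task name: if the first is
    // still pending when the second arrives, the second replaces it in place
    // rather than queueing a duplicate.
    for i := 0; i < 2; i++ {
        i := i
        if err := runner.RunTask(context.Background(), 42, "UpdateSubTree", func(context.Context) {
            fmt.Println("ran submission", i)
        }); err != nil {
            fmt.Println("task rejected:", err)
        }
    }
    time.Sleep(100 * time.Millisecond) // give the async runner a moment before Stop
}
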
7 changes: 7 additions & 0 deletions server/cluster/cluster.go
@@ -1038,6 +1038,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// Save to cache if meta or leader is updated, or contains any down/pending peer.
saveKV, saveCache, needSync, retained := regionGuide(ctx, region, origin)
tracer.OnRegionGuideFinished()
regionID := region.GetID()
if !saveKV && !saveCache {
// Due to some config changes need to update the region stats as well,
// so we do some extra checks here.
@@ -1047,6 +1048,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.ObserveRegionStatsAsync,
func(_ context.Context) {
if c.regionStats.RegionStatsNeedUpdate(region) {
@@ -1059,6 +1061,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if origin.GetRef() < 2 {
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
@@ -1087,6 +1090,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
}
ctx.TaskRunner.RunTask(
ctx,
regionID,
ratelimit.UpdateSubTree,
func(_ context.Context) {
c.CheckAndPutSubTree(region)
@@ -1098,6 +1102,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.HandleOverlaps,
func(_ context.Context) {
cluster.HandleOverlaps(c, overlaps)
@@ -1111,6 +1116,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
// handle region stats
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.CollectRegionStatsAsync,
func(_ context.Context) {
// TODO: Due to the accuracy requirements of the API "/regions/check/xxx",
@@ -1125,6 +1131,7 @@ func (c *RaftCluster) processRegionHeartbeat(ctx *core.MetaProcessContext, regio
if saveKV {
ctx.MiscRunner.RunTask(
ctx.Context,
regionID,
ratelimit.SaveRegionToKV,
func(_ context.Context) {
// If there are concurrent heartbeats from the same region, the last write will win even if
