Skip to content

Commit

Permalink
Remove "reset" method of ScheduledTaskRoutine to fix bugs (#369)
Browse files Browse the repository at this point in the history
Signed-off-by: Daxin Wang <daxinwang@harmonycloud.cn>
  • Loading branch information
dxsup authored Dec 6, 2022
1 parent d56e936 commit 47be8af
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 175 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@


### Bug fixes
- Remove "reset" method of ScheduledTaskRoutine to fix a potential dead-lock issue. ([#369])(https://github.com/KindlingProject/kindling/pull/369)
- Fix the bug where the pod metadata with persistent IP in the map is deleted incorrectly due to the deleting mechanism with a delay. ([#374](https://github.com/KindlingProject/kindling/pull/374))
- Fix the bug that when the response is nil, the NAT IP and port are not added to the labels of the "DataGroup". ([#378](https://github.com/KindlingProject/kindling/pull/378))
- Fix potential deadlock of exited thread delay queue. ([#373](https://github.com/KindlingProject/kindling/pull/373))
Expand Down
9 changes: 7 additions & 2 deletions collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"sync"
"time"

"go.uber.org/atomic"
"go.uber.org/zap/zapcore"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer"
"github.com/Kindling-project/kindling/collector/pkg/component/consumer"
"github.com/Kindling-project/kindling/collector/pkg/model"
"github.com/Kindling-project/kindling/collector/pkg/model/constnames"
"go.uber.org/zap/zapcore"
)

const (
Expand All @@ -23,6 +25,7 @@ type CpuAnalyzer struct {
cpuPidEvents map[uint32]map[uint32]*TimeSegments
// { pid: routine }
sendEventsRoutineMap sync.Map
routineSize *atomic.Int32
lock sync.Mutex
telemetry *component.TelemetryTools
tidExpiredQueue *tidDeleteQueue
Expand All @@ -43,10 +46,12 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum
cfg: config,
telemetry: telemetry,
nextConsumers: consumers,
routineSize: atomic.NewInt32(0),
}
ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000)
ca.tidExpiredQueue = newTidDeleteQueue()
go ca.TidDelete(30*time.Second, 10*time.Second)
newSelfMetrics(telemetry.MeterProvider, ca)
return ca
}

Expand All @@ -56,7 +61,7 @@ func (ca *CpuAnalyzer) Start() error {
}

func (ca *CpuAnalyzer) Shutdown() error {
ca.StopProfile()
_ = ca.StopProfile()
return nil
}

Expand Down
82 changes: 82 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/scheduled_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package cpuanalyzer

import (
"errors"
"time"

"go.uber.org/atomic"
)

type ScheduledTask interface {
run()
}

type ScheduledTaskRoutine struct {
expiredDuration time.Duration
ticker *time.Ticker
timer *time.Timer
stopCh chan struct{}

task ScheduledTask
isRunning *atomic.Bool

expiredCallback func()
}

// NewAndStartScheduledTaskRoutine creates a new routine and start it immediately.
func NewAndStartScheduledTaskRoutine(
tickerDuration time.Duration,
expiredDuration time.Duration,
task ScheduledTask,
expiredCallback func()) *ScheduledTaskRoutine {
ret := &ScheduledTaskRoutine{
expiredDuration: expiredDuration,
ticker: time.NewTicker(tickerDuration),
timer: time.NewTimer(expiredDuration),
task: task,
isRunning: atomic.NewBool(false),
stopCh: make(chan struct{}),
expiredCallback: expiredCallback,
}
// Start the routine once it is created.
_ = ret.Start()
return ret
}

func (s *ScheduledTaskRoutine) Start() error {
swapped := s.isRunning.CAS(false, true)
if !swapped {
return errors.New("the routine has been started")
}
go func() {
if s.expiredCallback != nil {
defer s.expiredCallback()
}
for {
select {
case <-s.ticker.C:
// do some work
s.task.run()
case <-s.timer.C:
// The current task is expired.
s.isRunning.CAS(true, false)
s.ticker.Stop()
return
case <-s.stopCh:
s.timer.Stop()
s.ticker.Stop()
return
}
}
}()
return nil
}

func (s *ScheduledTaskRoutine) Stop() error {
swapped := s.isRunning.CAS(true, false)
if !swapped {
return errors.New("the routine is not running")
}
s.stopCh <- struct{}{}
return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package cpuanalyzer

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

func TestScheduledTask(t *testing.T) {
// test case 1: Normal expired exit
task1 := &testIncrementTask{0}
routine1 := NewAndStartScheduledTaskRoutine(1*time.Millisecond, 5*time.Millisecond, task1, nil)
_ = routine1.Start()
time.Sleep(10 * time.Millisecond)
assert.Equal(t, false, routine1.isRunning.Load())
assert.Equal(t, 5, task1.count)

// Case 2: Double start or double stop
task3 := &testIncrementTask{0}
routine3 := NewAndStartScheduledTaskRoutine(1*time.Millisecond, 5*time.Millisecond, task3, nil)
err := routine3.Start()
assert.Error(t, err)
err = routine3.Stop()
assert.NoError(t, err)
err = routine3.Stop()
assert.Error(t, err)
}

type testIncrementTask struct {
count int
}

func (t *testIncrementTask) run() {
t.count++
}
24 changes: 24 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/self_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package cpuanalyzer

import (
"context"
"sync"

"go.opentelemetry.io/otel/metric"
)

var onceMetric sync.Once

const (
goroutineSize = "kindling_telemetry_cpuanalyzer_routine_size"
)

func newSelfMetrics(meterProvider metric.MeterProvider, analyzer *CpuAnalyzer) {
onceMetric.Do(func() {
meter := metric.Must(meterProvider.Meter("kindling"))
meter.NewInt64GaugeObserver(goroutineSize,
func(ctx context.Context, result metric.Int64ObserverResult) {
result.Observe(int64(analyzer.routineSize.Load()))
})
})
}
129 changes: 8 additions & 121 deletions collector/pkg/component/analyzer/cpuanalyzer/send_trigger.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package cpuanalyzer

import (
"errors"
"sync"
"time"

"github.com/Kindling-project/kindling/collector/pkg/filepathhelper"
"github.com/Kindling-project/kindling/collector/pkg/model"
"github.com/Kindling-project/kindling/collector/pkg/model/constlabels"
"github.com/Kindling-project/kindling/collector/pkg/model/constvalues"
"sync"
"time"

"go.uber.org/atomic"
)

var eventsWindowsDuration = 6 * time.Second
Expand Down Expand Up @@ -61,126 +59,15 @@ func (ca *CpuAnalyzer) ReceiveSendSignal() {
_ = nexConsumer.Consume(sendContent.OriginalData)
}
task := &SendEventsTask{0, ca, &sendContent}
// "Load" and "Delete" could be executed concurrently
value, ok := ca.sendEventsRoutineMap.Load(sendContent.Pid)
if !ok {
ca.putNewRoutine(task)
} else {
// The routine may be found before but stopped and deleted after entering this branch.
// So we much check twice.
routine, _ := value.(*ScheduledTaskRoutine)
// TODO Always replacing the task may cause that some events are skipped when the "spendTime"
// becomes much smaller than before.
err := routine.ResetExpiredTimerWithNewTask(task)
if err != nil {
// The routine has been expired.
ca.putNewRoutine(task)
}
expiredCallback := func() {
ca.routineSize.Dec()
}
// The expired duration should be windowDuration+1 because the ticker and the timer are not started together.
NewAndStartScheduledTaskRoutine(1*time.Second, eventsWindowsDuration+1, task, expiredCallback)
ca.routineSize.Inc()
}
}

func (ca *CpuAnalyzer) putNewRoutine(task *SendEventsTask) {
expiredCallback := func() {
ca.sendEventsRoutineMap.Delete(task.triggerEvent.Pid)
}
// The expired duration should be windowDuration+1 because the ticker and the timer are not started together.
routine := NewAndStartScheduledTaskRoutine(1*time.Second, eventsWindowsDuration+1, task, expiredCallback)
ca.sendEventsRoutineMap.Store(task.triggerEvent.Pid, routine)
}

type ScheduledTask interface {
run()
}

type ScheduledTaskRoutine struct {
expiredDuration time.Duration
ticker *time.Ticker
timer *time.Timer
stopCh chan struct{}

task ScheduledTask
isRunning *atomic.Bool

expiredCallback func()
}

// NewAndStartScheduledTaskRoutine creates a new routine and start it immediately.
func NewAndStartScheduledTaskRoutine(
tickerDuration time.Duration,
expiredDuration time.Duration,
task ScheduledTask,
expiredCallback func()) *ScheduledTaskRoutine {
ret := &ScheduledTaskRoutine{
expiredDuration: expiredDuration,
ticker: time.NewTicker(tickerDuration),
timer: time.NewTimer(expiredDuration),
task: task,
isRunning: atomic.NewBool(false),
stopCh: make(chan struct{}),
expiredCallback: expiredCallback,
}
// Start the routine once it is created.
ret.Start()
return ret
}

func (s *ScheduledTaskRoutine) Start() error {
swapped := s.isRunning.CAS(false, true)
if !swapped {
return errors.New("the routine has been started")
}
go func() {
if s.expiredCallback != nil {
defer s.expiredCallback()
}
for {
select {
case <-s.ticker.C:
// do some work
s.task.run()
case <-s.timer.C:
// The current task is expired.
s.isRunning.CAS(true, false)
s.ticker.Stop()
return
case <-s.stopCh:
s.timer.Stop()
s.ticker.Stop()
return
}
}
}()
return nil
}

// ResetExpiredTimer resets the timer to extend its expired time if it is running.
// If the routine is not running, an error will be returned and nothing will happen.
func (s *ScheduledTaskRoutine) ResetExpiredTimer() error {
if !s.isRunning.Load() {
return errors.New("the routine is not running, can't reset the timer")
}
if !s.timer.Stop() {
<-s.timer.C
}
s.timer.Reset(s.expiredDuration)
return nil
}

func (s *ScheduledTaskRoutine) ResetExpiredTimerWithNewTask(task ScheduledTask) error {
s.task = task
return s.ResetExpiredTimer()
}

func (s *ScheduledTaskRoutine) Stop() error {
swapped := s.isRunning.CAS(true, false)
if !swapped {
return errors.New("the routine is not running")
}
s.stopCh <- struct{}{}
return nil
}

type SendEventsTask struct {
tickerCount int
cpuAnalyzer *CpuAnalyzer
Expand Down
Loading

0 comments on commit 47be8af

Please sign in to comment.