Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove "reset" method of ScheduledTaskRoutine to fix bugs #369

Merged
merged 4 commits into from
Dec 6, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@


### Bug fixes
- Remove "reset" method of ScheduledTaskRoutine to fix a potential deadlock issue. ([#369](https://github.com/KindlingProject/kindling/pull/369))
- Fix the bug where the pod metadata with persistent IP in the map is deleted incorrectly due to the deleting mechanism with a delay. ([#374](https://github.com/KindlingProject/kindling/pull/374))
- Fix the bug that when the response is nil, the NAT IP and port are not added to the labels of the "DataGroup". ([#378](https://github.com/KindlingProject/kindling/pull/378))
- Fix potential deadlock of exited thread delay queue. ([#373](https://github.com/KindlingProject/kindling/pull/373))
Expand Down
9 changes: 7 additions & 2 deletions collector/pkg/component/analyzer/cpuanalyzer/cpu_analyzer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@ import (
"sync"
"time"

"go.uber.org/atomic"
"go.uber.org/zap/zapcore"

"github.com/Kindling-project/kindling/collector/pkg/component"
"github.com/Kindling-project/kindling/collector/pkg/component/analyzer"
"github.com/Kindling-project/kindling/collector/pkg/component/consumer"
"github.com/Kindling-project/kindling/collector/pkg/model"
"github.com/Kindling-project/kindling/collector/pkg/model/constnames"
"go.uber.org/zap/zapcore"
)

const (
Expand All @@ -23,6 +25,7 @@ type CpuAnalyzer struct {
cpuPidEvents map[uint32]map[uint32]*TimeSegments
// { pid: routine }
sendEventsRoutineMap sync.Map
routineSize *atomic.Int32
lock sync.Mutex
telemetry *component.TelemetryTools
tidExpiredQueue *tidDeleteQueue
Expand All @@ -43,10 +46,12 @@ func NewCpuAnalyzer(cfg interface{}, telemetry *component.TelemetryTools, consum
cfg: config,
telemetry: telemetry,
nextConsumers: consumers,
routineSize: atomic.NewInt32(0),
}
ca.cpuPidEvents = make(map[uint32]map[uint32]*TimeSegments, 100000)
ca.tidExpiredQueue = newTidDeleteQueue()
go ca.TidDelete(30*time.Second, 10*time.Second)
newSelfMetrics(telemetry.MeterProvider, ca)
return ca
}

Expand All @@ -56,7 +61,7 @@ func (ca *CpuAnalyzer) Start() error {
}

func (ca *CpuAnalyzer) Shutdown() error {
ca.StopProfile()
_ = ca.StopProfile()
return nil
}

Expand Down
82 changes: 82 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/scheduled_task.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package cpuanalyzer

import (
"errors"
"time"

"go.uber.org/atomic"
)

// ScheduledTask is the unit of work driven by a ScheduledTaskRoutine.
type ScheduledTask interface {
// run performs one round of work; the routine calls it on every ticker tick.
run()
}

// ScheduledTaskRoutine runs a ScheduledTask periodically in its own goroutine
// until either its expiry timer fires or Stop is called.
type ScheduledTaskRoutine struct {
// expiredDuration is the lifetime given to the constructor; the timer is created with it.
expiredDuration time.Duration
// ticker triggers task.run() each time it fires.
ticker *time.Ticker
// timer ends the routine when it fires.
timer *time.Timer
// stopCh carries the stop signal sent by Stop to the goroutine.
stopCh chan struct{}

// task is the work executed on every tick.
task ScheduledTask
// isRunning guards Start and Stop so that each takes effect at most once per state change.
isRunning *atomic.Bool

// expiredCallback, if not nil, is deferred in the goroutine and therefore runs
// when the goroutine exits, whether by expiry or by Stop.
expiredCallback func()
}

// NewAndStartScheduledTaskRoutine creates a new routine and starts it immediately.
//
// The routine calls task.run() every tickerDuration until expiredDuration has
// elapsed or Stop is called, whichever happens first. If expiredCallback is
// not nil, it is invoked when the routine's goroutine exits.
func NewAndStartScheduledTaskRoutine(
	tickerDuration time.Duration,
	expiredDuration time.Duration,
	task ScheduledTask,
	expiredCallback func()) *ScheduledTaskRoutine {
	ret := &ScheduledTaskRoutine{
		expiredDuration: expiredDuration,
		ticker:          time.NewTicker(tickerDuration),
		timer:           time.NewTimer(expiredDuration),
		task:            task,
		isRunning:       atomic.NewBool(false),
		// The channel is buffered so that the send in Stop never blocks. Stop can
		// win the isRunning CAS at the very moment the goroutine takes the
		// timer branch and exits; with an unbuffered channel nobody would be
		// left to receive the signal and the caller of Stop would hang forever.
		stopCh:          make(chan struct{}, 1),
		expiredCallback: expiredCallback,
	}
	// Start the routine once it is created. A freshly created routine is never
	// running yet, so Start cannot fail here and its error is safely ignored.
	_ = ret.Start()
	return ret
}

// Start launches the goroutine that drives the task. It returns an error if
// the routine is already marked as running.
//
// The goroutine calls task.run() on every ticker tick and exits when the
// expiry timer fires or when a stop signal arrives on stopCh. The
// expiredCallback, if set, runs as the goroutine exits.
func (s *ScheduledTaskRoutine) Start() error {
	if !s.isRunning.CAS(false, true) {
		return errors.New("the routine has been started")
	}

	worker := func() {
		if callback := s.expiredCallback; callback != nil {
			defer callback()
		}
		for {
			select {
			case <-s.stopCh:
				// Stopped explicitly: release both time sources.
				s.timer.Stop()
				s.ticker.Stop()
				return
			case <-s.timer.C:
				// The lifetime is over: mark the routine as not running and
				// release the ticker. The timer has already fired.
				s.isRunning.CAS(true, false)
				s.ticker.Stop()
				return
			case <-s.ticker.C:
				// Periodic work.
				s.task.run()
			}
		}
	}
	go worker()

	return nil
}

// Stop asks a running routine to exit. It returns an error if the routine is
// not marked as running, which covers both a routine that already expired and
// a routine that was already stopped.
func (s *ScheduledTaskRoutine) Stop() error {
	if wasRunning := s.isRunning.CAS(true, false); !wasRunning {
		return errors.New("the routine is not running")
	}
	// Signal the goroutine to exit.
	s.stopCh <- struct{}{}
	return nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package cpuanalyzer

import (
"testing"
"time"

"github.com/stretchr/testify/assert"
)

// TestScheduledTask covers the two ways a routine ends (expiry and explicit
// stop) and checks that double Start and double Stop are rejected.
func TestScheduledTask(t *testing.T) {
	const (
		tick   = 1 * time.Millisecond
		expiry = 5 * time.Millisecond
	)

	// Case 1: the routine exits on its own once the expiry timer fires.
	expiredTask := &testIncrementTask{0}
	expiredRoutine := NewAndStartScheduledTaskRoutine(tick, expiry, expiredTask, nil)
	// The constructor has already started the routine; the result of this
	// extra Start is intentionally ignored.
	_ = expiredRoutine.Start()
	time.Sleep(10 * time.Millisecond)
	assert.Equal(t, false, expiredRoutine.isRunning.Load())
	// NOTE(review): this count depends on wall-clock timing and may be
	// sensitive to scheduler load — confirm it is stable on CI.
	assert.Equal(t, 5, expiredTask.count)

	// Case 2: double start or double stop.
	stoppedTask := &testIncrementTask{0}
	stoppedRoutine := NewAndStartScheduledTaskRoutine(tick, expiry, stoppedTask, nil)

	startErr := stoppedRoutine.Start()
	assert.Error(t, startErr)

	firstStopErr := stoppedRoutine.Stop()
	assert.NoError(t, firstStopErr)

	secondStopErr := stoppedRoutine.Stop()
	assert.Error(t, secondStopErr)
}

// testIncrementTask is a ScheduledTask used by the tests; it counts how many
// times it has been run.
type testIncrementTask struct {
	count int
}

// run records one more execution of the task.
func (task *testIncrementTask) run() {
	task.count += 1
}
24 changes: 24 additions & 0 deletions collector/pkg/component/analyzer/cpuanalyzer/self_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package cpuanalyzer

import (
"context"
"sync"

"go.opentelemetry.io/otel/metric"
)

var onceMetric sync.Once

const (
goroutineSize = "kindling_telemetry_cpuanalyzer_routine_size"
)

// newSelfMetrics registers a gauge observer that reports the analyzer's
// current routineSize under the goroutineSize metric name.
//
// Registration is wrapped in onceMetric, so it happens at most once; only the
// analyzer passed on the call that performs the registration is observed.
func newSelfMetrics(meterProvider metric.MeterProvider, analyzer *CpuAnalyzer) {
	// Callback invoked by the meter to read the current number of routines.
	observeRoutineSize := func(ctx context.Context, result metric.Int64ObserverResult) {
		current := analyzer.routineSize.Load()
		result.Observe(int64(current))
	}

	register := func() {
		kindlingMeter := metric.Must(meterProvider.Meter("kindling"))
		kindlingMeter.NewInt64GaugeObserver(goroutineSize, observeRoutineSize)
	}
	onceMetric.Do(register)
}
129 changes: 8 additions & 121 deletions collector/pkg/component/analyzer/cpuanalyzer/send_trigger.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package cpuanalyzer

import (
"errors"
"sync"
"time"

"github.com/Kindling-project/kindling/collector/pkg/filepathhelper"
"github.com/Kindling-project/kindling/collector/pkg/model"
"github.com/Kindling-project/kindling/collector/pkg/model/constlabels"
"github.com/Kindling-project/kindling/collector/pkg/model/constvalues"
"sync"
"time"

"go.uber.org/atomic"
)

var eventsWindowsDuration = 6 * time.Second
Expand Down Expand Up @@ -61,126 +59,15 @@ func (ca *CpuAnalyzer) ReceiveSendSignal() {
_ = nexConsumer.Consume(sendContent.OriginalData)
}
task := &SendEventsTask{0, ca, &sendContent}
// "Load" and "Delete" could be executed concurrently
value, ok := ca.sendEventsRoutineMap.Load(sendContent.Pid)
if !ok {
ca.putNewRoutine(task)
} else {
// The routine may be found before but stopped and deleted after entering this branch.
// So we much check twice.
routine, _ := value.(*ScheduledTaskRoutine)
// TODO Always replacing the task may cause that some events are skipped when the "spendTime"
// becomes much smaller than before.
err := routine.ResetExpiredTimerWithNewTask(task)
if err != nil {
// The routine has been expired.
ca.putNewRoutine(task)
}
expiredCallback := func() {
ca.routineSize.Dec()
}
// The expired duration should be windowDuration+1 because the ticker and the timer are not started together.
NewAndStartScheduledTaskRoutine(1*time.Second, eventsWindowsDuration+1, task, expiredCallback)
ca.routineSize.Inc()
}
}

func (ca *CpuAnalyzer) putNewRoutine(task *SendEventsTask) {
expiredCallback := func() {
ca.sendEventsRoutineMap.Delete(task.triggerEvent.Pid)
}
// The expired duration should be windowDuration+1 because the ticker and the timer are not started together.
routine := NewAndStartScheduledTaskRoutine(1*time.Second, eventsWindowsDuration+1, task, expiredCallback)
ca.sendEventsRoutineMap.Store(task.triggerEvent.Pid, routine)
}

type ScheduledTask interface {
run()
}

type ScheduledTaskRoutine struct {
expiredDuration time.Duration
ticker *time.Ticker
timer *time.Timer
stopCh chan struct{}

task ScheduledTask
isRunning *atomic.Bool

expiredCallback func()
}

// NewAndStartScheduledTaskRoutine creates a new routine and start it immediately.
func NewAndStartScheduledTaskRoutine(
tickerDuration time.Duration,
expiredDuration time.Duration,
task ScheduledTask,
expiredCallback func()) *ScheduledTaskRoutine {
ret := &ScheduledTaskRoutine{
expiredDuration: expiredDuration,
ticker: time.NewTicker(tickerDuration),
timer: time.NewTimer(expiredDuration),
task: task,
isRunning: atomic.NewBool(false),
stopCh: make(chan struct{}),
expiredCallback: expiredCallback,
}
// Start the routine once it is created.
ret.Start()
return ret
}

func (s *ScheduledTaskRoutine) Start() error {
swapped := s.isRunning.CAS(false, true)
if !swapped {
return errors.New("the routine has been started")
}
go func() {
if s.expiredCallback != nil {
defer s.expiredCallback()
}
for {
select {
case <-s.ticker.C:
// do some work
s.task.run()
case <-s.timer.C:
// The current task is expired.
s.isRunning.CAS(true, false)
s.ticker.Stop()
return
case <-s.stopCh:
s.timer.Stop()
s.ticker.Stop()
return
}
}
}()
return nil
}

// ResetExpiredTimer resets the timer to extend its expired time if it is running.
// If the routine is not running, an error will be returned and nothing will happen.
func (s *ScheduledTaskRoutine) ResetExpiredTimer() error {
if !s.isRunning.Load() {
return errors.New("the routine is not running, can't reset the timer")
}
if !s.timer.Stop() {
<-s.timer.C
}
s.timer.Reset(s.expiredDuration)
return nil
}

func (s *ScheduledTaskRoutine) ResetExpiredTimerWithNewTask(task ScheduledTask) error {
s.task = task
return s.ResetExpiredTimer()
}

func (s *ScheduledTaskRoutine) Stop() error {
swapped := s.isRunning.CAS(true, false)
if !swapped {
return errors.New("the routine is not running")
}
s.stopCh <- struct{}{}
return nil
}

type SendEventsTask struct {
tickerCount int
cpuAnalyzer *CpuAnalyzer
Expand Down
Loading