diff --git a/pkg/movingaverage/avg_over_time.go b/pkg/movingaverage/avg_over_time.go index 227ee32e8c7..664466fe98c 100644 --- a/pkg/movingaverage/avg_over_time.go +++ b/pkg/movingaverage/avg_over_time.go @@ -32,7 +32,7 @@ type deltaWithInterval struct { // then calculates the change rate by (sum of changes) / (sum of intervals). type AvgOverTime struct { que *queue.Queue // The element is `deltaWithInterval`, sum of all elements' interval is less than `avgInterval` - margin deltaWithInterval // The last element from `PopFront` in `que` + margin deltaWithInterval // The last element from `PopFront` in `que`, it will be changed if the sum exclude it is greater than `avgInterval` deltaSum float64 // Including `margin` and all elements in `que` intervalSum time.Duration // Including `margin` and all elements in `que` avgInterval time.Duration @@ -57,6 +57,7 @@ func (aot *AvgOverTime) Get() float64 { if aot.intervalSum < aot.avgInterval { return 0 } + marginDelta := aot.margin.delta * (aot.intervalSum.Seconds() - aot.avgInterval.Seconds()) / aot.margin.interval.Seconds() return (aot.deltaSum - marginDelta) / aot.avgInterval.Seconds() } @@ -75,6 +76,11 @@ func (aot *AvgOverTime) Clear() { } // Add adds recent change to AvgOverTime. +// It will pop item until the retain item's sum is greater than avgInterval. +// such as: +// que [1,1,1,6], avgInterval is 5. +// It will pop 6 if adding 2, the retaining item's sum is 5(2,1,1,1) >= avgInterval. +// It can't pop 6 if adding 1, the retaining item's sum is 4(1,1,1,1) < avgInterval. func (aot *AvgOverTime) Add(delta float64, interval time.Duration) { if interval == 0 { return @@ -135,7 +141,11 @@ func (aot *AvgOverTime) GetIntervalSum() time.Duration { // GetInstantaneous returns the value just added. func (aot *AvgOverTime) GetInstantaneous() float64 { if aot.que.Len() == 0 || aot.que.Back() == nil { - return 0 + if aot.margin.interval != 0 { + return aot.margin.delta / aot.margin.interval.Seconds() + } + return aot.margin.delta } - return aot.que.Back().(deltaWithInterval).delta + data := aot.que.Back().(deltaWithInterval) + return data.delta / data.interval.Seconds() } diff --git a/pkg/movingaverage/avg_over_time_test.go b/pkg/movingaverage/avg_over_time_test.go index a0787f5af81..43553d9d608 100644 --- a/pkg/movingaverage/avg_over_time_test.go +++ b/pkg/movingaverage/avg_over_time_test.go @@ -42,6 +42,20 @@ func TestPulse(t *testing.T) { } } +func TestPulse2(t *testing.T) { + t.Parallel() + re := require.New(t) + dur := 5 * time.Second + aot := NewAvgOverTime(dur) + re.Equal(float64(0), aot.GetInstantaneous()) + aot.Add(1000, dur) + re.Equal(float64(1000)/dur.Seconds(), aot.GetInstantaneous()) + re.True(aot.IsFull()) + aot.Clear() + aot.Add(1000, dur) + re.Equal(float64(1000)/dur.Seconds(), aot.GetInstantaneous()) +} + func TestChange(t *testing.T) { t.Parallel() re := require.New(t)