Skip to content

Commit

Permalink
stats: delete unused Counters and Timers when flushing
Browse files Browse the repository at this point in the history
This commit changes the statsStore to remove Counters and Timers that
were not used N times between flush intervals. This should alleviate
memory pressure during cardinality explosions.
  • Loading branch information
charlievieth committed Jun 24, 2023
1 parent e6b88e6 commit 3b65338
Show file tree
Hide file tree
Showing 2 changed files with 138 additions and 1 deletion.
48 changes: 47 additions & 1 deletion stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,14 @@ import (
tagspkg "github.com/lyft/gostats/internal/tags"
)

// unusedMetricPruneCount is the number of consecutive flushes for which a
// Counter or Timer may remain unused (zero / inactive) before it is deleted
// from the statStore. Flush resets the per-metric zero count whenever the
// metric is used, so only metrics idle for this many flushes in a row are
// pruned.
//
// TODO: consider using a time interval for this instead of basing it on the
// flush count. That would guard against aggressively short/long flush
// intervals, which are configurable and thus out of our control.
const unusedMetricPruneCount = 4

// A Store holds statistics.
// There are two options when creating a new store:
//
Expand Down Expand Up @@ -238,6 +246,8 @@ func NewDefaultStore() Store {
// counter accumulates a monotonically increasing value between flushes.
type counter struct {
	// currentValue is the running total; lastSentValue is the total as of
	// the previous flush. The flushed delta is the difference of the two.
	currentValue  uint64
	lastSentValue uint64
	// number of times this counter had a value of zero during flush; once
	// it reaches unusedMetricPruneCount the counter is deleted from the
	// statStore (see Flush). Accessed atomically.
	zeroCount uint32
}

func (c *counter) Add(delta uint64) {
Expand Down Expand Up @@ -302,6 +312,12 @@ type timer struct {
base time.Duration
name string
sink Sink
// active is a boolean used to check if the timer was used between flushes
active uint32
// zeroCount is the number of times the timer was not used between
// flushes - if exceeds unusedMetricPruneCount the timer is deleted
// from the Store.
zeroCount uint32
}

func (t *timer) time(dur time.Duration) {
Expand All @@ -313,6 +329,7 @@ func (t *timer) AddDuration(dur time.Duration) {
}

// AddValue records a single timer observation, forwarding it directly to the
// sink. It first sets the active flag, which Flush reads (and clears) to
// decide whether the timer was used since the last flush and may be pruned.
func (t *timer) AddValue(value float64) {
	// Mark the timer active before flushing so the usage is visible to a
	// concurrent Flush as early as possible.
	atomic.StoreUint32(&t.active, 1)
	t.sink.FlushTimer(t.name, value)
}

Expand Down Expand Up @@ -369,10 +386,35 @@ func (s *statStore) Flush() {
}
s.mu.RUnlock()

// This is kinda slow and does not write to the sink so run it in a
// separate goroutine.
wg := new(sync.WaitGroup)
wg.Add(1)
go func(timers *sync.Map) {
defer wg.Done()
timers.Range(func(key, v interface{}) bool {
timer := v.(*timer)
switch {
case atomic.SwapUint32(&timer.active, 0) != 0:
atomic.StoreUint32(&timer.zeroCount, 0)
case atomic.AddUint32(&timer.zeroCount, 1) >= unusedMetricPruneCount:
timers.Delete(key)
}
return true
})
}(&s.timers)

s.counters.Range(func(key, v interface{}) bool {
c := v.(*counter)
value := c.latch()
switch {
// do not flush counters that are set to zero
if value := v.(*counter).latch(); value != 0 {
case value != 0:
s.sink.FlushCounter(key.(string), value)
atomic.StoreUint32(&c.zeroCount, 0)
// delete unused counters
case atomic.AddUint32(&c.zeroCount, 1) >= unusedMetricPruneCount:
s.counters.Delete(key)
}
return true
})
Expand All @@ -386,6 +428,10 @@ func (s *statStore) Flush() {
if ok {
flushableSink.Flush()
}

// Wait for the goroutine pruning timers to finish. This prevents an
// explosion of goroutines if someone calls Flush in a hot loop.
wg.Wait()
}

func (s *statStore) AddStatGenerator(statGenerator StatGenerator) {
Expand Down
91 changes: 91 additions & 0 deletions stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,45 @@ func TestPerInstanceStats(t *testing.T) {
})
}

// TestStatsStorePrune verifies that counters and timers which go unused for
// unusedMetricPruneCount consecutive flushes are removed from the store, and
// that they survive every flush before the threshold is reached.
func TestStatsStorePrune(t *testing.T) {
	store := NewStore(nullSink{}, false).(*statStore)
	ticker := time.NewTicker(time.Hour) // don't flush automatically
	defer ticker.Stop()
	go store.Start(ticker)

	const numMetrics = 1024
	for i := 0; i < numMetrics; i++ {
		suffix := strconv.Itoa(i)
		store.NewCounter("counter_" + suffix)
		store.NewTimer("timer_" + suffix)
	}

	// count returns the number of entries in a sync.Map.
	count := func(m *sync.Map) (n int) {
		m.Range(func(_, _ any) bool {
			n++
			return true
		})
		return n
	}

	// Every metric must still be present before each of the first
	// unusedMetricPruneCount flushes.
	for i := 0; i < unusedMetricPruneCount; i++ {
		if got := count(&store.counters); got != numMetrics {
			t.Errorf("len(s.counters) == %d; want: %d", got, numMetrics)
		}
		if got := count(&store.timers); got != numMetrics {
			t.Errorf("len(s.timers) == %d; want: %d", got, numMetrics)
		}
		store.Flush()
	}
	// After the threshold is reached everything should have been pruned.
	if got := count(&store.counters); got != 0 {
		t.Errorf("len(s.counters) == %d; want: %d", got, 0)
	}
	if got := count(&store.timers); got != 0 {
		t.Errorf("len(s.timers) == %d; want: %d", got, 0)
	}
}

func BenchmarkStore_MutexContention(b *testing.B) {
s := NewStore(nullSink{}, false)
t := time.NewTicker(500 * time.Microsecond) // we want flush to contend with accessing metrics
Expand Down Expand Up @@ -493,3 +532,55 @@ func BenchmarkStoreNewPerInstanceCounter(b *testing.B) {
}
})
}

// BenchmarkStoreNewCounterParallel measures concurrent NewCounter lookups
// against a fixed working set of 2048 counter names.
func BenchmarkStoreNewCounterParallel(b *testing.B) {
	store := NewStore(nullSink{}, false)
	ticker := time.NewTicker(time.Hour) // don't flush
	defer ticker.Stop()
	go store.Start(ticker)

	var names [2048]string
	for i := range names {
		names[i] = "counter_" + strconv.Itoa(i)
	}
	b.ResetTimer()
	b.RunParallel(func(pb *testing.PB) {
		for n := 0; pb.Next(); n++ {
			store.NewCounter(names[n%len(names)])
		}
	})
}

// BenchmarkStoreFlush measures Flush over a store populated with 2048
// counters, gauges, and timers. Just before the metrics would be pruned
// (every unusedMetricPruneCount flushes) they are re-marked as used, with
// the timer stopped, so pruning overhead never distorts the measurement.
func BenchmarkStoreFlush(b *testing.B) {
	store := NewStore(nullSink{}, false)
	ticker := time.NewTicker(time.Hour) // don't flush automatically
	defer ticker.Stop()
	go store.Start(ticker)

	counters := new([2048]*counter)
	gauges := new([2048]*gauge)
	timers := new([2048]*timer)

	for i := range counters {
		id := strconv.Itoa(i)
		counters[i] = store.NewCounter("counter_" + id).(*counter)
		counters[i].Set(1)
		gauges[i] = store.NewGauge("gauge_" + id).(*gauge)
		gauges[i].Set(1)
		timers[i] = store.NewTimer("timer_" + id).(*timer)
	}
	b.ResetTimer()

	flushes := 0
	for i := 0; i < b.N; i++ {
		store.Flush()
		flushes++
		if flushes == unusedMetricPruneCount {
			// Reset usage outside the timed region so nothing is deleted.
			b.StopTimer()
			for j := range counters {
				counters[j].currentValue = uint64(j)
				counters[j].zeroCount = 0
				timers[j].active = 1
			}
			b.StartTimer()
			flushes = 0
		}
	}
}

0 comments on commit 3b65338

Please sign in to comment.