diff --git a/stats.go b/stats.go
index 4143dc6..18d830a 100644
--- a/stats.go
+++ b/stats.go
@@ -10,6 +10,14 @@ import (
 	tagspkg "github.com/lyft/gostats/internal/tags"
 )
 
+// Counter and Timer metrics that are unused (zero) for unusedMetricPruneCount
+// consecutive flushes are deleted from the statStore.
+//
+// TODO: consider using a time interval for this instead of basing it off the
+// flush count. That would guard against aggressively short/long flush
+// intervals, which are configurable and thus out of our control.
+const unusedMetricPruneCount = 4
+
 // A Store holds statistics.
 // There are two options when creating a new store:
 //
@@ -238,6 +246,8 @@ func NewDefaultStore() Store {
 type counter struct {
 	currentValue  uint64
 	lastSentValue uint64
+	// number of consecutive flushes during which this counter was zero
+	zeroCount uint32
 }
 
 func (c *counter) Add(delta uint64) {
@@ -302,6 +312,12 @@ type timer struct {
 	base time.Duration
 	name string
 	sink Sink
+	// active is set to 1 if the timer was used between flushes
+	active uint32
+	// zeroCount is the number of consecutive flushes during which the timer
+	// was not used - once it reaches unusedMetricPruneCount the timer is
+	// deleted from the Store.
+	zeroCount uint32
 }
 
 func (t *timer) time(dur time.Duration) {
@@ -313,6 +329,7 @@ func (t *timer) AddDuration(dur time.Duration) {
 }
 
 func (t *timer) AddValue(value float64) {
+	atomic.StoreUint32(&t.active, 1)
 	t.sink.FlushTimer(t.name, value)
 }
 
@@ -369,10 +386,35 @@ func (s *statStore) Flush() {
 	}
 	s.mu.RUnlock()
 
+	// Pruning timers is relatively slow and does not write to the sink, so
+	// run it in a separate goroutine.
+	wg := new(sync.WaitGroup)
+	wg.Add(1)
+	go func(timers *sync.Map) {
+		defer wg.Done()
+		timers.Range(func(key, v interface{}) bool {
+			timer := v.(*timer)
+			switch {
+			case atomic.SwapUint32(&timer.active, 0) != 0:
+				atomic.StoreUint32(&timer.zeroCount, 0)
+			case atomic.AddUint32(&timer.zeroCount, 1) >= unusedMetricPruneCount:
+				timers.Delete(key)
+			}
+			return true
+		})
+	}(&s.timers)
+
 	s.counters.Range(func(key, v interface{}) bool {
+		c := v.(*counter)
+		value := c.latch()
+		switch {
 		// do not flush counters that are set to zero
-		if value := v.(*counter).latch(); value != 0 {
+		case value != 0:
 			s.sink.FlushCounter(key.(string), value)
+			atomic.StoreUint32(&c.zeroCount, 0)
+		// delete unused counters
+		case atomic.AddUint32(&c.zeroCount, 1) >= unusedMetricPruneCount:
+			s.counters.Delete(key)
 		}
 		return true
 	})
@@ -386,6 +428,10 @@ func (s *statStore) Flush() {
 	if ok {
 		flushableSink.Flush()
 	}
+
+	// Wait for the timer-pruning goroutine to finish. This prevents an
+	// explosion of goroutines if someone calls Flush in a hot loop.
+	wg.Wait()
 }
 
 func (s *statStore) AddStatGenerator(statGenerator StatGenerator) {
diff --git a/stats_test.go b/stats_test.go
index 66ca14f..ee1fc24 100644
--- a/stats_test.go
+++ b/stats_test.go
@@ -379,6 +379,45 @@ func TestPerInstanceStats(t *testing.T) {
 	})
 }
 
+func TestStatsStorePrune(t *testing.T) {
+	s := NewStore(nullSink{}, false).(*statStore)
+	tick := time.NewTicker(time.Hour) // don't flush automatically
+	defer tick.Stop()
+	go s.Start(tick)
+
+	const N = 1024
+	for i := 0; i < N; i++ {
+		id := strconv.Itoa(i)
+		s.NewCounter("counter_" + id)
+		s.NewTimer("timer_" + id)
+	}
+
+	mlen := func(m *sync.Map) int {
+		n := 0
+		m.Range(func(_, _ any) bool {
+			n++
+			return true
+		})
+		return n
+	}
+
+	for i := 0; i < unusedMetricPruneCount; i++ {
+		if n := mlen(&s.counters); n != N {
+			t.Errorf("len(s.counters) == %d; want: %d", n, N)
+		}
+		if n := mlen(&s.timers); n != N {
+			t.Errorf("len(s.timers) == %d; want: %d", n, N)
+		}
+		s.Flush()
+	}
+	if n := mlen(&s.counters); n != 0 {
+		t.Errorf("len(s.counters) == %d; want: %d", n, 0)
+	}
+	if n := mlen(&s.timers); n != 0 {
+		t.Errorf("len(s.timers) == %d; want: %d", n, 0)
+	}
+}
+
 func BenchmarkStore_MutexContention(b *testing.B) {
 	s := NewStore(nullSink{}, false)
 	t := time.NewTicker(500 * time.Microsecond) // we want flush to contend with accessing metrics
@@ -493,3 +532,55 @@ func BenchmarkStoreNewPerInstanceCounter(b *testing.B) {
 		}
 	})
 }
+
+func BenchmarkStoreNewCounterParallel(b *testing.B) {
+	s := NewStore(nullSink{}, false)
+	t := time.NewTicker(time.Hour) // don't flush
+	defer t.Stop()
+	go s.Start(t)
+	names := new([2048]string)
+	for i := 0; i < len(names); i++ {
+		names[i] = "counter_" + strconv.Itoa(i)
+	}
+	b.ResetTimer()
+	b.RunParallel(func(pb *testing.PB) {
+		for i := 0; pb.Next(); i++ {
+			s.NewCounter(names[i%len(names)])
+		}
+	})
+}
+
+func BenchmarkStoreFlush(b *testing.B) {
+	s := NewStore(nullSink{}, false)
+	t := time.NewTicker(time.Hour) // don't flush automatically
+	defer t.Stop()
+	go s.Start(t)
+
+	counters := new([2048]*counter)
+	gauges := new([2048]*gauge)
+	timers := new([2048]*timer)
+
+	for i := 0; i < len(counters); i++ {
+		id := strconv.Itoa(i)
+		counters[i] = s.NewCounter("counter_" + id).(*counter)
+		counters[i].Set(1)
+		gauges[i] = s.NewGauge("gauge_" + id).(*gauge)
+		gauges[i].Set(1)
+		timers[i] = s.NewTimer("timer_" + id).(*timer)
+	}
+	b.ResetTimer()
+
+	for i, n := 0, 0; i < b.N; i++ {
+		s.Flush()
+		if n++; n == unusedMetricPruneCount {
+			b.StopTimer()
+			for i := 0; i < len(counters); i++ {
+				counters[i].currentValue = uint64(i)
+				counters[i].zeroCount = 0
+				timers[i].active = 1
+			}
+			b.StartTimer()
+			n = 0
+		}
+	}
+}