Skip to content

Commit

Permalink
feat(inputs.statsd): Allow reporting sets and timings count as floats (
Browse files Browse the repository at this point in the history
  • Loading branch information
calind authored and asaharn committed Oct 16, 2024
1 parent f0d47e3 commit 63900a7
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 2 deletions.
7 changes: 7 additions & 0 deletions plugins/inputs/statsd/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ See the [CONFIGURATION.md][CONFIGURATION.md] for more details.
## Enabling this would ensure that both counters and guages are both emitted
## as floats.
# float_counters = false

## Emit timings `metric_<name>_count` field as float, the same as all other
## histogram fields
# float_timings = false

## Emit sets as float
# float_sets = false
```

## Description
Expand Down
7 changes: 7 additions & 0 deletions plugins/inputs/statsd/sample.conf
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,10 @@
## Enabling this would ensure that both counters and guages are both emitted
## as floats.
# float_counters = false

## Emit timings `metric_<name>_count` field as float, the same as all other
## histogram fields
# float_timings = false

## Emit sets as float
# float_sets = false
14 changes: 12 additions & 2 deletions plugins/inputs/statsd/statsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ type Statsd struct {
DeleteTimings bool `toml:"delete_timings"`
ConvertNames bool `toml:"convert_names"`
FloatCounters bool `toml:"float_counters"`
FloatTimings bool `toml:"float_timings"`
FloatSets bool `toml:"float_sets"`

EnableAggregationTemporality bool `toml:"enable_aggregation_temporality"`

Expand Down Expand Up @@ -260,7 +262,11 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
fields[prefix+"sum"] = stats.Sum()
fields[prefix+"upper"] = stats.Upper()
fields[prefix+"lower"] = stats.Lower()
fields[prefix+"count"] = stats.Count()
if s.FloatTimings {
fields[prefix+"count"] = float64(stats.Count())
} else {
fields[prefix+"count"] = stats.Count()
}
for _, percentile := range s.Percentiles {
name := fmt.Sprintf("%s%v_percentile", prefix, percentile)
fields[name] = stats.Percentile(float64(percentile))
Expand Down Expand Up @@ -306,7 +312,11 @@ func (s *Statsd) Gather(acc telegraf.Accumulator) error {
for _, m := range s.sets {
fields := make(map[string]interface{})
for field, set := range m.fields {
fields[field] = int64(len(set))
if s.FloatSets {
fields[field] = float64(len(set))
} else {
fields[field] = int64(len(set))
}
}
if s.EnableAggregationTemporality {
fields["start_time"] = s.lastGatherTime.Format(time.RFC3339)
Expand Down
76 changes: 76 additions & 0 deletions plugins/inputs/statsd/statsd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,51 @@ func TestParse_Sets(t *testing.T) {
}
}

func TestParse_Sets_SetsAsFloat(t *testing.T) {
s := NewTestStatsd()
s.FloatSets = true

// Test that sets work
validLines := []string{
"unique.user.ids:100|s",
"unique.user.ids:100|s",
"unique.user.ids:200|s",
}

for _, line := range validLines {
require.NoErrorf(t, s.parseStatsdLine(line), "Parsing line %s should not have resulted in an error", line)
}

validations := []struct {
name string
value int64
}{
{
"unique_user_ids",
2,
},
}
for _, test := range validations {
require.NoError(t, testValidateSet(test.name, test.value, s.sets))
}

expected := []telegraf.Metric{
testutil.MustMetric(
"unique_user_ids",
map[string]string{"metric_type": "set"},
map[string]interface{}{"value": 2.0},
time.Now(),
telegraf.Untyped,
),
}

acc := &testutil.Accumulator{}
require.NoError(t, s.Gather(acc))
metrics := acc.GetTelegrafMetrics()
testutil.PrintMetrics(metrics)
testutil.RequireMetricsEqual(t, expected, metrics, testutil.IgnoreTime(), testutil.SortMetrics())
}

// Tests low-level functionality of counters
func TestParse_Counters(t *testing.T) {
s := NewTestStatsd()
Expand Down Expand Up @@ -676,6 +721,37 @@ func TestParse_Timings(t *testing.T) {
acc.AssertContainsFields(t, "test_timing", valid)
}

func TestParse_Timings_TimingsAsFloat(t *testing.T) {
s := NewTestStatsd()
s.FloatTimings = true
s.Percentiles = []Number{90.0}
acc := &testutil.Accumulator{}

// Test that timings work
validLines := []string{
"test.timing:100|ms",
}

for _, line := range validLines {
require.NoErrorf(t, s.parseStatsdLine(line), "Parsing line %s should not have resulted in an error", line)
}

require.NoError(t, s.Gather(acc))

valid := map[string]interface{}{
"90_percentile": float64(100),
"count": float64(1),
"lower": float64(100),
"mean": float64(100),
"median": float64(100),
"stddev": float64(0),
"sum": float64(100),
"upper": float64(100),
}

acc.AssertContainsFields(t, "test_timing", valid)
}

// Tests low-level functionality of distributions
func TestParse_Distributions(t *testing.T) {
s := NewTestStatsd()
Expand Down

0 comments on commit 63900a7

Please sign in to comment.