From befda567cd580cc54451de764e8fed0b2c0782e8 Mon Sep 17 00:00:00 2001 From: Jason Anderson Date: Tue, 7 Nov 2023 13:20:37 -0800 Subject: [PATCH] put behind config --- receiver/statsdreceiver/config.go | 1 + .../internal/protocol/parser.go | 2 +- .../internal/protocol/statsd_parser.go | 12 ++- .../internal/protocol/statsd_parser_test.go | 97 ++++++++++++++----- receiver/statsdreceiver/receiver.go | 1 + 5 files changed, 84 insertions(+), 29 deletions(-) diff --git a/receiver/statsdreceiver/config.go b/receiver/statsdreceiver/config.go index 76dd700b6374..4e9a2d5eb75d 100644 --- a/receiver/statsdreceiver/config.go +++ b/receiver/statsdreceiver/config.go @@ -19,6 +19,7 @@ type Config struct { NetAddr confignet.NetAddr `mapstructure:",squash"` AggregationInterval time.Duration `mapstructure:"aggregation_interval"` EnableMetricType bool `mapstructure:"enable_metric_type"` + EnableSimpleTags bool `mapstructure:"enable_simple_tags"` IsMonotonicCounter bool `mapstructure:"is_monotonic_counter"` TimerHistogramMapping []protocol.TimerHistogramMapping `mapstructure:"timer_histogram_mapping"` } diff --git a/receiver/statsdreceiver/internal/protocol/parser.go b/receiver/statsdreceiver/internal/protocol/parser.go index abfea560ce24..bc72f7e3f160 100644 --- a/receiver/statsdreceiver/internal/protocol/parser.go +++ b/receiver/statsdreceiver/internal/protocol/parser.go @@ -12,7 +12,7 @@ import ( // Parser is something that can map input StatsD strings to OTLP Metric representations. type Parser interface { - Initialize(enableMetricType bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error + Initialize(enableMetricType bool, enableSimpleTags bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error GetMetrics() []BatchMetrics Aggregate(line string, addr net.Addr) error } diff --git a/receiver/statsdreceiver/internal/protocol/statsd_parser.go b/receiver/statsdreceiver/internal/protocol/statsd_parser.go index dea891e769ef..474fe03945ba 100644 --- a/receiver/statsdreceiver/internal/protocol/statsd_parser.go +++ b/receiver/statsdreceiver/internal/protocol/statsd_parser.go @@ -79,6 +79,7 @@ var defaultObserverCategory = ObserverCategory{ type StatsDParser struct { instrumentsByAddress map[netAddr]*instruments enableMetricType bool + enableSimpleTags bool isMonotonicCounter bool timerEvents ObserverCategory histogramEvents ObserverCategory @@ -156,12 +157,13 @@ func (p *StatsDParser) resetState(when time.Time) { p.instrumentsByAddress = make(map[netAddr]*instruments) } -func (p *StatsDParser) Initialize(enableMetricType bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error { +func (p *StatsDParser) Initialize(enableMetricType bool, enableSimpleTags bool, isMonotonicCounter bool, sendTimerHistogram []TimerHistogramMapping) error { p.resetState(timeNowFunc()) p.histogramEvents = defaultObserverCategory p.timerEvents = defaultObserverCategory p.enableMetricType = enableMetricType + p.enableSimpleTags = enableSimpleTags p.isMonotonicCounter = isMonotonicCounter // Note: validation occurs in ("../".Config).validate() for _, eachMap := range sendTimerHistogram { @@ -270,7 +272,7 @@ func (p *StatsDParser) observerCategoryFor(t MetricType) ObserverCategory { // Aggregate for each metric line. func (p *StatsDParser) Aggregate(line string, addr net.Addr) error { - parsedMetric, err := parseMessageToMetric(line, p.enableMetricType) + parsedMetric, err := parseMessageToMetric(line, p.enableMetricType, p.enableSimpleTags) if err != nil { return err } @@ -349,7 +351,7 @@ func (p *StatsDParser) Aggregate(line string, addr net.Addr) error { return nil } -func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, error) { +func parseMessageToMetric(line string, enableMetricType bool, enableSimpleTags bool) (statsDMetric, error) { result := statsDMetric{} parts := strings.Split(line, "|") @@ -422,6 +424,10 @@ func parseMessageToMetric(line string, enableMetricType bool) (statsDMetric, err v = tagParts[1] } + if v == "" && !enableSimpleTags { + return result, fmt.Errorf("invalid tag format: %q", tagSet) + } + kvs = append(kvs, attribute.String(k, v)) } default: diff --git a/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go b/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go index 5313c0b8d9c3..d91ee7c3ed27 100644 --- a/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go +++ b/receiver/statsdreceiver/internal/protocol/statsd_parser_test.go @@ -90,7 +90,7 @@ func Test_ParseMessageToMetric(t *testing.T) { err: errors.New("unsupported metric type: unhandled_type"), }, { - name: "counter metric with sample rate and (dimensional) tag", + name: "counter metric with sample rate and tag", input: "test.metric:42|c|@0.1|#key:value", wantMetric: testStatsDMetric( "test.metric", @@ -101,18 +101,6 @@ func Test_ParseMessageToMetric(t *testing.T) { []string{"key"}, []string{"value"}), }, - { - name: "counter metric with sample rate and (simple) tag", - input: "test.metric:42|c|@0.1|#key", - wantMetric: testStatsDMetric( - "test.metric", - 42, - false, - "c", - 0.1, - []string{"key"}, - []string{""}), - }, { name: "counter metric with sample rate(not divisible) and tag", input: "test.metric:42|c|@0.8|#key:value", @@ -247,7 +235,7 @@ func Test_ParseMessageToMetric(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := parseMessageToMetric(tt.input, false) + got, err := parseMessageToMetric(tt.input, false, false) if tt.err != nil { assert.Equal(t, tt.err, err) @@ -445,7 +433,66 @@ func Test_ParseMessageToMetricWithMetricType(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - got, err := parseMessageToMetric(tt.input, true) + got, err := parseMessageToMetric(tt.input, true, false) + + if tt.err != nil { + assert.Equal(t, tt.err, err) + } else { + assert.NoError(t, err) + assert.Equal(t, tt.wantMetric, got) + } + }) + } +} + +func Test_ParseMessageToMetricWithSimpleTags(t *testing.T) { + tests := []struct { + name string + input string + wantMetric statsDMetric + err error + }{ + { + name: "counter metric with sample rate and (dimensional) tag", + input: "test.metric:42|c|@0.1|#key:value", + wantMetric: testStatsDMetric( + "test.metric", + 42, + false, + "c", + 0.1, + []string{"key"}, + []string{"value"}), + }, + { + name: "counter metric with sample rate and (simple) tag", + input: "test.metric:42|c|@0.1|#key", + wantMetric: testStatsDMetric( + "test.metric", + 42, + false, + "c", + 0.1, + []string{"key"}, + []string{""}), + }, + { + name: "counter metric with sample rate and two (simple) tags", + input: "test.metric:42|c|@0.1|#key,key2", + wantMetric: testStatsDMetric( + "test.metric", + 42, + false, + "c", + 0.1, + []string{"key", "key2"}, + []string{"", ""}), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := parseMessageToMetric(tt.input, false, true) if tt.err != nil { assert.Equal(t, tt.err, err) @@ -689,7 +736,7 @@ func TestStatsDParser_Aggregate(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := newNetAddr(addr) @@ -758,7 +805,7 @@ func TestStatsDParser_AggregateByAddress(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) for i, addr := range tt.addresses { for _, line := range tt.input[i] { @@ -826,7 +873,7 @@ func TestStatsDParser_AggregateWithMetricType(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := newNetAddr(addr) @@ -876,7 +923,7 @@ func TestStatsDParser_AggregateWithIsMonotonicCounter(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(false, false, true, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) p.lastIntervalTime = time.Unix(611, 0) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := newNetAddr(addr) @@ -998,7 +1045,7 @@ func TestStatsDParser_AggregateTimerWithSummary(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}})) + assert.NoError(t, p.Initialize(false, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "summary"}})) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") addrKey := newNetAddr(addr) for _, line := range tt.input { @@ -1015,7 +1062,7 @@ func TestStatsDParser_AggregateTimerWithSummary(t *testing.T) { func TestStatsDParser_Initialize(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) teststatsdDMetricdescription := statsDMetricDescription{ name: "test", metricType: "g", @@ -1034,7 +1081,7 @@ func TestStatsDParser_Initialize(t *testing.T) { func TestStatsDParser_GetMetricsWithMetricType(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(true, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) + assert.NoError(t, p.Initialize(true, false, false, []TimerHistogramMapping{{StatsdType: "timer", ObserverType: "gauge"}, {StatsdType: "histogram", ObserverType: "gauge"}})) instrument := newInstruments(nil) instrument.gauges[testDescription("statsdTestMetric1", "g", []string{"mykey", "metric_type"}, []string{"myvalue", "gauge"})] = buildGaugeMetric(testStatsDMetric("testGauge1", 1, false, "g", 0, []string{"mykey", "metric_type"}, []string{"myvalue", "gauge"}), time.Unix(711, 0)) @@ -1107,7 +1154,7 @@ func TestStatsDParser_Mappings(t *testing.T) { t.Run(tc.name, func(t *testing.T) { p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, tc.mapping)) + assert.NoError(t, p.Initialize(false, false, false, tc.mapping)) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") assert.NoError(t, p.Aggregate("H:10|h", addr)) @@ -1141,7 +1188,7 @@ func TestStatsDParser_ScopeIsIncluded(t *testing.T) { } testAddress, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") - err := p.Initialize(true, false, + err := p.Initialize(true, false, false, []TimerHistogramMapping{ {StatsdType: "timer", ObserverType: "summary"}, {StatsdType: "histogram", ObserverType: "histogram"}, @@ -1411,7 +1458,7 @@ func TestStatsDParser_AggregateTimerWithHistogram(t *testing.T) { t.Run(tt.name, func(t *testing.T) { var err error p := &StatsDParser{} - assert.NoError(t, p.Initialize(false, false, tt.mapping)) + assert.NoError(t, p.Initialize(false, false, false, tt.mapping)) addr, _ := net.ResolveUDPAddr("udp", "1.2.3.4:5678") for _, line := range tt.input { err = p.Aggregate(line, addr) diff --git a/receiver/statsdreceiver/receiver.go b/receiver/statsdreceiver/receiver.go index 74d4354e4a62..3034fc561bbc 100644 --- a/receiver/statsdreceiver/receiver.go +++ b/receiver/statsdreceiver/receiver.go @@ -91,6 +91,7 @@ func (r *statsdReceiver) Start(ctx context.Context, host component.Host) error { ticker := time.NewTicker(r.config.AggregationInterval) err = r.parser.Initialize( r.config.EnableMetricType, + r.config.EnableSimpleTags, r.config.IsMonotonicCounter, r.config.TimerHistogramMapping, )