From 1aae97254e9c9cbf3252876c66419916b61cd653 Mon Sep 17 00:00:00 2001 From: Ryan Fitzpatrick Date: Tue, 12 Jan 2021 15:29:38 +0000 Subject: [PATCH 1/2] smartagentreceiver: add datapoint conversion --- .../receiver/smartagentreceiver/convert.go | 185 +++++++++ .../smartagentreceiver/convert_test.go | 366 ++++++++++++++++++ .../receiver/smartagentreceiver/output.go | 14 + .../receiver/smartagentreceiver/receiver.go | 2 +- .../smartagentreceiver/receiver_test.go | 92 ++++- 5 files changed, 653 insertions(+), 6 deletions(-) create mode 100644 internal/receiver/smartagentreceiver/convert.go create mode 100644 internal/receiver/smartagentreceiver/convert_test.go diff --git a/internal/receiver/smartagentreceiver/convert.go b/internal/receiver/smartagentreceiver/convert.go new file mode 100644 index 0000000000..6a98281cf6 --- /dev/null +++ b/internal/receiver/smartagentreceiver/convert.go @@ -0,0 +1,185 @@ +// Copyright 2021, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package smartagentreceiver + +import ( + "fmt" + + sfx "github.com/signalfx/golib/v3/datapoint" + "go.opentelemetry.io/collector/consumer/pdata" + "go.uber.org/zap" +) + +var ( + errUnsupportedMetricTypeTimestamp = fmt.Errorf("unsupported metric type timestamp") + errNoIntValue = fmt.Errorf("no valid value for expected IntValue") + errNoFloatValue = fmt.Errorf("no valid value for expected FloatValue") +) + +type Converter struct { + logger *zap.Logger +} + +// Based on https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/v0.15.0/receiver/signalfxreceiver/signalfxv2_to_metricdata.go +func (c *Converter) toMetrics(datapoints []*sfx.Datapoint) (pdata.Metrics, int) { + numDropped := 0 + md := pdata.NewMetrics() + md.ResourceMetrics().Resize(1) + rm := md.ResourceMetrics().At(0) + + rm.InstrumentationLibraryMetrics().Resize(1) + ilm := rm.InstrumentationLibraryMetrics().At(0) + + metrics := ilm.Metrics() + metrics.Resize(len(datapoints)) + + i := 0 + for _, datapoint := range datapoints { + if datapoint == nil { + continue + } + + m := metrics.At(i) + err := setDataType(datapoint, m) + if err != nil { + numDropped++ + c.logger.Warn("SignalFx datapoint type conversion error", + zap.Error(err), + zap.String("metric", datapoint.String())) + continue + } + + m.SetName(datapoint.Metric) + + switch m.DataType() { + case pdata.MetricDataTypeIntGauge: + err = fillIntDatapoint(datapoint, m.IntGauge().DataPoints()) + case pdata.MetricDataTypeIntSum: + err = fillIntDatapoint(datapoint, m.IntSum().DataPoints()) + case pdata.MetricDataTypeDoubleGauge: + err = fillDoubleDatapoint(datapoint, m.DoubleGauge().DataPoints()) + case pdata.MetricDataTypeDoubleSum: + err = fillDoubleDatapoint(datapoint, m.DoubleSum().DataPoints()) + } + + if err != nil { + numDropped++ + c.logger.Warn("SignalFx datapoint datum conversion error", + zap.Error(err), + zap.String("metric", datapoint.Metric)) + continue + } + + i++ + } + + metrics.Resize(i) + + return md, numDropped + +} +func setDataType(datapoint *sfx.Datapoint, m pdata.Metric) error { + sfxMetricType := datapoint.MetricType + if sfxMetricType == sfx.Timestamp { + return errUnsupportedMetricTypeTimestamp + } + + var isFloat bool + switch datapoint.Value.(type) { + case sfx.IntValue: + case sfx.FloatValue: + isFloat = true + default: + return fmt.Errorf("unsupported value type %T: %v", datapoint.Value, datapoint.Value) + } + + switch sfxMetricType { + case sfx.Gauge, sfx.Enum, sfx.Rate: + if isFloat { + m.SetDataType(pdata.MetricDataTypeDoubleGauge) + m.DoubleGauge().InitEmpty() // will need to be removed w/ 0.16.0 adoption + } else { + m.SetDataType(pdata.MetricDataTypeIntGauge) + m.IntGauge().InitEmpty() // will need to be removed w/ 0.16.0 adoption + } + case sfx.Count: + if isFloat { + m.SetDataType(pdata.MetricDataTypeDoubleSum) + m.DoubleSum().InitEmpty() // will need to be removed w/ 0.16.0 adoption + m.DoubleSum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + m.DoubleSum().SetIsMonotonic(true) + } else { + m.SetDataType(pdata.MetricDataTypeIntSum) + m.IntSum().InitEmpty() // will need to be removed w/ 0.16.0 adoption + m.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityDelta) + m.IntSum().SetIsMonotonic(true) + } + case sfx.Counter: + if isFloat { + m.SetDataType(pdata.MetricDataTypeDoubleSum) + m.DoubleSum().InitEmpty() // will need to be removed w/ 0.16.0 adoption + m.DoubleSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + m.DoubleSum().SetIsMonotonic(true) + } else { + m.SetDataType(pdata.MetricDataTypeIntSum) + m.IntSum().InitEmpty() // will need to be removed w/ 0.16.0 adoption + m.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + m.IntSum().SetIsMonotonic(true) + } + default: + return fmt.Errorf("unsupported metric type %T: %v", sfxMetricType, sfxMetricType) + } + + return nil +} + +func fillIntDatapoint(datapoint *sfx.Datapoint, dps pdata.IntDataPointSlice) error { + var intValue sfx.IntValue + var ok bool + if intValue, ok = datapoint.Value.(sfx.IntValue); !ok { + return errNoIntValue + } + + dps.Resize(1) + dp := dps.At(0) + dp.SetTimestamp(pdata.TimestampUnixNano(uint64(datapoint.Timestamp.UnixNano()))) + dp.SetValue(intValue.Int()) + fillInLabels(datapoint.Dimensions, dp.LabelsMap()) + + return nil +} + +func fillDoubleDatapoint(datapoint *sfx.Datapoint, dps pdata.DoubleDataPointSlice) error { + var floatValue sfx.FloatValue + var ok bool + if floatValue, ok = datapoint.Value.(sfx.FloatValue); !ok { + return errNoFloatValue + } + + dps.Resize(1) + dp := dps.At(0) + dp.SetTimestamp(pdata.TimestampUnixNano(uint64(datapoint.Timestamp.UnixNano()))) + dp.SetValue(floatValue.Float()) + fillInLabels(datapoint.Dimensions, dp.LabelsMap()) + + return nil + +} + +func fillInLabels(dimensions map[string]string, labels pdata.StringMap) { + labels.InitEmptyWithCapacity(len(dimensions)) + for k, v := range dimensions { + labels.Insert(k, v) + } +} diff --git a/internal/receiver/smartagentreceiver/convert_test.go b/internal/receiver/smartagentreceiver/convert_test.go new file mode 100644 index 0000000000..4601a20277 --- /dev/null +++ b/internal/receiver/smartagentreceiver/convert_test.go @@ -0,0 +1,366 @@ +// Copyright 2021, OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +package smartagentreceiver + +import ( + "testing" + "time" + + sfx "github.com/signalfx/golib/v3/datapoint" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/consumer/pdata" + "go.uber.org/zap" +) + +// based on https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/master/receiver/signalfxreceiver/signalfxv2_to_metricdata_test.go + +var now = time.Now() + +func sfxDatapoint() *sfx.Datapoint { + return &sfx.Datapoint{ + Metric: "some metric", + Timestamp: now, + Value: sfx.NewIntValue(13), + MetricType: sfx.Gauge, + Dimensions: map[string]string{ + "k0": "v0", + "k1": "v1", + "k2": "v2", + }, + } +} + +func pdataMetric() (pdata.Metrics, pdata.Metric) { + out := pdata.NewMetrics() + out.ResourceMetrics().Resize(1) + rm := out.ResourceMetrics().At(0) + rm.InstrumentationLibraryMetrics().Resize(1) + ilm := rm.InstrumentationLibraryMetrics().At(0) + ms := ilm.Metrics() + + ms.Resize(1) + m := ms.At(0) + return out, m +} + +func pdataMetrics(dataType pdata.MetricDataType, val interface{}) pdata.Metrics { + metrics, metric := pdataMetric() + metric.SetDataType(dataType) + metric.SetName("some metric") + + var dps interface{} + + switch dataType { + case pdata.MetricDataTypeIntGauge: + metric.IntGauge().InitEmpty() + dps = metric.IntGauge().DataPoints() + case pdata.MetricDataTypeIntSum: + metric.IntSum().InitEmpty() + metric.IntSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + dps = metric.IntSum().DataPoints() + case pdata.MetricDataTypeDoubleGauge: + metric.DoubleGauge().InitEmpty() + dps = metric.DoubleGauge().DataPoints() + case pdata.MetricDataTypeDoubleSum: + metric.DoubleSum().InitEmpty() + metric.DoubleSum().SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + dps = metric.DoubleSum().DataPoints() + } + + var labels pdata.StringMap + + switch dataType { + case pdata.MetricDataTypeIntGauge, pdata.MetricDataTypeIntSum: + dps.(pdata.IntDataPointSlice).Resize(1) + dp := dps.(pdata.IntDataPointSlice).At(0) + labels = dp.LabelsMap() + dp.SetTimestamp(pdata.TimestampUnixNano(now.UnixNano())) + dp.SetValue(int64(val.(int))) + case pdata.MetricDataTypeDoubleGauge, pdata.MetricDataTypeDoubleSum: + dps.(pdata.DoubleDataPointSlice).Resize(1) + dp := dps.(pdata.DoubleDataPointSlice).At(0) + labels = dp.LabelsMap() + dp.SetTimestamp(pdata.TimestampUnixNano(now.UnixNano())) + dp.SetValue(val.(float64)) + } + + labels.InitFromMap(map[string]string{ + "k0": "v0", + "k1": "v1", + "k2": "v2", + }) + labels.Sort() + + return metrics +} + +func TestToMetrics(t *testing.T) { + tests := []struct { + name string + datapoints []*sfx.Datapoint + expectedMetrics pdata.Metrics + expectedDropped int + }{ + { + name: "IntGauge", + datapoints: []*sfx.Datapoint{sfxDatapoint()}, + expectedMetrics: pdataMetrics(pdata.MetricDataTypeIntGauge, 13), + }, + { + name: "DoubleGauge", + datapoints: func() []*sfx.Datapoint { + pt := sfxDatapoint() + pt.MetricType = sfx.Gauge + pt.Value = sfx.NewFloatValue(13.13) + return []*sfx.Datapoint{pt} + }(), + expectedMetrics: pdataMetrics(pdata.MetricDataTypeDoubleGauge, 13.13), + }, + { + name: "IntCount", + datapoints: func() []*sfx.Datapoint { + pt := sfxDatapoint() + pt.MetricType = sfx.Count + return []*sfx.Datapoint{pt} + }(), + expectedMetrics: func() pdata.Metrics { + m := pdataMetrics(pdata.MetricDataTypeIntSum, 13) + d := m.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).IntSum() + d.SetAggregationTemporality(pdata.AggregationTemporalityDelta) + d.SetIsMonotonic(true) + return m + }(), + }, + { + name: "DoubleCount", + datapoints: func() []*sfx.Datapoint { + pt := sfxDatapoint() + pt.MetricType = sfx.Count + pt.Value = sfx.NewFloatValue(13.13) + return []*sfx.Datapoint{pt} + }(), + expectedMetrics: func() pdata.Metrics { + m := pdataMetrics(pdata.MetricDataTypeDoubleSum, 13.13) + d := m.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).DoubleSum() + d.SetAggregationTemporality(pdata.AggregationTemporalityDelta) + d.SetIsMonotonic(true) + return m + }(), + }, + { + name: "IntCounter", + datapoints: func() []*sfx.Datapoint { + pt := sfxDatapoint() + pt.MetricType = sfx.Counter + return []*sfx.Datapoint{pt} + }(), + expectedMetrics: func() pdata.Metrics { + m := pdataMetrics(pdata.MetricDataTypeIntSum, 13) + d := m.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).IntSum() + d.SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + d.SetIsMonotonic(true) + return m + }(), + }, + { + name: "DoubleCounter", + datapoints: func() []*sfx.Datapoint { + pt := sfxDatapoint() + pt.MetricType = sfx.Counter + pt.Value = sfx.NewFloatValue(13.13) + return []*sfx.Datapoint{pt} + }(), + expectedMetrics: func() pdata.Metrics { + m := pdataMetrics(pdata.MetricDataTypeDoubleSum, 13.13) + d := m.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).DoubleSum() + d.SetAggregationTemporality(pdata.AggregationTemporalityCumulative) + d.SetIsMonotonic(true) + return m + }(), + }, + { + name: "with_zero_timestamp", + datapoints: func() []*sfx.Datapoint { + pt := sfxDatapoint() + pt.Timestamp = time.Unix(0, 0) + return []*sfx.Datapoint{pt} + }(), + expectedMetrics: func() pdata.Metrics { + md := pdataMetrics(pdata.MetricDataTypeIntGauge, 13) + md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).IntGauge().DataPoints().At(0).SetTimestamp(0) + return md + }(), + }, + { + name: "empty_dimension_values_accepted", + datapoints: func() []*sfx.Datapoint { + pt := sfxDatapoint() + pt.Dimensions["k0"] = "" + return []*sfx.Datapoint{pt} + }(), + expectedMetrics: func() pdata.Metrics { + md := pdataMetrics(pdata.MetricDataTypeIntGauge, 13) + md.ResourceMetrics().At(0).InstrumentationLibraryMetrics().At(0).Metrics().At(0).IntGauge().DataPoints().At(0).LabelsMap().Update("k0", "") + return md + }(), + }, + { + name: "nil_datapoints_ignored", + datapoints: []*sfx.Datapoint{nil, sfxDatapoint(), nil}, + expectedMetrics: pdataMetrics(pdata.MetricDataTypeIntGauge, 13), + expectedDropped: 0, + }, + { + name: "drops_invalid_datapoints", + datapoints: func() []*sfx.Datapoint { + // nil value + pt0 := sfxDatapoint() + pt0.Value = nil + + // timestamps aren't supported + pt1 := sfxDatapoint() + pt1.MetricType = sfx.Timestamp + + // unknown enum value + pt2 := sfxDatapoint() + pt2.MetricType = sfx.Counter + 100 + + return []*sfx.Datapoint{ + pt0, pt1, sfxDatapoint(), pt2} + }(), + expectedMetrics: pdataMetrics(pdata.MetricDataTypeIntGauge, 13), + expectedDropped: 3, + }, + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + converter := Converter{logger: zap.NewNop()} + md, dropped := converter.toMetrics(test.datapoints) + sortLabels(tt, md) + + assert.Equal(tt, test.expectedMetrics, md) + assert.Equal(tt, test.expectedDropped, dropped) + }) + } +} + +func TestSetDataTypeWithInvalidDatapoints(t *testing.T) { + tests := []struct { + name string + datapoint *sfx.Datapoint + expectedError string + }{ + { + name: "timestamp_as_MetricType", + datapoint: func() *sfx.Datapoint { + datapoint := sfxDatapoint() + datapoint.MetricType = sfx.Timestamp + return datapoint + }(), + expectedError: "unsupported metric type timestamp", + }, + { + name: "string_as_datapoint_value", + datapoint: func() *sfx.Datapoint { + datapoint := sfxDatapoint() + datapoint.Value = sfx.NewStringValue("disallowed") + return datapoint + }(), + expectedError: "unsupported value type datapoint.strWire: disallowed", + }, + { + name: "nonexistent_MetricType", + datapoint: func() *sfx.Datapoint { + datapoint := sfxDatapoint() + datapoint.MetricType = sfx.Counter - 10000 + return datapoint + }(), + expectedError: "unsupported metric type datapoint.MetricType: MetricType(-", + }, + } + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + _, metric := pdataMetric() + err := setDataType(test.datapoint, metric) + require.Error(t, err) + assert.Contains(t, err.Error(), test.expectedError) + }) + } +} + +func TestFillIntDatapointWithInvalidValue(t *testing.T) { + datapoint := sfxDatapoint() + datapoint.MetricType = sfx.Gauge + datapoint.Value = sfx.NewIntValue(123) + + _, metric := pdataMetric() + setDataType(datapoint, metric) + gauge := metric.IntGauge() + gauge.InitEmpty() + + datapoint.Value = sfx.NewFloatValue(123.45) + err := fillIntDatapoint(datapoint, gauge.DataPoints()) + require.Error(t, err) + assert.EqualError(t, err, "no valid value for expected IntValue") +} + +func TestFillDoubleDatapointWithInvalidValue(t *testing.T) { + datapoint := sfxDatapoint() + datapoint.MetricType = sfx.Gauge + datapoint.Value = sfx.NewFloatValue(123.45) + + _, metric := pdataMetric() + setDataType(datapoint, metric) + gauge := metric.DoubleGauge() + gauge.InitEmpty() + + datapoint.Value = sfx.NewIntValue(123) + err := fillDoubleDatapoint(datapoint, gauge.DataPoints()) + require.Error(t, err) + assert.EqualError(t, err, "no valid value for expected FloatValue") +} + +func sortLabels(t *testing.T, metrics pdata.Metrics) { + for i := 0; i < metrics.ResourceMetrics().Len(); i++ { + rm := metrics.ResourceMetrics().At(i) + for j := 0; j < rm.InstrumentationLibraryMetrics().Len(); j++ { + ilm := rm.InstrumentationLibraryMetrics().At(j) + for k := 0; k < ilm.Metrics().Len(); k++ { + m := ilm.Metrics().At(k) + switch m.DataType() { + case pdata.MetricDataTypeIntGauge: + for l := 0; l < m.IntGauge().DataPoints().Len(); l++ { + m.IntGauge().DataPoints().At(l).LabelsMap().Sort() + } + case pdata.MetricDataTypeIntSum: + for l := 0; l < m.IntSum().DataPoints().Len(); l++ { + m.IntSum().DataPoints().At(l).LabelsMap().Sort() + } + case pdata.MetricDataTypeDoubleGauge: + for l := 0; l < m.DoubleGauge().DataPoints().Len(); l++ { + m.DoubleGauge().DataPoints().At(l).LabelsMap().Sort() + } + case pdata.MetricDataTypeDoubleSum: + for l := 0; l < m.DoubleSum().DataPoints().Len(); l++ { + m.DoubleSum().DataPoints().At(l).LabelsMap().Sort() + } + default: + t.Errorf("unexpected datatype: %v", m.DataType()) + } + } + } + } +} diff --git a/internal/receiver/smartagentreceiver/output.go b/internal/receiver/smartagentreceiver/output.go index 96f8b5737a..327e3e36a2 100644 --- a/internal/receiver/smartagentreceiver/output.go +++ b/internal/receiver/smartagentreceiver/output.go @@ -15,6 +15,8 @@ package smartagentreceiver import ( + "context" + "github.com/signalfx/golib/v3/datapoint" "github.com/signalfx/golib/v3/event" "github.com/signalfx/golib/v3/trace" @@ -30,10 +32,19 @@ import ( type Output struct { nextConsumer consumer.MetricsConsumer logger *zap.Logger + converter Converter } var _ types.FilteringOutput = (*Output)(nil) +func NewOutput(nextConsumer consumer.MetricsConsumer, logger *zap.Logger) *Output { + return &Output{ + nextConsumer: nextConsumer, + logger: logger, + converter: Converter{logger: logger}, + } +} + func (output *Output) AddDatapointExclusionFilter(filter dpfilters.DatapointFilter) { output.logger.Debug("AddDatapointExclusionFilter has been called", zap.Any("filter", filter)) } @@ -60,6 +71,9 @@ func (output *Output) Copy() types.Output { func (output *Output) SendDatapoints(datapoints ...*datapoint.Datapoint) { output.logger.Debug("SendDatapoints has been called.", zap.Any("datapoints", datapoints)) + metrics, numDropped := output.converter.toMetrics(datapoints) + output.logger.Debug("SendDatapoints", zap.Any("metrics", metrics), zap.Int("numDropped", numDropped)) + output.nextConsumer.ConsumeMetrics(context.Background(), metrics) } func (output *Output) SendEvent(event *event.Event) { diff --git a/internal/receiver/smartagentreceiver/receiver.go b/internal/receiver/smartagentreceiver/receiver.go index dc45dd4198..e0e05e1f8a 100644 --- a/internal/receiver/smartagentreceiver/receiver.go +++ b/internal/receiver/smartagentreceiver/receiver.go @@ -115,7 +115,7 @@ func (r *Receiver) createMonitor(monitorType string) (interface{}, error) { } if monitorOutputValue.IsValid() { - output := &Output{nextConsumer: r.nextConsumer, logger: r.logger} + output := NewOutput(r.nextConsumer, r.logger) monitorOutputValue.Set(reflect.ValueOf(output)) } else { return nil, fmt.Errorf("invalid monitor instance: %#v", monitor) diff --git a/internal/receiver/smartagentreceiver/receiver_test.go b/internal/receiver/smartagentreceiver/receiver_test.go index 43c9237343..9960087088 100644 --- a/internal/receiver/smartagentreceiver/receiver_test.go +++ b/internal/receiver/smartagentreceiver/receiver_test.go @@ -29,11 +29,26 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configmodels" "go.opentelemetry.io/collector/consumer/consumertest" + "go.opentelemetry.io/collector/consumer/pdata" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" ) +var expectedCPUMetrics = map[string]pdata.MetricDataType{ + "cpu.idle": pdata.MetricDataTypeDoubleSum, + "cpu.interrupt": pdata.MetricDataTypeDoubleSum, + "cpu.nice": pdata.MetricDataTypeDoubleSum, + "cpu.num_processors": pdata.MetricDataTypeIntGauge, + "cpu.softirq": pdata.MetricDataTypeDoubleSum, + "cpu.steal": pdata.MetricDataTypeDoubleSum, + "cpu.system": pdata.MetricDataTypeDoubleSum, + "cpu.user": pdata.MetricDataTypeDoubleSum, + "cpu.utilization": pdata.MetricDataTypeDoubleGauge, + "cpu.utilization_per_core": pdata.MetricDataTypeDoubleGauge, + "cpu.wait": pdata.MetricDataTypeDoubleSum, +} + func newConfig(nameVal, monitorType string, intervalSeconds int) Config { return Config{ ReceiverSettings: configmodels.ReceiverSettings{ @@ -45,31 +60,98 @@ func newConfig(nameVal, monitorType string, intervalSeconds int) Config { Type: monitorType, IntervalSeconds: intervalSeconds, }, + ReportPerCPU: true, }, } } func TestSmartAgentReceiver(t *testing.T) { - cfg := newConfig("valid", "cpu", 1) + cfg := newConfig("valid", "cpu", 10) observed, logs := observer.New(zapcore.DebugLevel) - receiver := NewReceiver(zap.New(observed), cfg, consumertest.NewMetricsNop()) + consumer := new(consumertest.MetricsSink) + receiver := NewReceiver(zap.New(observed), cfg, consumer) err := receiver.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) assert.EqualValues(t, "smartagentvalid", cfg.monitorConfig.MonitorConfigCore().MonitorID) monitorOutput := receiver.monitor.(*cpu.Monitor).Output - _, ok := monitorOutput.(*Output) - assert.True(t, ok) + _, isOutput := monitorOutput.(*Output) + assert.True(t, isOutput) assert.Eventuallyf(t, func() bool { filtered := logs.FilterMessageSnippet("SendDatapoints has been called.") return len(filtered.All()) == 1 }, 5*time.Second, 1*time.Millisecond, "failed to receive any metrics from monitor") + assert.Eventuallyf(t, func() bool { + // confirm single occurrence of total metrics as sanity in lieu of + // out of scope cpu monitor verification. + seenTotalMetric := map[string]bool{} + + allMetrics := consumer.AllMetrics() + for _, m := range allMetrics { + resourceMetrics := m.ResourceMetrics() + for i := 0; i < resourceMetrics.Len(); i++ { + resourceMetric := resourceMetrics.At(i) + instrumentationLibraryMetrics := resourceMetric.InstrumentationLibraryMetrics() + for j := 0; j < instrumentationLibraryMetrics.Len(); j++ { + instrumentationLibraryMetric := instrumentationLibraryMetrics.At(j) + metrics := instrumentationLibraryMetric.Metrics() + for k := 0; k < metrics.Len(); k++ { + metric := metrics.At(k) + name := metric.Name() + dataType := metric.DataType() + expectedDataType := expectedCPUMetrics[name] + require.NotEqual(t, pdata.MetricDataTypeNone, expectedDataType, "received unexpected none type for %s", name) + assert.Equal(t, expectedDataType, dataType) + var labels pdata.StringMap + switch dataType { + case pdata.MetricDataTypeIntGauge: + ig := metric.IntGauge() + for l := 0; l < ig.DataPoints().Len(); l++ { + igdp := ig.DataPoints().At(l) + labels = igdp.LabelsMap() + var val interface{} = igdp.Value() + _, ok := val.(int64) + assert.True(t, ok, "invalid value of MetricDataTypeIntGauge metric %s", name) + } + case pdata.MetricDataTypeDoubleGauge: + dg := metric.DoubleGauge() + for l := 0; l < dg.DataPoints().Len(); l++ { + dgdp := dg.DataPoints().At(l) + labels = dgdp.LabelsMap() + var val interface{} = dgdp.Value() + _, ok := val.(float64) + assert.True(t, ok, "invalid value of MetricDataTypeDoubleGauge metric %s", name) + } + case pdata.MetricDataTypeDoubleSum: + ds := metric.DoubleSum() + for l := 0; l < ds.DataPoints().Len(); l++ { + dsdp := ds.DataPoints().At(l) + labels = dsdp.LabelsMap() + var val interface{} = dsdp.Value() + _, ok := val.(float64) + assert.True(t, ok, "invalid value of MetricDataTypeDoubleSum metric %s", name) + } + default: + t.Errorf("unexpected type %#v for metric %s", metric.DataType(), name) + } + if labels.Len() == 0 { + assert.False(t, seenTotalMetric[name], "unexpected repeated total metric for %v", name) + seenTotalMetric[name] = true + } + } + } + } + } + return len(allMetrics) > 0 + }, 5*time.Second, 1*time.Millisecond, "failed to receive expected cpu metrics") + + metrics := consumer.AllMetrics() + assert.Greater(t, len(metrics), 0) err = receiver.Shutdown(context.Background()) assert.NoError(t, err) - } func TestStartReceiverWithInvalidMonitorConfig(t *testing.T) { From fb160bd8cd8a9671fbcd0a8003d597aecec61dc2 Mon Sep 17 00:00:00 2001 From: Ryan Fitzpatrick Date: Thu, 14 Jan 2021 15:32:00 +0000 Subject: [PATCH 2/2] smartagent: use obsreport and debug logging --- .../receiver/smartagentreceiver/convert.go | 4 ++-- .../receiver/smartagentreceiver/output.go | 20 +++++++++++++++---- .../receiver/smartagentreceiver/receiver.go | 2 +- .../smartagentreceiver/receiver_test.go | 10 +--------- 4 files changed, 20 insertions(+), 16 deletions(-) diff --git a/internal/receiver/smartagentreceiver/convert.go b/internal/receiver/smartagentreceiver/convert.go index 6a98281cf6..86902ac9af 100644 --- a/internal/receiver/smartagentreceiver/convert.go +++ b/internal/receiver/smartagentreceiver/convert.go @@ -54,7 +54,7 @@ func (c *Converter) toMetrics(datapoints []*sfx.Datapoint) (pdata.Metrics, int) err := setDataType(datapoint, m) if err != nil { numDropped++ - c.logger.Warn("SignalFx datapoint type conversion error", + c.logger.Debug("SignalFx datapoint type conversion error", zap.Error(err), zap.String("metric", datapoint.String())) continue @@ -75,7 +75,7 @@ func (c *Converter) toMetrics(datapoints []*sfx.Datapoint) (pdata.Metrics, int) if err != nil { numDropped++ - c.logger.Warn("SignalFx datapoint datum conversion error", + c.logger.Debug("SignalFx datapoint datum conversion error", zap.Error(err), zap.String("metric", datapoint.Metric)) continue diff --git a/internal/receiver/smartagentreceiver/output.go b/internal/receiver/smartagentreceiver/output.go index 327e3e36a2..94305ed5b6 100644 --- a/internal/receiver/smartagentreceiver/output.go +++ b/internal/receiver/smartagentreceiver/output.go @@ -23,13 +23,17 @@ import ( "github.com/signalfx/signalfx-agent/pkg/core/dpfilters" "github.com/signalfx/signalfx-agent/pkg/monitors/types" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/obsreport" "go.uber.org/zap" ) +const internalTransport = "internal" + // Output is an implementation of a Smart Agent FilteringOutput that receives datapoints from a configured monitor. // It is what provides metrics to the next MetricsConsumer (to be implemented later). At this stage it is only // a logging instance. type Output struct { + receiverName string nextConsumer consumer.MetricsConsumer logger *zap.Logger converter Converter @@ -37,8 +41,9 @@ type Output struct { var _ types.FilteringOutput = (*Output)(nil) -func NewOutput(nextConsumer consumer.MetricsConsumer, logger *zap.Logger) *Output { +func NewOutput(config Config, nextConsumer consumer.MetricsConsumer, logger *zap.Logger) *Output { return &Output{ + receiverName: config.Name(), nextConsumer: nextConsumer, logger: logger, converter: Converter{logger: logger}, @@ -70,10 +75,17 @@ func (output *Output) Copy() types.Output { } func (output *Output) SendDatapoints(datapoints ...*datapoint.Datapoint) { - output.logger.Debug("SendDatapoints has been called.", zap.Any("datapoints", datapoints)) + ctx := obsreport.ReceiverContext(context.Background(), output.receiverName, internalTransport) + ctx = obsreport.StartMetricsReceiveOp(ctx, typeStr, internalTransport) + metrics, numDropped := output.converter.toMetrics(datapoints) - output.logger.Debug("SendDatapoints", zap.Any("metrics", metrics), zap.Int("numDropped", numDropped)) - output.nextConsumer.ConsumeMetrics(context.Background(), metrics) + if numDropped > 0 { + output.logger.Debug("SendDatapoints has dropped points", zap.Int("numDropped", numDropped)) + } + + _, numPoints := metrics.MetricAndDataPointCount() + err := output.nextConsumer.ConsumeMetrics(context.Background(), metrics) + obsreport.EndMetricsReceiveOp(ctx, typeStr, numPoints, err) } func (output *Output) SendEvent(event *event.Event) { diff --git a/internal/receiver/smartagentreceiver/receiver.go b/internal/receiver/smartagentreceiver/receiver.go index e0e05e1f8a..f7ea415c60 100644 --- a/internal/receiver/smartagentreceiver/receiver.go +++ b/internal/receiver/smartagentreceiver/receiver.go @@ -115,7 +115,7 @@ func (r *Receiver) createMonitor(monitorType string) (interface{}, error) { } if monitorOutputValue.IsValid() { - output := NewOutput(r.nextConsumer, r.logger) + output := NewOutput(*r.config, r.nextConsumer, r.logger) monitorOutputValue.Set(reflect.ValueOf(output)) } else { return nil, fmt.Errorf("invalid monitor instance: %#v", monitor) diff --git a/internal/receiver/smartagentreceiver/receiver_test.go b/internal/receiver/smartagentreceiver/receiver_test.go index 9960087088..b3e8ba2493 100644 --- a/internal/receiver/smartagentreceiver/receiver_test.go +++ b/internal/receiver/smartagentreceiver/receiver_test.go @@ -31,8 +31,6 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/consumer/pdata" "go.uber.org/zap" - "go.uber.org/zap/zapcore" - "go.uber.org/zap/zaptest/observer" ) var expectedCPUMetrics = map[string]pdata.MetricDataType{ @@ -67,9 +65,8 @@ func newConfig(nameVal, monitorType string, intervalSeconds int) Config { func TestSmartAgentReceiver(t *testing.T) { cfg := newConfig("valid", "cpu", 10) - observed, logs := observer.New(zapcore.DebugLevel) consumer := new(consumertest.MetricsSink) - receiver := NewReceiver(zap.New(observed), cfg, consumer) + receiver := NewReceiver(zap.NewNop(), cfg, consumer) err := receiver.Start(context.Background(), componenttest.NewNopHost()) require.NoError(t, err) @@ -79,11 +76,6 @@ func TestSmartAgentReceiver(t *testing.T) { _, isOutput := monitorOutput.(*Output) assert.True(t, isOutput) - assert.Eventuallyf(t, func() bool { - filtered := logs.FilterMessageSnippet("SendDatapoints has been called.") - return len(filtered.All()) == 1 - }, 5*time.Second, 1*time.Millisecond, "failed to receive any metrics from monitor") - assert.Eventuallyf(t, func() bool { // confirm single occurrence of total metrics as sanity in lieu of // out of scope cpu monitor verification.