From 55109e5b830a8b9c60c8d5c1029cbef04ec5aae1 Mon Sep 17 00:00:00 2001 From: Ashvitha Sridharan Date: Wed, 24 May 2023 15:53:52 -0400 Subject: [PATCH 01/11] Add custom metrics for Exporter and transform operations --- agent/hcp/telemetry/custom_metrics.go | 14 +++ agent/hcp/telemetry/otel_exporter.go | 14 ++- agent/hcp/telemetry/otel_exporter_test.go | 83 ++++++++++++-- agent/hcp/telemetry/otlp_transform.go | 3 +- agent/hcp/telemetry/otlp_transform_test.go | 123 ++++++++++++++++----- 5 files changed, 197 insertions(+), 40 deletions(-) create mode 100644 agent/hcp/telemetry/custom_metrics.go diff --git a/agent/hcp/telemetry/custom_metrics.go b/agent/hcp/telemetry/custom_metrics.go new file mode 100644 index 000000000000..bda9ada4b48e --- /dev/null +++ b/agent/hcp/telemetry/custom_metrics.go @@ -0,0 +1,14 @@ +package telemetry + +// Keys for custom Go Metrics metrics emitted only for the OTEL +// export (exporter.go) and transform (transform.go) failures and successes. +// These enable us to monitor OTEL operations. 
+var ( + transformFailureMetric []string = []string{"hcp", "otel", "transform", "failure"} + + exportSuccessMetric []string = []string{"hcp", "otel", "exporter", "export", "sucess"} + exportFailureMetric []string = []string{"hcp", "otel", "exporter", "export", "failure"} + + exporterShutdownMetric []string = []string{"hcp", "otel", "exporter", "shutdown"} + exporterForceFlushMetric []string = []string{"hcp", "otel", "exporter", "force_flush"} +) diff --git a/agent/hcp/telemetry/otel_exporter.go b/agent/hcp/telemetry/otel_exporter.go index 2512706f5353..e3ec375608d1 100644 --- a/agent/hcp/telemetry/otel_exporter.go +++ b/agent/hcp/telemetry/otel_exporter.go @@ -4,6 +4,7 @@ import ( "context" "net/url" + goMetrics "github.com/armon/go-metrics" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" @@ -56,17 +57,24 @@ func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceM if isEmpty(otlpMetrics) { return nil } - return e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String()) + err := e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String()) + if err != nil { + goMetrics.IncrCounter(exportFailureMetric, 1) + return err + } + + goMetrics.IncrCounter(exportSuccessMetric, 1) + return nil } // ForceFlush is a no-op, as the MetricsClient client holds no state. func (e *OTELExporter) ForceFlush(ctx context.Context) error { - // TODO: Emit metric when this operation occurs. + goMetrics.IncrCounter(exporterForceFlushMetric, 1) return ctx.Err() } // Shutdown is a no-op, as the MetricsClient is a HTTP client that requires no graceful shutdown. func (e *OTELExporter) Shutdown(ctx context.Context) error { - // TODO: Emit metric when this operation occurs. 
+ goMetrics.IncrCounter(exporterShutdownMetric, 1) return ctx.Err() } diff --git a/agent/hcp/telemetry/otel_exporter_test.go b/agent/hcp/telemetry/otel_exporter_test.go index 72e6b84d242c..3c98a9534af6 100644 --- a/agent/hcp/telemetry/otel_exporter_test.go +++ b/agent/hcp/telemetry/otel_exporter_test.go @@ -4,8 +4,11 @@ import ( "context" "fmt" "net/url" + "strings" "testing" + "time" + "github.com/armon/go-metrics" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/metric/aggregation" @@ -16,6 +19,14 @@ import ( "github.com/hashicorp/consul/agent/hcp/client" ) +type mockMetricsClient struct { + exportErr error +} + +func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error { + return m.exportErr +} + func TestTemporality(t *testing.T) { t.Parallel() exp := &OTELExporter{} @@ -50,14 +61,6 @@ func TestAggregation(t *testing.T) { } } -type mockMetricsClient struct { - exportErr error -} - -func (m *mockMetricsClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.ResourceMetrics, endpoint string) error { - return m.exportErr -} - func TestExport(t *testing.T) { t.Parallel() for name, test := range map[string]struct { @@ -111,6 +114,58 @@ func TestExport(t *testing.T) { } } +// TestExport_CustomMetrics tests that a custom metric (hcp.otel.exporter.*) is emitted +// for exporter operations. This test cannot be run in parallel as the metrics.NewGlobal() +// sets a shared global sink. 
+func TestExport_CustomMetrics(t *testing.T) { + for name, tc := range map[string]struct { + client client.MetricsClient + metricKey []string + }{ + "exportSuccess": { + client: &mockMetricsClient{}, + metricKey: exportSuccessMetric, + }, + "exportFailure": { + client: &mockMetricsClient{ + exportErr: fmt.Errorf("failed to export metrics"), + }, + metricKey: exportFailureMetric, + }, + "shutdown": { + metricKey: exporterShutdownMetric, + }, + "forceFlush": { + metricKey: exporterForceFlushMetric, + }, + } { + t.Run(name, func(t *testing.T) { + // Init global sink. + serviceName := "test.transform" + cfg := metrics.DefaultConfig(serviceName) + cfg.EnableHostname = false + + sink := metrics.NewInmemSink(10*time.Second, 10*time.Second) + metrics.NewGlobal(cfg, sink) + + // Perform operation that emits metric. + exp := NewOTELExporter(tc.client, &url.URL{}) + performExporterOperation(exp, name) + + // Collect sink metrics. + intervals := sink.Data() + require.Len(t, intervals, 1) + key := serviceName + "." + strings.Join(tc.metricKey, ".") + sv := intervals[0].Counters[key] + + // Verify count for transform failure metric. 
+ require.NotNil(t, sv) + require.NotNil(t, sv.AggregateSample) + require.Equal(t, 1, sv.AggregateSample.Count) + }) + } +} + func TestForceFlush(t *testing.T) { t.Parallel() exp := &OTELExporter{} @@ -137,3 +192,15 @@ func mutateMetrics(m []metricdata.ScopeMetrics) *metricdata.ResourceMetrics { ScopeMetrics: m, } } + +func performExporterOperation(exp metric.Exporter, operation string) { + ctx := context.Background() + switch operation { + case "forceFlush": + exp.ForceFlush(ctx) + case "shutdown": + exp.Shutdown(ctx) + default: + exp.Export(ctx, inputResourceMetrics) + } +} diff --git a/agent/hcp/telemetry/otlp_transform.go b/agent/hcp/telemetry/otlp_transform.go index 7ba1650ffd05..35c0ed2a1cc2 100644 --- a/agent/hcp/telemetry/otlp_transform.go +++ b/agent/hcp/telemetry/otlp_transform.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" + goMetrics "github.com/armon/go-metrics" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/metric/metricdata" cpb "go.opentelemetry.io/proto/otlp/common/v1" @@ -70,7 +71,7 @@ func metricsToPB(metrics []metricdata.Metrics) []*mpb.Metric { for _, m := range metrics { o, err := metricTypeToPB(m) if err != nil { - // TODO: Emit metric when a transformation failure occurs. + goMetrics.IncrCounter(transformFailureMetric, 1) continue } out = append(out, o) diff --git a/agent/hcp/telemetry/otlp_transform_test.go b/agent/hcp/telemetry/otlp_transform_test.go index 1c22e9a5cd75..12966e5e2522 100644 --- a/agent/hcp/telemetry/otlp_transform_test.go +++ b/agent/hcp/telemetry/otlp_transform_test.go @@ -1,9 +1,11 @@ package telemetry import ( + "strings" "testing" "time" + "github.com/armon/go-metrics" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/otel/attribute" @@ -17,7 +19,6 @@ import ( ) var ( - // Common attributes for test cases. 
start = time.Date(2000, time.January, 01, 0, 0, 0, 0, time.FixedZone("GMT", 0)) end = start.Add(30 * time.Second) @@ -127,37 +128,43 @@ var ( }, } + validFloat64Gauge = metricdata.Metrics{ + Name: "float64-gauge", + Description: "Gauge with float64 values", + Unit: "1", + Data: metricdata.Gauge[float64]{DataPoints: inputDP}, + } + + validFloat64Sum = metricdata.Metrics{ + Name: "float64-sum", + Description: "Sum with float64 values", + Unit: "1", + Data: metricdata.Sum[float64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: false, + DataPoints: inputDP, + }, + } + + validFloat64Hist = metricdata.Metrics{ + Name: "float64-histogram", + Description: "Histogram", + Unit: "1", + Data: metricdata.Histogram[float64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: inputHDP, + }, + } + // Metrics Test Case // - 3 invalid metrics and 3 Valid to test filtering - // - 1 invalid metric type - // - 2 invalid cummulative temporalities (only cummulative supported) + // - 1 invalid metric type + // - 2 invalid cummulative temporalities (only cummulative supported) // - 3 types (Gauge, Counter, and Histogram) supported inputMetrics = []metricdata.Metrics{ - { - Name: "float64-gauge", - Description: "Gauge with float64 values", - Unit: "1", - Data: metricdata.Gauge[float64]{DataPoints: inputDP}, - }, - { - Name: "float64-sum", - Description: "Sum with float64 values", - Unit: "1", - Data: metricdata.Sum[float64]{ - Temporality: metricdata.CumulativeTemporality, - IsMonotonic: false, - DataPoints: inputDP, - }, - }, - { - Name: "float64-histogram", - Description: "Histogram", - Unit: "1", - Data: metricdata.Histogram[float64]{ - Temporality: metricdata.CumulativeTemporality, - DataPoints: inputHDP, - }, - }, + validFloat64Gauge, + validFloat64Sum, + validFloat64Hist, invalidSumTemporality, invalidHistTemporality, invalidSumAgg, @@ -273,3 +280,63 @@ func TestTransformOTLP(t *testing.T) { rm := transformOTLP(inputResourceMetrics) require.Equal(t, 
expectedResourceMetrics, rm) } + +// TestTransformOTLP_CustomMetrics tests that a custom metric (hcp.otel.transform.failure) is emitted +// when transform fails. This test cannot be run in parallel as the metrics.NewGlobal() +// sets a shared global sink. +func TestTransformOTLP_CustomMetrics(t *testing.T) { + for name, tc := range map[string]struct { + inputRM *metricdata.ResourceMetrics + expectedMetricCount int + }{ + "successNoMetric": { + inputRM: &metricdata.ResourceMetrics{ + // 3 valid metrics. + ScopeMetrics: []metricdata.ScopeMetrics{ + { + Metrics: []metricdata.Metrics{ + validFloat64Gauge, + validFloat64Hist, + validFloat64Sum, + }, + }, + }, + }, + }, + "failureEmitsMetric": { + // inputScopeMetrics contains 3 bad metrics. + inputRM: inputResourceMetrics, + expectedMetricCount: 3, + }, + } { + tc := tc + t.Run(name, func(t *testing.T) { + // Init global sink. + serviceName := "test.transform" + cfg := metrics.DefaultConfig(serviceName) + cfg.EnableHostname = false + + sink := metrics.NewInmemSink(10*time.Second, 10*time.Second) + metrics.NewGlobal(cfg, sink) + + // Perform operation that emits metric. + transformOTLP(tc.inputRM) + + // Collect sink metrics. + intervals := sink.Data() + require.Len(t, intervals, 1) + key := serviceName + "." + strings.Join(transformFailureMetric, ".") + sv := intervals[0].Counters[key] + + if tc.expectedMetricCount == 0 { + require.Empty(t, sv) + return + } + + // Verify count for transform failure metric. 
+ require.NotNil(t, sv) + require.NotNil(t, sv.AggregateSample) + require.Equal(t, 3, sv.AggregateSample.Count) + }) + } +} From f6b3a9d42603640760e3896490c191f20a2259db Mon Sep 17 00:00:00 2001 From: Ashvitha Sridharan Date: Wed, 24 May 2023 15:56:15 -0400 Subject: [PATCH 02/11] Improve deps logging Run go mod tidy --- agent/hcp/deps.go | 8 ++++++-- agent/setup.go | 2 +- go.sum | 20 ++++++++++---------- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/agent/hcp/deps.go b/agent/hcp/deps.go index fcf2a6ef7800..2f733c41d10b 100644 --- a/agent/hcp/deps.go +++ b/agent/hcp/deps.go @@ -27,15 +27,17 @@ type Deps struct { func NewDeps(cfg config.CloudConfig, logger hclog.Logger, nodeID types.NodeID) (d Deps, err error) { d.Client, err = hcpclient.NewClient(cfg) if err != nil { + logger.Error("failed to init HCP deps - client:", "error", err) return } - d.Provider, err = scada.New(cfg, logger.Named("hcp.scada")) + d.Provider, err = scada.New(cfg, logger.Named("scada")) if err != nil { + logger.Error("failed to init HCP deps - scada:", "error", err) return } - d.Sink = sink(d.Client, &cfg, logger, nodeID) + d.Sink = sink(d.Client, &cfg, logger.Named("sink"), nodeID) return } @@ -86,5 +88,7 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo return nil } + logger.Info("Initialized HCP Metrics Sink") + return sink } diff --git a/agent/setup.go b/agent/setup.go index 6e6bb322681d..9ed993aaf4a6 100644 --- a/agent/setup.go +++ b/agent/setup.go @@ -104,7 +104,7 @@ func NewBaseDeps(configLoader ConfigLoader, logOut io.Writer, providedLogger hcl var extraSinks []metrics.MetricSink if cfg.IsCloudEnabled() { - d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger, cfg.NodeID) + d.HCP, err = hcp.NewDeps(cfg.Cloud, d.Logger.Named("hcp"), cfg.NodeID) if err != nil { return d, err } diff --git a/go.sum b/go.sum index bffefaaf6aab..9bb28c97c75a 100644 --- a/go.sum +++ b/go.sum @@ -1079,16 +1079,16 @@ go.opencensus.io v0.22.4/go.mod 
h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= -go.opentelemetry.io/otel v1.16.0-rc.1.0.20230510144741-7dea2225a218 h1:aKv7ueCXRlBdHGBNfot8BYwcvp4jwJ/rK/T/KQ3uXoA= -go.opentelemetry.io/otel v1.16.0-rc.1.0.20230510144741-7dea2225a218/go.mod h1:dGSTwGyzvw5Dzn8nE8HrfOXnWIDrL0GIzQdOpTnJ2CM= -go.opentelemetry.io/otel/metric v1.16.0-rc.1 h1:R9MPFw2jA+z91ejfOVU7QRYSdb37E5Ak6jJUwNMQbR8= -go.opentelemetry.io/otel/metric v1.16.0-rc.1/go.mod h1:0I+4bYjKHaoXGw7uXAABYA5wyptQdXeXOhi3SBgD6GM= -go.opentelemetry.io/otel/sdk v1.16.0-rc.1.0.20230510144741-7dea2225a218 h1:YC5ikDtSM7s+sJprqR7edyP9EBKMHGaAnWfte7EsQCI= -go.opentelemetry.io/otel/sdk v1.16.0-rc.1.0.20230510144741-7dea2225a218/go.mod h1:tY+q2LQ4iuvdwcN0zrt/2NdF3ntVodUPbiHPMRZnXyo= -go.opentelemetry.io/otel/sdk/metric v0.39.0-rc.1.0.20230510144741-7dea2225a218 h1:5Ehgy+TyY7Jh3orDVIn7uVJ7UkFm3yP5lXXQN8ia+00= -go.opentelemetry.io/otel/sdk/metric v0.39.0-rc.1.0.20230510144741-7dea2225a218/go.mod h1:VKkJz/K+pb4rkqXlBH5DMJi1ebQLYhV82fTSK3WvOOQ= -go.opentelemetry.io/otel/trace v1.16.0-rc.1 h1:/dPBlZrzSSXglIEKgy/A3kyiACcmgNMFWKTIHHxxd/o= -go.opentelemetry.io/otel/trace v1.16.0-rc.1/go.mod h1:xqretMbHfSU24I2KKbSEG+aVHsNtBCr5L4BGaNqTx68= +go.opentelemetry.io/otel v1.16.0 h1:Z7GVAX/UkAXPKsy94IU+i6thsQS4nb7LviLpnaNeW8s= +go.opentelemetry.io/otel v1.16.0/go.mod h1:vl0h9NUa1D5s1nv3A5vZOYWn8av4K8Ml6JDeHrT/bx4= +go.opentelemetry.io/otel/metric v1.16.0 h1:RbrpwVG1Hfv85LgnZ7+txXioPDoh6EdbZHo26Q3hqOo= +go.opentelemetry.io/otel/metric v1.16.0/go.mod h1:QE47cpOmkwipPiefDwo2wDzwJrlfxxNYodqc4xnGCo4= +go.opentelemetry.io/otel/sdk v1.16.0 h1:Z1Ok1YsijYL0CSJpHt4cS3wDDh7p572grzNrBMiMWgE= +go.opentelemetry.io/otel/sdk v1.16.0/go.mod h1:tMsIuKXuuIWPBAOrH+eHtvhTL+SntFtXF9QD68aP6p4= +go.opentelemetry.io/otel/sdk/metric 
v0.39.0 h1:Kun8i1eYf48kHH83RucG93ffz0zGV1sh46FAScOTuDI= +go.opentelemetry.io/otel/sdk/metric v0.39.0/go.mod h1:piDIRgjcK7u0HCL5pCA4e74qpK/jk3NiUoAHATVAmiI= +go.opentelemetry.io/otel/trace v1.16.0 h1:8JRpaObFoW0pxuVPapkgH8UhHQj+bJW8jJsCZEu5MQs= +go.opentelemetry.io/otel/trace v1.16.0/go.mod h1:Yt9vYq1SdNz3xdjZZK7wcXv1qv2pwLkqr2QVwea0ef0= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= From 1b2500fe0cc0aa1f5b4accc7122729b2c7c1786a Mon Sep 17 00:00:00 2001 From: Ashvitha Sridharan Date: Wed, 24 May 2023 16:05:05 -0400 Subject: [PATCH 03/11] Upgrade SDK and OTEL --- go.mod | 14 +++++++------- go.sum | 7 ++++--- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/go.mod b/go.mod index e4e32cea3732..89ef735d985d 100644 --- a/go.mod +++ b/go.mod @@ -62,7 +62,7 @@ require ( github.com/hashicorp/golang-lru v0.5.4 github.com/hashicorp/hcl v1.0.0 github.com/hashicorp/hcp-scada-provider v0.2.3 - github.com/hashicorp/hcp-sdk-go v0.46.1-0.20230519164650-51657675d9e7 + github.com/hashicorp/hcp-sdk-go v0.48.0 github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038 github.com/hashicorp/memberlist v0.5.0 github.com/hashicorp/raft v1.5.0 @@ -94,12 +94,12 @@ require ( github.com/rboyer/safeio v0.2.1 github.com/ryanuber/columnize v2.1.2+incompatible github.com/shirou/gopsutil/v3 v3.22.8 - github.com/stretchr/testify v1.8.2 + github.com/stretchr/testify v1.8.3 go.etcd.io/bbolt v1.3.6 - go.opentelemetry.io/otel v1.16.0-rc.1.0.20230510144741-7dea2225a218 - go.opentelemetry.io/otel/metric v1.16.0-rc.1 - go.opentelemetry.io/otel/sdk v1.16.0-rc.1.0.20230510144741-7dea2225a218 - go.opentelemetry.io/otel/sdk/metric v0.39.0-rc.1.0.20230510144741-7dea2225a218 + go.opentelemetry.io/otel v1.16.0 + go.opentelemetry.io/otel/metric v1.16.0 + 
go.opentelemetry.io/otel/sdk v1.16.0 + go.opentelemetry.io/otel/sdk/metric v0.39.0 go.opentelemetry.io/proto/otlp v0.19.0 go.uber.org/goleak v1.1.10 golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d @@ -231,7 +231,7 @@ require ( github.com/yusufpapurcu/wmi v1.2.2 // indirect go.mongodb.org/mongo-driver v1.11.0 // indirect go.opencensus.io v0.23.0 // indirect - go.opentelemetry.io/otel/trace v1.16.0-rc.1 // indirect + go.opentelemetry.io/otel/trace v1.16.0 // indirect go.uber.org/atomic v1.9.0 // indirect golang.org/x/exp v0.0.0-20230321023759-10a507213a29 // indirect golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect diff --git a/go.sum b/go.sum index 9bb28c97c75a..59ebf57327e0 100644 --- a/go.sum +++ b/go.sum @@ -610,8 +610,8 @@ github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hashicorp/hcp-scada-provider v0.2.3 h1:AarYR+/Pcv+cMvPdAlb92uOBmZfEH6ny4+DT+4NY2VQ= github.com/hashicorp/hcp-scada-provider v0.2.3/go.mod h1:ZFTgGwkzNv99PLQjTsulzaCplCzOTBh0IUQsPKzrQFo= -github.com/hashicorp/hcp-sdk-go v0.46.1-0.20230519164650-51657675d9e7 h1:/7/5kyyCT5tCeRanKIJAfP8Z6JnjEV55PNuI6phn2k0= -github.com/hashicorp/hcp-sdk-go v0.46.1-0.20230519164650-51657675d9e7/go.mod h1:hZqky4HEzsKwvLOt4QJlZUrjeQmb4UCZUhDP2HyQFfc= +github.com/hashicorp/hcp-sdk-go v0.48.0 h1:LWpFR7YVDz4uG4C/ixcy2tRbg7/BgjMcTh1bRkKaeBQ= +github.com/hashicorp/hcp-sdk-go v0.48.0/go.mod h1:hZqky4HEzsKwvLOt4QJlZUrjeQmb4UCZUhDP2HyQFfc= github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038 h1:n9J0rwVWXDpNd5iZnwY7w4WZyq53/rROeI7OVvLW8Ok= github.com/hashicorp/hil v0.0.0-20200423225030-a18a1cd20038/go.mod h1:n2TSygSNwsLJ76m8qFXTSc7beTb+auJxYdqrnoqwZWE= github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64= @@ -1011,8 +1011,9 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify 
v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/stretchr/testify v1.8.3 h1:RP3t2pwF7cMEbC1dqtB6poj3niw/9gnV4Cjg5oW5gtY= +github.com/stretchr/testify v1.8.3/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/tencentcloud/tencentcloud-sdk-go v1.0.162 h1:8fDzz4GuVg4skjY2B0nMN7h6uN61EDVkuLyI2+qGHhI= github.com/tencentcloud/tencentcloud-sdk-go v1.0.162/go.mod h1:asUz5BPXxgoPGaRgZaVm1iGcUAuHyYUo1nXqKa83cvI= github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= From 01338eef155162aff4092cd7f09d65488d7eeabb Mon Sep 17 00:00:00 2001 From: Ashvitha Sridharan Date: Wed, 24 May 2023 16:27:24 -0400 Subject: [PATCH 04/11] Remove the partial success implementation and check for HTTP status code in metrics client --- agent/hcp/client/metrics_client.go | 12 ++---------- agent/hcp/client/metrics_client_test.go | 8 +------- 2 files changed, 3 insertions(+), 17 deletions(-) diff --git a/agent/hcp/client/metrics_client.go b/agent/hcp/client/metrics_client.go index a45d6343fc46..ab2cf6c752e5 100644 --- a/agent/hcp/client/metrics_client.go +++ b/agent/hcp/client/metrics_client.go @@ -144,16 +144,8 @@ func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.R return fmt.Errorf("failed to export metrics: %v", err) } - if respData.Len() != 0 { - var respProto colmetricpb.ExportMetricsServiceResponse - if err := proto.Unmarshal(respData.Bytes(), &respProto); err != nil { - return fmt.Errorf("failed to export metrics: %v", err) - } - - if respProto.PartialSuccess != nil { - msg := respProto.PartialSuccess.GetErrorMessage() - return fmt.Errorf("failed to
export metrics: partial success: %s", msg) - } + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("failed to export metrics: code %d: %s", resp.StatusCode, string(body)) } return nil diff --git a/agent/hcp/client/metrics_client_test.go b/agent/hcp/client/metrics_client_test.go index ee4c4262bba9..3cbbfe1232a4 100644 --- a/agent/hcp/client/metrics_client_test.go +++ b/agent/hcp/client/metrics_client_test.go @@ -72,7 +72,7 @@ func TestExportMetrics(t *testing.T) { }, "failsWithNonRetryableError": { status: http.StatusBadRequest, - wantErr: "failed to export metrics", + wantErr: "failed to export metrics: code 400", }, } { t.Run(name, func(t *testing.T) { @@ -82,12 +82,6 @@ func TestExportMetrics(t *testing.T) { require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token") body := colpb.ExportMetricsServiceResponse{} - - if test.wantErr != "" { - body.PartialSuccess = &colpb.ExportMetricsPartialSuccess{ - ErrorMessage: "partial failure", - } - } bytes, err := proto.Marshal(&body) require.NoError(t, err) From 64899a4f8ae8e61bf5da3708fea69841b43e7717 Mon Sep 17 00:00:00 2001 From: Ashvitha Sridharan Date: Thu, 25 May 2023 16:39:43 -0400 Subject: [PATCH 05/11] Add x-channel --- agent/hcp/client/metrics_client.go | 5 ++++- agent/hcp/client/metrics_client_test.go | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/agent/hcp/client/metrics_client.go b/agent/hcp/client/metrics_client.go index ab2cf6c752e5..15c43d218b62 100644 --- a/agent/hcp/client/metrics_client.go +++ b/agent/hcp/client/metrics_client.go @@ -17,6 +17,8 @@ import ( metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" "golang.org/x/oauth2" "google.golang.org/protobuf/proto" + + "github.com/hashicorp/consul/version" ) const ( @@ -72,8 +74,9 @@ func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, erro } header := make(http.Header) - header.Set("Content-Type", "application/x-protobuf") + header.Set("content-type", "application/x-protobuf") 
header.Set("x-hcp-resource-id", r.String()) + header.Set("x-channel", fmt.Sprintf("consul/%s", version.Version)) return &otlpClient{ client: c, diff --git a/agent/hcp/client/metrics_client_test.go b/agent/hcp/client/metrics_client_test.go index 3cbbfe1232a4..e298d34e8b1a 100644 --- a/agent/hcp/client/metrics_client_test.go +++ b/agent/hcp/client/metrics_client_test.go @@ -11,6 +11,8 @@ import ( colpb "go.opentelemetry.io/proto/otlp/collector/metrics/v1" metricpb "go.opentelemetry.io/proto/otlp/metrics/v1" "google.golang.org/protobuf/proto" + + "github.com/hashicorp/consul/version" ) func TestNewMetricsClient(t *testing.T) { @@ -77,8 +79,9 @@ func TestExportMetrics(t *testing.T) { } { t.Run(name, func(t *testing.T) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - require.Equal(t, r.Header.Get("Content-Type"), "application/x-protobuf") + require.Equal(t, r.Header.Get("content-type"), "application/x-protobuf") require.Equal(t, r.Header.Get("x-hcp-resource-id"), testResourceID) + require.Equal(t, r.Header.Get("x-channel"), fmt.Sprintf("consul/%s", version.Version)) require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token") body := colpb.ExportMetricsServiceResponse{} From 46e09183581f5d2598216a0fabe283b7d5724d7d Mon Sep 17 00:00:00 2001 From: Ashvitha Sridharan Date: Thu, 25 May 2023 16:54:12 -0400 Subject: [PATCH 06/11] cleanup logs in deps.go based on PR feedback --- agent/hcp/deps.go | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/agent/hcp/deps.go b/agent/hcp/deps.go index 2f733c41d10b..fa4be4ff9071 100644 --- a/agent/hcp/deps.go +++ b/agent/hcp/deps.go @@ -5,6 +5,7 @@ package hcp import ( "context" + "fmt" "net/url" "time" @@ -24,22 +25,24 @@ type Deps struct { Sink metrics.MetricSink } -func NewDeps(cfg config.CloudConfig, logger hclog.Logger, nodeID types.NodeID) (d Deps, err error) { - d.Client, err = hcpclient.NewClient(cfg) +func NewDeps(cfg config.CloudConfig, 
logger hclog.Logger, nodeID types.NodeID) (Deps, error) { + client, err := hcpclient.NewClient(cfg) if err != nil { - logger.Error("failed to init HCP deps - client:", "error", err) - return + return Deps{}, fmt.Errorf("failed to init client: %w:", err) } - d.Provider, err = scada.New(cfg, logger.Named("scada")) + provider, err := scada.New(cfg, logger.Named("scada")) if err != nil { - logger.Error("failed to init HCP deps - scada:", "error", err) - return + return Deps{}, fmt.Errorf("failed to init scada: %w", err) } - d.Sink = sink(d.Client, &cfg, logger.Named("sink"), nodeID) + sink := sink(client, &cfg, logger.Named("sink"), nodeID) - return + return Deps{ + Client: client, + Provider: provider, + Sink: sink, + }, nil } // sink provides initializes an OTELSink which forwards Consul metrics to HCP. From ca8932693422b7b418a9e51a152fee196178bb84 Mon Sep 17 00:00:00 2001 From: Ashvitha Sridharan Date: Thu, 25 May 2023 16:55:17 -0400 Subject: [PATCH 07/11] Change to debug log and lowercase --- agent/hcp/deps.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/agent/hcp/deps.go b/agent/hcp/deps.go index fa4be4ff9071..f4ad161daba4 100644 --- a/agent/hcp/deps.go +++ b/agent/hcp/deps.go @@ -91,7 +91,7 @@ func sink(hcpClient hcpclient.Client, cfg hcpclient.CloudConfig, logger hclog.Lo return nil } - logger.Info("Initialized HCP Metrics Sink") + logger.Debug("initialized HCP metrics sink") return sink } From 0305ed3780623faf464ed5bdb59c9bb977543d6b Mon Sep 17 00:00:00 2001 From: Ashvitha Sridharan Date: Thu, 25 May 2023 17:04:08 -0400 Subject: [PATCH 08/11] address test operation feedback --- agent/hcp/telemetry/otel_exporter_test.go | 36 ++++++++++++----------- 1 file changed, 19 insertions(+), 17 deletions(-) diff --git a/agent/hcp/telemetry/otel_exporter_test.go b/agent/hcp/telemetry/otel_exporter_test.go index 3c98a9534af6..6402dde5ed8c 100644 --- a/agent/hcp/telemetry/otel_exporter_test.go +++ b/agent/hcp/telemetry/otel_exporter_test.go @@ -121,22 
+121,27 @@ func TestExport_CustomMetrics(t *testing.T) { for name, tc := range map[string]struct { client client.MetricsClient metricKey []string + operation string }{ - "exportSuccess": { + "exportSuccessEmitsCustomMetric": { client: &mockMetricsClient{}, metricKey: exportSuccessMetric, + operation: "export", }, - "exportFailure": { + "exportFailureEmitsCustomMetric": { client: &mockMetricsClient{ exportErr: fmt.Errorf("failed to export metrics"), }, metricKey: exportFailureMetric, + operation: "export", }, - "shutdown": { + "shutdownEmitsCustomMetric": { metricKey: exporterShutdownMetric, + operation: "shutdown", }, - "forceFlush": { + "forceFlushEmitsCustomMetric": { metricKey: exporterForceFlushMetric, + operation: "flush", }, } { t.Run(name, func(t *testing.T) { @@ -150,7 +155,16 @@ func TestExport_CustomMetrics(t *testing.T) { // Perform operation that emits metric. exp := NewOTELExporter(tc.client, &url.URL{}) - performExporterOperation(exp, name) + + ctx := context.Background() + switch tc.operation { + case "flush": + exp.ForceFlush(ctx) + case "shutdown": + exp.Shutdown(ctx) + default: + exp.Export(ctx, inputResourceMetrics) + } // Collect sink metrics. 
intervals := sink.Data() @@ -192,15 +206,3 @@ func mutateMetrics(m []metricdata.ScopeMetrics) *metricdata.ResourceMetrics { ScopeMetrics: m, } } - -func performExporterOperation(exp metric.Exporter, operation string) { - ctx := context.Background() - switch operation { - case "forceFlush": - exp.ForceFlush(ctx) - case "shutdown": - exp.Shutdown(ctx) - default: - exp.Export(ctx, inputResourceMetrics) - } -} From dd2f7a8764b98a979e10df9a70de91ea2230b4b0 Mon Sep 17 00:00:00 2001 From: Ashvitha Sridharan Date: Thu, 25 May 2023 17:09:39 -0400 Subject: [PATCH 09/11] use GetHumanVersion on version --- agent/hcp/client/metrics_client.go | 2 +- agent/hcp/client/metrics_client_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/agent/hcp/client/metrics_client.go b/agent/hcp/client/metrics_client.go index 15c43d218b62..15a395e8bf8a 100644 --- a/agent/hcp/client/metrics_client.go +++ b/agent/hcp/client/metrics_client.go @@ -76,7 +76,7 @@ func NewMetricsClient(cfg CloudConfig, ctx context.Context) (MetricsClient, erro header := make(http.Header) header.Set("content-type", "application/x-protobuf") header.Set("x-hcp-resource-id", r.String()) - header.Set("x-channel", fmt.Sprintf("consul/%s", version.Version)) + header.Set("x-channel", fmt.Sprintf("consul/%s", version.GetHumanVersion())) return &otlpClient{ client: c, diff --git a/agent/hcp/client/metrics_client_test.go b/agent/hcp/client/metrics_client_test.go index e298d34e8b1a..e80996fcf5eb 100644 --- a/agent/hcp/client/metrics_client_test.go +++ b/agent/hcp/client/metrics_client_test.go @@ -81,7 +81,7 @@ func TestExportMetrics(t *testing.T) { srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { require.Equal(t, r.Header.Get("content-type"), "application/x-protobuf") require.Equal(t, r.Header.Get("x-hcp-resource-id"), testResourceID) - require.Equal(t, r.Header.Get("x-channel"), fmt.Sprintf("consul/%s", version.Version)) + require.Equal(t, r.Header.Get("x-channel"), 
fmt.Sprintf("consul/%s", version.GetHumanVersion())) require.Equal(t, r.Header.Get("Authorization"), "Bearer test-token") body := colpb.ExportMetricsServiceResponse{} From 32b1525cf4f4c4e2a1f5cbd82688bc67688b5e4c Mon Sep 17 00:00:00 2001 From: Ashvitha Sridharan Date: Thu, 25 May 2023 18:34:06 -0400 Subject: [PATCH 10/11] Fix error wrapping --- agent/hcp/client/metrics_client.go | 8 ++++---- agent/hcp/telemetry/otel_exporter.go | 3 ++- agent/hcp/telemetry/otel_exporter_test.go | 2 +- 3 files changed, 7 insertions(+), 6 deletions(-) diff --git a/agent/hcp/client/metrics_client.go b/agent/hcp/client/metrics_client.go index 15a395e8bf8a..2c37a24a0754 100644 --- a/agent/hcp/client/metrics_client.go +++ b/agent/hcp/client/metrics_client.go @@ -127,24 +127,24 @@ func (o *otlpClient) ExportMetrics(ctx context.Context, protoMetrics *metricpb.R body, err := proto.Marshal(pbRequest) if err != nil { - return fmt.Errorf("failed to export metrics: %v", err) + return fmt.Errorf("failed to marshal the request: %w", err) } req, err := retryablehttp.NewRequest(http.MethodPost, endpoint, bytes.NewBuffer(body)) if err != nil { - return fmt.Errorf("failed to export metrics: %v", err) + return fmt.Errorf("failed to create request: %w", err) } req.Header = *o.header resp, err := o.client.Do(req.WithContext(ctx)) if err != nil { - return fmt.Errorf("failed to export metrics: %v", err) + return fmt.Errorf("failed to post metrics: %w", err) } defer resp.Body.Close() var respData bytes.Buffer if _, err := io.Copy(&respData, resp.Body); err != nil { - return fmt.Errorf("failed to export metrics: %v", err) + return fmt.Errorf("failed to read body: %w", err) } if resp.StatusCode != http.StatusOK { diff --git a/agent/hcp/telemetry/otel_exporter.go b/agent/hcp/telemetry/otel_exporter.go index e3ec375608d1..7942c0c5debf 100644 --- a/agent/hcp/telemetry/otel_exporter.go +++ b/agent/hcp/telemetry/otel_exporter.go @@ -2,6 +2,7 @@ package telemetry import ( "context" + "fmt" "net/url" goMetrics 
"github.com/armon/go-metrics" @@ -60,7 +61,7 @@ func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceM err := e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String()) if err != nil { goMetrics.IncrCounter(exportFailureMetric, 1) - return err + return fmt.Errorf("failed to export metrics: %w", err) } goMetrics.IncrCounter(exportSuccessMetric, 1) diff --git a/agent/hcp/telemetry/otel_exporter_test.go b/agent/hcp/telemetry/otel_exporter_test.go index 6402dde5ed8c..04434786a94f 100644 --- a/agent/hcp/telemetry/otel_exporter_test.go +++ b/agent/hcp/telemetry/otel_exporter_test.go @@ -130,7 +130,7 @@ func TestExport_CustomMetrics(t *testing.T) { }, "exportFailureEmitsCustomMetric": { client: &mockMetricsClient{ - exportErr: fmt.Errorf("failed to export metrics"), + exportErr: fmt.Errorf("client err"), }, metricKey: exportFailureMetric, operation: "export", From b24ab3e599685e286f1182675ef29ba8e2069110 Mon Sep 17 00:00:00 2001 From: Ashvitha Sridharan Date: Fri, 26 May 2023 14:52:08 -0400 Subject: [PATCH 11/11] Fix metric names --- agent/hcp/telemetry/custom_metrics.go | 10 +++++----- agent/hcp/telemetry/otel_exporter.go | 8 ++++---- agent/hcp/telemetry/otel_exporter_test.go | 8 ++++---- agent/hcp/telemetry/otlp_transform.go | 2 +- agent/hcp/telemetry/otlp_transform_test.go | 2 +- 5 files changed, 15 insertions(+), 15 deletions(-) diff --git a/agent/hcp/telemetry/custom_metrics.go b/agent/hcp/telemetry/custom_metrics.go index bda9ada4b48e..746dc56cbe41 100644 --- a/agent/hcp/telemetry/custom_metrics.go +++ b/agent/hcp/telemetry/custom_metrics.go @@ -4,11 +4,11 @@ package telemetry // export (exporter.go) and transform (transform.go) failures and successes. // These enable us to monitor OTEL operations. 
var ( - transformFailureMetric []string = []string{"hcp", "otel", "transform", "failure"} + internalMetricTransformFailure []string = []string{"hcp", "otel", "transform", "failure"} - exportSuccessMetric []string = []string{"hcp", "otel", "exporter", "export", "sucess"} - exportFailureMetric []string = []string{"hcp", "otel", "exporter", "export", "failure"} + internalMetricExportSuccess []string = []string{"hcp", "otel", "exporter", "export", "sucess"} + internalMetricExportFailure []string = []string{"hcp", "otel", "exporter", "export", "failure"} - exporterShutdownMetric []string = []string{"hcp", "otel", "exporter", "shutdown"} - exporterForceFlushMetric []string = []string{"hcp", "otel", "exporter", "force_flush"} + internalMetricExporterShutdown []string = []string{"hcp", "otel", "exporter", "shutdown"} + internalMetricExporterForceFlush []string = []string{"hcp", "otel", "exporter", "force_flush"} ) diff --git a/agent/hcp/telemetry/otel_exporter.go b/agent/hcp/telemetry/otel_exporter.go index 7942c0c5debf..76c8f5b000b5 100644 --- a/agent/hcp/telemetry/otel_exporter.go +++ b/agent/hcp/telemetry/otel_exporter.go @@ -60,22 +60,22 @@ func (e *OTELExporter) Export(ctx context.Context, metrics *metricdata.ResourceM } err := e.client.ExportMetrics(ctx, otlpMetrics, e.endpoint.String()) if err != nil { - goMetrics.IncrCounter(exportFailureMetric, 1) + goMetrics.IncrCounter(internalMetricExportFailure, 1) return fmt.Errorf("failed to export metrics: %w", err) } - goMetrics.IncrCounter(exportSuccessMetric, 1) + goMetrics.IncrCounter(internalMetricExportSuccess, 1) return nil } // ForceFlush is a no-op, as the MetricsClient client holds no state. func (e *OTELExporter) ForceFlush(ctx context.Context) error { - goMetrics.IncrCounter(exporterForceFlushMetric, 1) + goMetrics.IncrCounter(internalMetricExporterForceFlush, 1) return ctx.Err() } // Shutdown is a no-op, as the MetricsClient is a HTTP client that requires no graceful shutdown. 
func (e *OTELExporter) Shutdown(ctx context.Context) error { - goMetrics.IncrCounter(exporterShutdownMetric, 1) + goMetrics.IncrCounter(internalMetricExporterShutdown, 1) return ctx.Err() } diff --git a/agent/hcp/telemetry/otel_exporter_test.go b/agent/hcp/telemetry/otel_exporter_test.go index 04434786a94f..bc1a626f1c16 100644 --- a/agent/hcp/telemetry/otel_exporter_test.go +++ b/agent/hcp/telemetry/otel_exporter_test.go @@ -125,22 +125,22 @@ func TestExport_CustomMetrics(t *testing.T) { }{ "exportSuccessEmitsCustomMetric": { client: &mockMetricsClient{}, - metricKey: exportSuccessMetric, + metricKey: internalMetricExportSuccess, operation: "export", }, "exportFailureEmitsCustomMetric": { client: &mockMetricsClient{ exportErr: fmt.Errorf("client err"), }, - metricKey: exportFailureMetric, + metricKey: internalMetricExportFailure, operation: "export", }, "shutdownEmitsCustomMetric": { - metricKey: exporterShutdownMetric, + metricKey: internalMetricExporterShutdown, operation: "shutdown", }, "forceFlushEmitsCustomMetric": { - metricKey: exporterForceFlushMetric, + metricKey: internalMetricExporterForceFlush, operation: "flush", }, } { diff --git a/agent/hcp/telemetry/otlp_transform.go b/agent/hcp/telemetry/otlp_transform.go index 35c0ed2a1cc2..76e20552a0d4 100644 --- a/agent/hcp/telemetry/otlp_transform.go +++ b/agent/hcp/telemetry/otlp_transform.go @@ -71,7 +71,7 @@ func metricsToPB(metrics []metricdata.Metrics) []*mpb.Metric { for _, m := range metrics { o, err := metricTypeToPB(m) if err != nil { - goMetrics.IncrCounter(transformFailureMetric, 1) + goMetrics.IncrCounter(internalMetricTransformFailure, 1) continue } out = append(out, o) diff --git a/agent/hcp/telemetry/otlp_transform_test.go b/agent/hcp/telemetry/otlp_transform_test.go index 12966e5e2522..8f6beb7d489d 100644 --- a/agent/hcp/telemetry/otlp_transform_test.go +++ b/agent/hcp/telemetry/otlp_transform_test.go @@ -325,7 +325,7 @@ func TestTransformOTLP_CustomMetrics(t *testing.T) { // Collect sink 
metrics. intervals := sink.Data() require.Len(t, intervals, 1) - key := serviceName + "." + strings.Join(transformFailureMetric, ".") + key := serviceName + "." + strings.Join(internalMetricTransformFailure, ".") sv := intervals[0].Counters[key] if tc.expectedMetricCount == 0 {