diff --git a/config/load.go b/config/load.go index 424e24e4..a8ffc883 100644 --- a/config/load.go +++ b/config/load.go @@ -2,21 +2,22 @@ package config import ( "fmt" + "os" "reflect" + "strings" "time" - "golang.org/x/exp/slices" - "github.com/fsnotify/fsnotify" "github.com/joho/godotenv" "github.com/spf13/viper" + "golang.org/x/exp/slices" ) func (c *Config) load() { c.hotReloadableConfig = make(map[string][]*configValue) c.envs = make(map[string]string) - if err := godotenv.Load(); err != nil { + if err := godotenv.Load(); err != nil && !isTest() { fmt.Println("INFO: No .env file found.") } @@ -29,7 +30,7 @@ func (c *Config) load() { v.SetConfigFile(configPath) err := v.ReadInConfig() // Find and read the config file // Don't panic if config.yaml is not found or error with parsing. Use the default config values instead - if err != nil { + if err != nil && !isTest() { fmt.Printf("[Config] :: Failed to parse config file from path %q, using default values: %v\n", configPath, err) } v.OnConfigChange(func(e fsnotify.Event) { @@ -233,3 +234,12 @@ func mapDeepEqual[K comparable, V any](a, b map[K]V) bool { return true } + +func isTest() bool { + for _, arg := range os.Args { + if strings.HasPrefix(arg, "-test.") { + return true + } + } + return false +} diff --git a/go.mod b/go.mod index 212d8744..07b9bf27 100644 --- a/go.mod +++ b/go.mod @@ -11,12 +11,12 @@ require ( github.com/ory/dockertest/v3 v3.9.1 github.com/spf13/viper v1.15.0 github.com/stretchr/testify v1.8.2 - go.opentelemetry.io/otel v1.11.2 - go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.34.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.2 - go.opentelemetry.io/otel/metric v0.34.0 - go.opentelemetry.io/otel/sdk v1.11.2 - go.opentelemetry.io/otel/sdk/metric v0.34.0 + go.opentelemetry.io/otel v1.14.0 + go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.37.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 + go.opentelemetry.io/otel/metric v0.37.0 + go.opentelemetry.io/otel/sdk v1.14.0 + go.opentelemetry.io/otel/sdk/metric v0.37.0 golang.org/x/exp v0.0.0-20230310171629-522b1b587ee0 ) @@ -62,10 +62,10 @@ require ( github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect github.com/xeipuuv/gojsonschema v1.2.0 // indirect github.com/zenizh/go-capturer v0.0.0-20211219060012-52ea6c8fed04 - go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.34.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 // indirect - go.opentelemetry.io/otel/trace v1.11.2 // indirect + go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.37.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 // indirect + go.opentelemetry.io/otel/trace v1.14.0 // indirect go.opentelemetry.io/proto/otlp v0.19.0 // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect diff --git a/go.sum b/go.sum index 81dbde09..82116afc 100644 --- a/go.sum +++ b/go.sum @@ -301,33 +301,33 @@ go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= -go.opentelemetry.io/otel v1.11.2 h1:YBZcQlsVekzFsFbjygXMOXSs6pialIZxcjfO/mBDmR0= -go.opentelemetry.io/otel v1.11.2/go.mod h1:7p4EUV+AqgdlNV9gL97IgUZiVR3yrFXYo53f9BM3tRI= -go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2 h1:htgM8vZIF8oPSCxa341e3IZ4yr/sKxgu8KZYllByiVY= -go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.11.2/go.mod h1:rqbht/LlhVBgn5+k3M5QK96K5Xb0DvXpMJ5SFQpY6uw= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.34.0 h1:kpskzLZ60cJ48SJ4uxWa6waBL+4kSV6nVK8rP+QM8Wg= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.34.0/go.mod h1:4+x3i62TEegDHuzNva0bMcAN8oUi5w4liGb1d/VgPYo= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.34.0 h1:e7kFb4pJLbhJgAwUdoVTHzB9pGujs5O8/7gFyZL88fg= -go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.34.0/go.mod h1:3x00m9exjIbhK+zTO4MsCSlfbVmgvLP0wjDgDKa/8bw= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2 h1:fqR1kli93643au1RKo0Uma3d2aPQKT+WBKfTSBaKbOc= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.11.2/go.mod h1:5Qn6qvgkMsLDX+sYK64rHb1FPhpn0UtxF+ouX1uhyJE= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.2 h1:ERwKPn9Aer7Gxsc0+ZlutlH1bEEAUXAUhqm3Y45ABbk= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.11.2/go.mod h1:jWZUM2MWhWCJ9J9xVbRx7tzK1mXKpAlze4CeulycwVY= -go.opentelemetry.io/otel/metric v0.34.0 h1:MCPoQxcg/26EuuJwpYN1mZTeCYAUGx8ABxfW07YkjP8= -go.opentelemetry.io/otel/metric v0.34.0/go.mod h1:ZFuI4yQGNCupurTXCwkeD/zHBt+C2bR7bw5JqUm/AP8= -go.opentelemetry.io/otel/sdk v1.11.2 h1:GF4JoaEx7iihdMFu30sOyRx52HDHOkl9xQ8SMqNXUiU= -go.opentelemetry.io/otel/sdk v1.11.2/go.mod h1:wZ1WxImwpq+lVRo4vsmSOxdd+xwoUJ6rqyLc3SyX9aU= -go.opentelemetry.io/otel/sdk/metric v0.34.0 h1:7ElxfQpXCFZlRTvVRTkcUvK8Gt5DC8QzmzsLsO2gdzo= -go.opentelemetry.io/otel/sdk/metric v0.34.0/go.mod h1:l4r16BIqiqPy5rd14kkxllPy/fOI4tWo1jkpD9Z3ffQ= -go.opentelemetry.io/otel/trace v1.11.2 h1:Xf7hWSF2Glv0DE3MH7fBHvtpSBsjcBUe5MYAmZM/+y0= -go.opentelemetry.io/otel/trace v1.11.2/go.mod h1:4N+yC7QEz7TTsG9BSRLNAa63eg5E06ObSbKPmxQ/pKA= +go.opentelemetry.io/otel v1.14.0 h1:/79Huy8wbf5DnIPhemGB+zEPVwnN6fuQybr/SRXa6hM= +go.opentelemetry.io/otel v1.14.0/go.mod h1:o4buv+dJzx8rohcUeRmWUZhqupFvzWis188WlggnNeU= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0 h1:/fXHZHGvro6MVqV34fJzDhi7sHGpX3Ej/Qjmfn003ho= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.14.0/go.mod h1:UFG7EBMRdXyFstOwH028U0sVf+AvukSGhF0g8+dmNG8= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.37.0 h1:22J9c9mxNAZugv86zhwjBnER0DbO0VVpW9Oo/j3jBBQ= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.37.0/go.mod h1:QD8SSO9fgtBOvXYpcX5NXW+YnDJByTnh7a/9enQWFmw= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.37.0 h1:CI6DSdsSkJxX1rsfPSQ0SciKx6klhdDRBXqKb+FwXG8= +go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.37.0/go.mod h1:WLBYPrz8srktckhCjFaau4VHSfGaMuqoKSXwpzaiRZg= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0 h1:TKf2uAs2ueguzLaxOCBXNpHxfO/aC7PAdDsSH0IbeRQ= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.14.0/go.mod h1:HrbCVv40OOLTABmOn1ZWty6CHXkU8DK/Urc43tHug70= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0 h1:ap+y8RXX3Mu9apKVtOkM6WSFESLM8K3wNQyOU8sWHcc= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.14.0/go.mod h1:5w41DY6S9gZrbjuq6Y+753e96WfPha5IcsOSZTtullM= +go.opentelemetry.io/otel/metric v0.37.0 h1:pHDQuLQOZwYD+Km0eb657A25NaRzy0a+eLyKfDXedEs= +go.opentelemetry.io/otel/metric v0.37.0/go.mod h1:DmdaHfGt54iV6UKxsV9slj2bBRJcKC1B1uvDLIioc1s= +go.opentelemetry.io/otel/sdk v1.14.0 h1:PDCppFRDq8A1jL9v6KMI6dYesaq+DFcDZvjsoGvxGzY= +go.opentelemetry.io/otel/sdk v1.14.0/go.mod h1:bwIC5TjrNG6QDCHNWvW4HLHtUQ4I+VQDsnjhvyZCALM= +go.opentelemetry.io/otel/sdk/metric v0.37.0 h1:haYBBtZZxiI3ROwSmkZnI+d0+AVzBWeviuYQDeBWosU= +go.opentelemetry.io/otel/sdk/metric v0.37.0/go.mod h1:mO2WV1AZKKwhwHTV3AKOoIEb9LbUaENZDuGUQd+j4A0= +go.opentelemetry.io/otel/trace v1.14.0 h1:wp2Mmvj41tDsyAJXiWDWpfNsOiIyd38fy85pyKcFq/M= +go.opentelemetry.io/otel/trace v1.14.0/go.mod h1:8avnQLK+CG77yNLUae4ea2JDQ6iT+gozhnZjy/rw9G8= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= -go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= diff --git a/stats/internal/otel/options.go b/stats/internal/otel/options.go index 532ba94b..cfbbb689 100644 --- a/stats/internal/otel/options.go +++ b/stats/internal/otel/options.go @@ -3,6 +3,7 @@ package otel import ( "time" + "go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/sdk/instrumentation" sdkmetric "go.opentelemetry.io/otel/sdk/metric" @@ -89,6 +90,28 @@ func WithMeterProviderExportsInterval(interval time.Duration) MeterProviderOptio } } +// WithDefaultHistogramBucketBoundaries lets you overwrite the default buckets for all histograms. +func WithDefaultHistogramBucketBoundaries(boundaries []float64) MeterProviderOption { + return func(c *meterProviderConfig) { + c.otlpMetricGRPCOptions = append(c.otlpMetricGRPCOptions, + otlpmetricgrpc.WithAggregationSelector(func(ik sdkmetric.InstrumentKind) aggregation.Aggregation { + switch ik { + case sdkmetric.InstrumentKindCounter, sdkmetric.InstrumentKindUpDownCounter, + sdkmetric.InstrumentKindObservableCounter, sdkmetric.InstrumentKindObservableUpDownCounter: + return aggregation.Sum{} + case sdkmetric.InstrumentKindObservableGauge: + return aggregation.LastValue{} + case sdkmetric.InstrumentKindHistogram: + return aggregation.ExplicitBucketHistogram{ + Boundaries: boundaries, + } + } + panic("unknown instrument kind") + }), + ) + } +} + // WithHistogramBucketBoundaries allows the creation of a view to overwrite the default buckets of a given histogram. // meterName is optional. func WithHistogramBucketBoundaries(instrumentName, meterName string, boundaries []float64) MeterProviderOption { @@ -100,6 +123,7 @@ func WithHistogramBucketBoundaries(instrumentName, meterName string, boundaries sdkmetric.Instrument{ Name: instrumentName, Scope: scope, + Kind: sdkmetric.InstrumentKindHistogram, }, sdkmetric.Stream{ Aggregation: aggregation.ExplicitBucketHistogram{ diff --git a/stats/internal/otel/otel.go b/stats/internal/otel/otel.go index a1e4a3da..68b1411f 100644 --- a/stats/internal/otel/otel.go +++ b/stats/internal/otel/otel.go @@ -14,7 +14,7 @@ import ( sdkmetric "go.opentelemetry.io/otel/sdk/metric" "go.opentelemetry.io/otel/sdk/resource" sdktrace "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.12.0" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" "golang.org/x/sync/errgroup" ) @@ -93,6 +93,9 @@ func (m *Manager) Setup( if c.withInsecure { meterProviderOptions = append(meterProviderOptions, otlpmetricgrpc.WithInsecure()) } + if len(c.meterProviderConfig.otlpMetricGRPCOptions) > 0 { + meterProviderOptions = append(meterProviderOptions, c.meterProviderConfig.otlpMetricGRPCOptions...) + } exp, err := otlpmetricgrpc.New(ctx, meterProviderOptions...) if err != nil { return nil, nil, fmt.Errorf("failed to create metric exporter: %w", err) @@ -202,8 +205,9 @@ type tracerProviderConfig struct { } type meterProviderConfig struct { - enabled bool - global bool - exportsInterval time.Duration - views []sdkmetric.View + enabled bool + global bool + exportsInterval time.Duration + views []sdkmetric.View + otlpMetricGRPCOptions []otlpmetricgrpc.Option } diff --git a/stats/internal/otel/otel_test.go b/stats/internal/otel/otel_test.go index ce9e52f1..2f2ce56d 100644 --- a/stats/internal/otel/otel_test.go +++ b/stats/internal/otel/otel_test.go @@ -18,6 +18,7 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric/global" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" "github.com/rudderlabs/rudder-go-kit/httputil" statsTest "github.com/rudderlabs/rudder-go-kit/stats/testhelper" @@ -57,44 +58,19 @@ func TestCollector(t *testing.T) { m := mp.Meter("some-test") // foo counter - counter, err := m.SyncInt64().Counter("foo") + counter, err := m.Int64Counter("foo") require.NoError(t, err) counter.Add(ctx, 1, attribute.String("hello", "world")) // bar counter - counter, err = m.SyncInt64().Counter("bar") + counter, err = m.Int64Counter("bar") require.NoError(t, err) counter.Add(ctx, 5) // baz histogram - h, err := m.SyncInt64().Histogram("baz") + h, err := m.Int64Histogram("baz") require.NoError(t, err) h.Record(ctx, 20, attribute.String("a", "b")) - var ( - resp *http.Response - metrics map[string]*promClient.MetricFamily - metricsEndpoint = fmt.Sprintf("http://localhost:%d/metrics", dt.GetHostPort(t, metricsPort, container)) - ) - require.Eventuallyf(t, func() bool { - resp, err = http.Get(metricsEndpoint) - if err != nil { - return false - } - defer func() { httputil.CloseResponse(resp) }() - metrics, err = statsTest.ParsePrometheusMetrics(resp.Body) - if err != nil { - return false - } - if _, ok := metrics["foo"]; !ok { - return false - } - if _, ok := metrics["bar"]; !ok { - return false - } - if _, ok := metrics["baz"]; !ok { - return false - } - return true - }, 5*time.Second, 100*time.Millisecond, "err: %v, metrics: %+v", err, metrics) + metrics := requireMetrics(t, container, "foo", "bar", "baz") require.EqualValues(t, ptr("foo"), metrics["foo"].Name) require.EqualValues(t, ptr(promClient.MetricType_COUNTER), metrics["foo"].Type) @@ -119,24 +95,172 @@ func TestCollector(t *testing.T) { {Name: ptr("instance"), Value: ptr("my-instance-id")}, }, metrics["bar"].Metric[0].Label) - require.EqualValues(t, ptr("baz"), metrics["baz"].Name) - require.EqualValues(t, ptr(promClient.MetricType_HISTOGRAM), metrics["baz"].Type) - require.Len(t, metrics["baz"].Metric, 1) - require.EqualValues(t, ptr(uint64(1)), metrics["baz"].Metric[0].Histogram.SampleCount) - require.EqualValues(t, ptr(20.0), metrics["baz"].Metric[0].Histogram.SampleSum) - require.ElementsMatch(t, []*promClient.Bucket{ - {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(10.0)}, - {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(20.0)}, - {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(30.0)}, - {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(math.Inf(0))}, - }, metrics["baz"].Metric[0].Histogram.Bucket) - require.ElementsMatch(t, []*promClient.LabelPair{ - // the label1=value1 is coming from the otel-collector-config.yaml (see const_labels) - {Name: ptr("label1"), Value: ptr("value1")}, - {Name: ptr("a"), Value: ptr("b")}, - {Name: ptr("job"), Value: ptr("TestCollector")}, - {Name: ptr("instance"), Value: ptr("my-instance-id")}, - }, metrics["baz"].Metric[0].Label) + requireHistogramEqual(t, metrics["baz"], histogram{ + name: "baz", count: 1, sum: 20, + buckets: []*promClient.Bucket{ + {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(10.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(20.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(30.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(math.Inf(1))}, + }, + labels: []*promClient.LabelPair{ + {Name: ptr("label1"), Value: ptr("value1")}, + {Name: ptr("a"), Value: ptr("b")}, + {Name: ptr("job"), Value: ptr("TestCollector")}, + {Name: ptr("instance"), Value: ptr("my-instance-id")}, + }, + }) +} + +func TestHistogramBuckets(t *testing.T) { + setup := func(t *testing.T, opts ...MeterProviderOption) (*docker.Container, *sdkmetric.MeterProvider) { + cwd, err := os.Getwd() + require.NoError(t, err) + container, grpcEndpoint := statsTest.StartOTelCollector(t, metricsPort, + filepath.Join(cwd, "testdata", "otel-collector-config.yaml"), + ) + + ctx := context.Background() + res, err := NewResource("TestHistogramBuckets", "my-instance-id", "1.0.0") + require.NoError(t, err) + var om Manager + _, mp, err := om.Setup(ctx, res, + WithInsecure(), + WithMeterProvider(grpcEndpoint, + append(opts, WithMeterProviderExportsInterval(50*time.Millisecond))..., + ), + ) + require.NoError(t, err) + t.Cleanup(func() { require.NoError(t, om.Shutdown(context.Background())) }) + require.NotEqual(t, mp, global.MeterProvider()) + + return container, mp + } + + t.Run("default applies to all meters", func(t *testing.T) { + ctx := context.Background() + container, mp := setup(t, + WithDefaultHistogramBucketBoundaries([]float64{10, 20, 30}), + ) + + // foo histogram on meter-1 + h, err := mp.Meter("meter-1").Int64Histogram("foo") + require.NoError(t, err) + h.Record(ctx, 20, attribute.String("a", "b")) + + // bar histogram on meter-2 + h, err = mp.Meter("meter-2").Int64Histogram("bar") + require.NoError(t, err) + h.Record(ctx, 30, attribute.String("c", "d")) + + metrics := requireMetrics(t, container, "foo", "bar") + + requireHistogramEqual(t, metrics["foo"], histogram{ + name: "foo", count: 1, sum: 20, + buckets: []*promClient.Bucket{ + {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(10.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(20.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(30.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(math.Inf(1))}, + }, + labels: []*promClient.LabelPair{ + {Name: ptr("label1"), Value: ptr("value1")}, + {Name: ptr("a"), Value: ptr("b")}, + {Name: ptr("job"), Value: ptr("TestHistogramBuckets")}, + {Name: ptr("instance"), Value: ptr("my-instance-id")}, + }, + }) + + requireHistogramEqual(t, metrics["bar"], histogram{ + name: "bar", count: 1, sum: 30, + buckets: []*promClient.Bucket{ + {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(10.0)}, + {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(20.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(30.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(math.Inf(1))}, + }, + labels: []*promClient.LabelPair{ + {Name: ptr("label1"), Value: ptr("value1")}, + {Name: ptr("c"), Value: ptr("d")}, + {Name: ptr("job"), Value: ptr("TestHistogramBuckets")}, + {Name: ptr("instance"), Value: ptr("my-instance-id")}, + }, + }) + }) + + t.Run("custom boundaries do not override default ones", func(t *testing.T) { + ctx := context.Background() + container, mp := setup(t, + WithDefaultHistogramBucketBoundaries([]float64{10, 20, 30}), + WithHistogramBucketBoundaries("bar", "meter-1", []float64{40, 50, 60}), + WithHistogramBucketBoundaries("baz", "meter-1", []float64{70, 80, 90}), + ) + + // foo histogram + h, err := mp.Meter("meter-1").Int64Histogram("foo") + require.NoError(t, err) + h.Record(ctx, 20, attribute.String("a", "b")) + + // bar histogram + h, err = mp.Meter("meter-1").Int64Histogram("bar") + require.NoError(t, err) + h.Record(ctx, 50, attribute.String("c", "d")) + + // baz histogram + h, err = mp.Meter("meter-1").Int64Histogram("baz") + require.NoError(t, err) + h.Record(ctx, 80, attribute.String("e", "f")) + + metrics := requireMetrics(t, container, "foo", "bar", "baz") + + requireHistogramEqual(t, metrics["foo"], histogram{ + name: "foo", count: 1, sum: 20, + buckets: []*promClient.Bucket{ + {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(10.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(20.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(30.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(math.Inf(1))}, + }, + labels: []*promClient.LabelPair{ + {Name: ptr("label1"), Value: ptr("value1")}, + {Name: ptr("a"), Value: ptr("b")}, + {Name: ptr("job"), Value: ptr("TestHistogramBuckets")}, + {Name: ptr("instance"), Value: ptr("my-instance-id")}, + }, + }) + + requireHistogramEqual(t, metrics["bar"], histogram{ + name: "bar", count: 1, sum: 50, + buckets: []*promClient.Bucket{ + {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(40.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(50.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(60.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(math.Inf(1))}, + }, + labels: []*promClient.LabelPair{ + {Name: ptr("label1"), Value: ptr("value1")}, + {Name: ptr("c"), Value: ptr("d")}, + {Name: ptr("job"), Value: ptr("TestHistogramBuckets")}, + {Name: ptr("instance"), Value: ptr("my-instance-id")}, + }, + }) + + requireHistogramEqual(t, metrics["baz"], histogram{ + name: "baz", count: 1, sum: 80, + buckets: []*promClient.Bucket{ + {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(70.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(80.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(90.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(math.Inf(1))}, + }, + labels: []*promClient.LabelPair{ + {Name: ptr("label1"), Value: ptr("value1")}, + {Name: ptr("e"), Value: ptr("f")}, + {Name: ptr("job"), Value: ptr("TestHistogramBuckets")}, + {Name: ptr("instance"), Value: ptr("my-instance-id")}, + }, + }) + }) } func TestCollectorGlobals(t *testing.T) { @@ -206,9 +330,9 @@ func TestNonBlockingConnection(t *testing.T) { }() meter := mp.Meter("test") - fooCounter, err := meter.SyncInt64().Counter("foo") + fooCounter, err := meter.Int64Counter("foo") require.NoError(t, err) - barCounter, err := meter.SyncFloat64().Counter("bar") + barCounter, err := meter.Float64Counter("bar") require.NoError(t, err) // this counter will not be lost even though the container isn't even started. see MaxElapsedTime. @@ -223,30 +347,7 @@ func TestNonBlockingConnection(t *testing.T) { ) barCounter.Add(ctx, 456) // this should be recorded - var ( - resp *http.Response - metrics map[string]*promClient.MetricFamily - metricsEndpoint = fmt.Sprintf("http://localhost:%d/metrics", dt.GetHostPort(t, metricsPort, container)) - ) - - require.Eventuallyf(t, func() bool { - resp, err = http.Get(metricsEndpoint) - if err != nil { - return false - } - defer func() { httputil.CloseResponse(resp) }() - metrics, err = statsTest.ParsePrometheusMetrics(resp.Body) - if err != nil { - return false - } - if _, ok := metrics["foo"]; !ok { - return false - } - if _, ok := metrics["bar"]; !ok { - return false - } - return true - }, 10*time.Second, 100*time.Millisecond, "err: %v, metrics: %+v", err, metrics) + metrics := requireMetrics(t, container, "foo", "bar") require.EqualValues(t, ptr("foo"), metrics["foo"].Name) require.EqualValues(t, ptr(promClient.MetricType_COUNTER), metrics["foo"].Type) @@ -272,6 +373,61 @@ func TestNonBlockingConnection(t *testing.T) { }, metrics["bar"].Metric[0].Label) } +func requireMetrics( + t *testing.T, container *docker.Container, requiredKeys ...string, +) map[string]*promClient.MetricFamily { + t.Helper() + + var ( + err error + resp *http.Response + metrics map[string]*promClient.MetricFamily + metricsEndpoint = fmt.Sprintf("http://localhost:%d/metrics", dt.GetHostPort(t, metricsPort, container)) + ) + require.Eventuallyf(t, func() bool { + resp, err = http.Get(metricsEndpoint) + if err != nil { + return false + } + defer func() { httputil.CloseResponse(resp) }() + metrics, err = statsTest.ParsePrometheusMetrics(resp.Body) + if err != nil { + return false + } + for _, k := range requiredKeys { + if _, ok := metrics[k]; !ok { + return false + } + } + return true + }, 5*time.Second, 100*time.Millisecond, "err: %v, metrics: %+v", err, metrics) + + return metrics +} + +func requireHistogramEqual(t *testing.T, mf *promClient.MetricFamily, h histogram) { + t.Helper() + require.EqualValues(t, &h.name, mf.Name) + require.EqualValues(t, ptr(promClient.MetricType_HISTOGRAM), mf.Type) + require.Len(t, mf.Metric, 1) + require.EqualValuesf(t, &h.count, mf.Metric[0].Histogram.SampleCount, + "Got %d, expected %d", *mf.Metric[0].Histogram.SampleCount, h.count, + ) + require.EqualValuesf(t, &h.sum, mf.Metric[0].Histogram.SampleSum, + "Got %.2f, expected %.2f", *mf.Metric[0].Histogram.SampleSum, h.sum, + ) + require.ElementsMatchf(t, h.buckets, mf.Metric[0].Histogram.Bucket, "Buckets for %q do not match", h.name) + require.ElementsMatch(t, h.labels, mf.Metric[0].Label) +} + func ptr[T any](v T) *T { return &v } + +type histogram struct { + name string + count uint64 + sum float64 + buckets []*promClient.Bucket + labels []*promClient.LabelPair +} diff --git a/stats/options.go b/stats/options.go index 1e62e699..0c53bc61 100644 --- a/stats/options.go +++ b/stats/options.go @@ -12,9 +12,12 @@ type statsConfig struct { namespaceIdentifier string excludedTags map[string]struct{} - periodicStatsConfig periodicStatsConfig + periodicStatsConfig periodicStatsConfig + defaultHistogramBuckets []float64 + histogramBuckets map[string][]float64 } +// Option is a function used to configure the stats service. type Option func(*statsConfig) // WithServiceName sets the service name for the stats service. @@ -30,3 +33,20 @@ func WithServiceVersion(version string) Option { c.serviceVersion = version } } + +// WithDefaultHistogramBuckets sets the histogram buckets for the stats service. +func WithDefaultHistogramBuckets(buckets []float64) Option { + return func(c *statsConfig) { + c.defaultHistogramBuckets = buckets + } +} + +// WithHistogramBuckets sets the histogram buckets for a measurement. +func WithHistogramBuckets(histogramName string, buckets []float64) Option { + return func(c *statsConfig) { + if c.histogramBuckets == nil { + c.histogramBuckets = make(map[string][]float64) + } + c.histogramBuckets[histogramName] = buckets + } +} diff --git a/stats/otel.go b/stats/otel.go index 6f25cbb4..c8a02e9b 100644 --- a/stats/otel.go +++ b/stats/otel.go @@ -12,27 +12,28 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/instrument" - "go.opentelemetry.io/otel/metric/instrument/syncfloat64" - "go.opentelemetry.io/otel/metric/instrument/syncint64" - "go.opentelemetry.io/otel/metric/unit" "github.com/rudderlabs/rudder-go-kit/logger" "github.com/rudderlabs/rudder-go-kit/stats/internal/otel" ) +const ( + defaultMeterName = "" +) + // otelStats is an OTel-specific adapter that follows the Stats contract type otelStats struct { config statsConfig otelConfig otelStatsConfig meter metric.Meter - counters map[string]syncint64.Counter + counters map[string]instrument.Int64Counter countersMu sync.Mutex gauges map[string]*otelGauge gaugesMu sync.Mutex - timers map[string]syncint64.Histogram + timers map[string]instrument.Int64Histogram timersMu sync.Mutex - histograms map[string]syncfloat64.Histogram + histograms map[string]instrument.Float64Histogram histogramsMu sync.Mutex otelManager otel.Manager @@ -68,17 +69,29 @@ func (s *otelStats) Start(ctx context.Context, goFactory GoRoutineFactory) error )) } if s.otelConfig.metricsEndpoint != "" { - options = append(options, otel.WithMeterProvider( - s.otelConfig.metricsEndpoint, + meterProviderOptions := []otel.MeterProviderOption{ otel.WithMeterProviderExportsInterval(s.otelConfig.metricsExportInterval), - )) + } + if len(s.config.defaultHistogramBuckets) > 0 { + meterProviderOptions = append(meterProviderOptions, + otel.WithDefaultHistogramBucketBoundaries(s.config.defaultHistogramBuckets), + ) + } + if len(s.config.histogramBuckets) > 0 { + for histogramName, buckets := range s.config.histogramBuckets { + meterProviderOptions = append(meterProviderOptions, + otel.WithHistogramBucketBoundaries(histogramName, defaultMeterName, buckets), + ) + } + } + options = append(options, otel.WithMeterProvider(s.otelConfig.metricsEndpoint, meterProviderOptions...)) } _, mp, err := s.otelManager.Setup(ctx, res, options...) if err != nil { return fmt.Errorf("failed to setup open telemetry: %w", err) } - s.meter = mp.Meter("") + s.meter = mp.Meter(defaultMeterName) // Starting background collection var backgroundCollectionCtx context.Context @@ -204,7 +217,7 @@ func (s *otelStats) getMeasurement(name, statType string, tags Tags) Measurement case GaugeType: return s.getGauge(s.meter, name, om.attributes, tags.String()) case TimerType: - instr := buildOTelInstrument(s.meter, name, s.timers, &s.timersMu, instrument.WithUnit(unit.Milliseconds)) + instr := buildOTelInstrument(s.meter, name, s.timers, &s.timersMu, instrument.WithUnit("ms")) return &otelTimer{timer: instr, otelMeasurement: om} case HistogramType: instr := buildOTelInstrument(s.meter, name, s.histograms, &s.histogramsMu) @@ -231,7 +244,7 @@ func (s *otelStats) getGauge(meter metric.Meter, name string, attributes []attri } if !ok { - g, err := meter.AsyncFloat64().Gauge(name) + g, err := meter.Float64ObservableGauge(name) if err != nil { panic(fmt.Errorf("failed to create gauge %s: %w", name, err)) } @@ -239,11 +252,12 @@ func (s *otelStats) getGauge(meter metric.Meter, name string, attributes []attri genericMeasurement: genericMeasurement{statType: GaugeType}, attributes: attributes, }} - err = meter.RegisterCallback([]instrument.Asynchronous{g}, func(ctx context.Context) { + _, err = meter.RegisterCallback(func(ctx context.Context, o metric.Observer) error { if value := og.getValue(); value != nil { - g.Observe(ctx, cast.ToFloat64(value), og.attributes...) + o.ObserveFloat64(g, cast.ToFloat64(value), og.attributes...) } - }) + return nil + }, g) if err != nil { panic(fmt.Errorf("failed to register callback for gauge %s: %w", name, err)) } @@ -274,12 +288,12 @@ func buildOTelInstrument[T any]( var err error var value interface{} switch any(m).(type) { - case map[string]syncint64.Counter: - value, err = meter.SyncInt64().Counter(name, opts...) - case map[string]syncint64.Histogram: - value, err = meter.SyncInt64().Histogram(name, opts...) - case map[string]syncfloat64.Histogram: - value, err = meter.SyncFloat64().Histogram(name, opts...) + case map[string]instrument.Int64Counter: + value, err = meter.Int64Counter(name, castOptions[instrument.Int64Option](opts...)...) + case map[string]instrument.Int64Histogram: + value, err = meter.Int64Histogram(name, castOptions[instrument.Int64Option](opts...)...) + case map[string]instrument.Float64Histogram: + value, err = meter.Float64Histogram(name, castOptions[instrument.Float64Option](opts...)...) default: panic(fmt.Errorf("unknown instrument type %T", instr)) } @@ -293,6 +307,16 @@ func buildOTelInstrument[T any]( return instr } +func castOptions[T any](opts ...instrument.Option) []T { + var co []T + for _, opt := range opts { + if o, ok := opt.(T); ok { + co = append(co, o) + } + } + return co +} + type otelStatsConfig struct { tracesEndpoint string tracingSamplingRate float64 diff --git a/stats/otel_benchmark_test.go b/stats/otel_benchmark_test.go new file mode 100644 index 00000000..69bc0860 --- /dev/null +++ b/stats/otel_benchmark_test.go @@ -0,0 +1,125 @@ +package stats + +import ( + "os" + "path/filepath" + "strconv" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/rudderlabs/rudder-go-kit/config" + "github.com/rudderlabs/rudder-go-kit/logger" + "github.com/rudderlabs/rudder-go-kit/stats/metric" + statsTest "github.com/rudderlabs/rudder-go-kit/stats/testhelper" + "github.com/rudderlabs/rudder-go-kit/testhelper/rand" +) + +func BenchmarkOTel(b *testing.B) { + cwd, err := os.Getwd() + require.NoError(b, err) + _, grpcEndpoint := statsTest.StartOTelCollector(b, metricsPort, + filepath.Join(cwd, "testdata", "otel-collector-config.yaml"), + ) + + type testCase struct { + name string + grpcEndpoint string + } + for _, tc := range []testCase{ + {"reachable", grpcEndpoint}, + {"unreachable", "unreachable:4317"}, + } { + b.Run(tc.name, func(b *testing.B) { + b.Run("counter creation", func(b *testing.B) { + randString := rand.UniqueString(10) + s := getStatsForBenchmark(tc.grpcEndpoint) + b.ResetTimer() + for i := 1; i <= b.N; i++ { + s.NewTaggedStat("test_counter_"+randString+strconv.Itoa(i), CountType, Tags{"tag1": "value1"}) + } + }) + + b.Run("timer creation", func(b *testing.B) { + randString := rand.UniqueString(10) + s := getStatsForBenchmark(tc.grpcEndpoint) + b.ResetTimer() + for i := 1; i <= b.N; i++ { + s.NewTaggedStat("test_timer_"+randString+strconv.Itoa(i), TimerType, Tags{"tag1": "value1"}) + } + }) + + b.Run("gauge creation", func(b *testing.B) { + randString := rand.UniqueString(10) + s := getStatsForBenchmark(tc.grpcEndpoint) + b.ResetTimer() + for i := 1; i <= b.N; i++ { + s.NewTaggedStat("test_gauge_"+randString+strconv.Itoa(i), GaugeType, Tags{"tag1": "value1"}) + } + }) + + b.Run("histogram creation", func(b *testing.B) { + randString := rand.UniqueString(10) + s := getStatsForBenchmark(tc.grpcEndpoint) + b.ResetTimer() + for i := 1; i <= b.N; i++ { + s.NewTaggedStat("test_histogram_"+randString+strconv.Itoa(i), HistogramType, Tags{"tag1": "value1"}) + } + }) + + b.Run("use counter", func(b *testing.B) { + randString := rand.UniqueString(10) + s := getStatsForBenchmark(tc.grpcEndpoint) + m := s.NewTaggedStat("test_counter_"+randString, CountType, Tags{"tag1": "value1"}) + b.ResetTimer() + for i := 0; i < b.N; i++ { + m.Count(i) + } + }) + + b.Run("use timer", func(b *testing.B) { + randString := rand.UniqueString(10) + s := getStatsForBenchmark(tc.grpcEndpoint) + m := s.NewTaggedStat("test_timer_"+randString, TimerType, Tags{"tag1": "value1"}) + start := time.Now() + b.ResetTimer() + for i := 0; i < b.N; i++ { + m.Since(start) + } + }) + + b.Run("use gauge", func(b *testing.B) { + randString := rand.UniqueString(10) + s := getStatsForBenchmark(tc.grpcEndpoint) + m := s.NewTaggedStat("test_gauge_"+randString, GaugeType, Tags{"tag1": "value1"}) + b.ResetTimer() + for i := 0; i < b.N; i++ { + m.Gauge(i) + } + }) + + b.Run("use histogram", func(b *testing.B) { + randString := rand.UniqueString(10) + s := getStatsForBenchmark(tc.grpcEndpoint) + m := s.NewTaggedStat("test_histogram_"+randString, HistogramType, Tags{"tag1": "value1"}) + b.ResetTimer() + for i := 0; i < b.N; i++ { + m.Observe(float64(i)) + } + }) + }) + } +} + +func getStatsForBenchmark(grpcEndpoint string) Stats { + c := config.New() + c.Set("INSTANCE_ID", "my-instance-id") + c.Set("OpenTelemetry.enabled", true) + c.Set("OpenTelemetry.metrics.endpoint", grpcEndpoint) + c.Set("OpenTelemetry.metrics.exportInterval", 10*time.Second) + c.Set("RuntimeStats.enabled", false) + l := logger.NewFactory(c) + m := metric.NewManager() + return NewStats(c, l, m) +} diff --git a/stats/otel_measurement.go b/stats/otel_measurement.go index 220a67af..f13d3be6 100644 --- a/stats/otel_measurement.go +++ b/stats/otel_measurement.go @@ -6,8 +6,7 @@ import ( "time" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric/instrument/syncfloat64" - "go.opentelemetry.io/otel/metric/instrument/syncint64" + "go.opentelemetry.io/otel/metric/instrument" ) // otelMeasurement is the statsd-specific implementation of Measurement @@ -20,7 +19,7 @@ type otelMeasurement struct { // otelCounter represents a counter stat type otelCounter struct { *otelMeasurement - counter syncint64.Counter + counter instrument.Int64Counter } func (c *otelCounter) Count(n int) { @@ -68,7 +67,7 @@ func (g *otelGauge) getValue() interface{} { type otelTimer struct { *otelMeasurement now func() time.Time - timer syncint64.Histogram + timer instrument.Int64Histogram } // Since sends the time elapsed since duration start. Only applies to TimerType stats @@ -106,7 +105,7 @@ func (t *otelTimer) RecordDuration() func() { // otelHistogram represents a histogram stat type otelHistogram struct { *otelMeasurement - histogram syncfloat64.Histogram + histogram instrument.Float64Histogram } // Observe sends an observation diff --git a/stats/otel_test.go b/stats/otel_test.go index 5c88ea10..4314af21 100644 --- a/stats/otel_test.go +++ b/stats/otel_test.go @@ -3,6 +3,7 @@ package stats import ( "context" "fmt" + "math" "net/http" "os" "path/filepath" @@ -253,7 +254,8 @@ func TestOTelTaggedGauges(t *testing.T) { s.NewStat("test-gauge", GaugeType).Gauge(2) s.NewTaggedStat("test-gauge", GaugeType, Tags{"c": "d"}).Gauge(3) - rm, err := r.Collect(ctx) + rm := metricdata.ResourceMetrics{} + err := r.Collect(ctx, &rm) require.NoError(t, err) var dp []metricdata.DataPoint[float64] @@ -533,9 +535,104 @@ func TestOTelStartStopError(t *testing.T) { } } +func TestOTelHistogramBuckets(t *testing.T) { + cwd, err := os.Getwd() + require.NoError(t, err) + container, grpcEndpoint := statsTest.StartOTelCollector(t, metricsPort, + filepath.Join(cwd, "testdata", "otel-collector-config.yaml"), + ) + + c := config.New() + c.Set("INSTANCE_ID", "my-instance-id") + c.Set("OpenTelemetry.enabled", true) + c.Set("OpenTelemetry.metrics.endpoint", grpcEndpoint) + c.Set("OpenTelemetry.metrics.exportInterval", time.Millisecond) + c.Set("RuntimeStats.enabled", false) + l := logger.NewFactory(c) + m := metric.NewManager() + s := NewStats(c, l, m, + WithServiceName(t.Name()), + WithDefaultHistogramBuckets([]float64{10, 20, 30}), + WithHistogramBuckets("bar", []float64{40, 50, 60}), + ) + + // start stats + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + require.NoError(t, s.Start(ctx, DefaultGoRoutineFactory)) + defer s.Stop() + + s.NewTaggedStat("foo", HistogramType, Tags{"a": "b"}).Observe(20) + s.NewTaggedStat("bar", HistogramType, Tags{"c": "d"}).Observe(50) + + var ( + resp *http.Response + metrics map[string]*promClient.MetricFamily + metricsEndpoint = fmt.Sprintf("http://localhost:%d/metrics", docker.GetHostPort(t, metricsPort, container)) + ) + + require.Eventuallyf(t, func() bool { + resp, err = http.Get(metricsEndpoint) + if err != nil { + return false + } + defer func() { httputil.CloseResponse(resp) }() + metrics, err = statsTest.ParsePrometheusMetrics(resp.Body) + if err != nil { + return false + } + if _, ok := metrics["foo"]; !ok { + return false + } + if _, ok := metrics["bar"]; !ok { + return false + } + return true + }, 10*time.Second, 100*time.Millisecond, "err: %v, metrics: %+v", err, metrics) + + require.EqualValues(t, ptr("foo"), metrics["foo"].Name) + require.EqualValues(t, ptr(promClient.MetricType_HISTOGRAM), metrics["foo"].Type) + require.Len(t, metrics["foo"].Metric, 1) + require.EqualValues(t, ptr(uint64(1)), metrics["foo"].Metric[0].Histogram.SampleCount) + require.EqualValues(t, ptr(20.0), metrics["foo"].Metric[0].Histogram.SampleSum) + require.EqualValues(t, []*promClient.Bucket{ + {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(10.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(20.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(30.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(math.Inf(1))}, + }, metrics["foo"].Metric[0].Histogram.Bucket) + require.ElementsMatchf(t, []*promClient.LabelPair{ + // the label1=value1 is coming from the otel-collector-config.yaml (see const_labels) + {Name: ptr("label1"), Value: ptr("value1")}, + {Name: ptr("a"), Value: ptr("b")}, + {Name: ptr("job"), Value: ptr("TestOTelHistogramBuckets")}, + {Name: ptr("instance"), Value: ptr("my-instance-id")}, + }, metrics["foo"].Metric[0].Label, "Got %+v", metrics["foo"].Metric[0].Label) + + require.EqualValues(t, ptr("bar"), metrics["bar"].Name) + require.EqualValues(t, ptr(promClient.MetricType_HISTOGRAM), metrics["bar"].Type) + require.Len(t, metrics["bar"].Metric, 1) + require.EqualValues(t, ptr(uint64(1)), metrics["bar"].Metric[0].Histogram.SampleCount) + require.EqualValues(t, ptr(50.0), metrics["bar"].Metric[0].Histogram.SampleSum) + require.EqualValues(t, []*promClient.Bucket{ + {CumulativeCount: ptr(uint64(0)), UpperBound: ptr(40.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(50.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(60.0)}, + {CumulativeCount: ptr(uint64(1)), UpperBound: ptr(math.Inf(1))}, + }, metrics["bar"].Metric[0].Histogram.Bucket) + require.ElementsMatchf(t, []*promClient.LabelPair{ + // the label1=value1 is coming from the otel-collector-config.yaml (see const_labels) + {Name: ptr("label1"), Value: ptr("value1")}, + {Name: ptr("c"), Value: ptr("d")}, + {Name: ptr("job"), Value: ptr("TestOTelHistogramBuckets")}, + {Name: ptr("instance"), Value: ptr("my-instance-id")}, + }, metrics["bar"].Metric[0].Label, "Got %+v", metrics["bar"].Metric[0].Label) +} + func getDataPoint[T any](ctx context.Context, t *testing.T, rdr sdkmetric.Reader, name string, idx int) (zero T) { t.Helper() - rm, err := rdr.Collect(ctx) + rm := metricdata.ResourceMetrics{} + err := rdr.Collect(ctx, &rm) require.NoError(t, err) require.GreaterOrEqual(t, len(rm.ScopeMetrics), 1) require.GreaterOrEqual(t, len(rm.ScopeMetrics[0].Metrics), idx+1) diff --git a/stats/testhelper/otel.go b/stats/testhelper/otel.go index f4905788..a78347fe 100644 --- a/stats/testhelper/otel.go +++ b/stats/testhelper/otel.go @@ -27,7 +27,7 @@ func WithStartCollectorPort(port int) StartOTelCollectorOpt { } } -func StartOTelCollector(t *testing.T, metricsPort, configPath string, opts ...StartOTelCollectorOpt) ( +func StartOTelCollector(t testing.TB, metricsPort, configPath string, opts ...StartOTelCollectorOpt) ( container *docker.Container, grpcEndpoint string, ) { diff --git a/testhelper/docker/docker.go b/testhelper/docker/docker.go index a0104bb5..868153b1 100644 --- a/testhelper/docker/docker.go +++ b/testhelper/docker/docker.go @@ -9,7 +9,7 @@ import ( ) // GetHostPort returns the desired port mapping -func GetHostPort(t *testing.T, port string, container *docker.Container) int { +func GetHostPort(t testing.TB, port string, container *docker.Container) int { t.Helper() for p, bindings := range container.NetworkSettings.Ports { if p.Port() == port {