From 9395f366e8fa3b8e85d61dd06dd69768b03e441e Mon Sep 17 00:00:00 2001
From: Steven Swartz
Date: Tue, 4 Jun 2024 09:27:18 -0400
Subject: [PATCH] [connector/spanmetrics] Produce delta temporality span metrics with timestamps representing an uninterrupted series (#31780)

Closes #31671

**Description:**

Currently, delta temporality span metrics are produced with (StartTimestamp, Timestamp) pairs of `(T1, T2), (T3, T4) ...`. However, the [specification](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#temporality) says that the correct pattern for an uninterrupted delta series is `(T1, T2), (T2, T3) ...`

This misalignment with the spec can confuse downstream components' conversion from delta temporality to cumulative temporality, causing each data point to be viewed as a cumulative counter "reset". An example of this is in `prometheusexporter`.

This conversion issue forces you to generate cumulative span metrics instead, which use significantly more memory to cache the cumulative counts. At work, I applied this patch to our collectors and switched to producing delta temporality metrics, which `prometheusexporter` then converts to cumulative. That caused a significant drop in memory usage:

![image](https://github.com/open-telemetry/opentelemetry-collector-contrib/assets/17691679/804d0792-1085-400e-a4e3-d64fb865cd4f)

**Testing:**

- Unit tests asserting the timestamps
- Manual testing with `prometheusexporter` to make sure counter values are cumulative and no longer reset after receiving each delta data point

---
 ...etrics_uninterrupted_delta_timestamps.yaml |  27 +++
 connector/spanmetricsconnector/README.md      |   3 +-
 connector/spanmetricsconnector/config.go      |  19 ++
 connector/spanmetricsconnector/config_test.go |  39 ++-
 connector/spanmetricsconnector/connector.go   |  52 +++-
 .../spanmetricsconnector/connector_test.go    | 224 ++++++++++++++++--
 connector/spanmetricsconnector/go.mod         |   1 +
 connector/spanmetricsconnector/go.sum         |   2 +
 .../internal/metrics/metrics.go               |  29 +--
 .../spanmetricsconnector/testdata/config.yaml |  11 +
 .../integrationtest/go.mod                    |   1 +
 .../integrationtest/go.sum                    |   2 +
 12 files changed, 371 insertions(+), 39 deletions(-)
 create mode 100644 .chloggen/spanmetrics_uninterrupted_delta_timestamps.yaml

diff --git a/.chloggen/spanmetrics_uninterrupted_delta_timestamps.yaml b/.chloggen/spanmetrics_uninterrupted_delta_timestamps.yaml
new file mode 100644
index 000000000000..c0b117bf55a0
--- /dev/null
+++ b/.chloggen/spanmetrics_uninterrupted_delta_timestamps.yaml
@@ -0,0 +1,27 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
+component: spanmetrics
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Produce delta temporality span metrics with StartTimeUnixNano and TimeUnixNano values representing an uninterrupted series
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [31671, 30688]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: This allows producing delta span metrics instead of the more memory-intensive cumulative metrics, specifically when a downstream component can convert the delta metrics to cumulative.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/connector/spanmetricsconnector/README.md b/connector/spanmetricsconnector/README.md
index 4f431dfcf139..7e728e4f740f 100644
--- a/connector/spanmetricsconnector/README.md
+++ b/connector/spanmetricsconnector/README.md
@@ -114,8 +114,9 @@ The following settings can be optionally configured:
 - `namespace`: Defines the namespace of the generated metrics. If `namespace` is provided, the generated metric name will be given a `namespace.` prefix.
 - `metrics_flush_interval` (default: `60s`): Defines the flush interval of the generated metrics.
 - `metrics_expiration` (default: `0`): Defines the expiration time as `time.Duration`, after which, if no new spans are received, metrics will no longer be exported. Setting to `0` means the metrics will never expire (default behavior).
+- `metric_timestamp_cache_size` (default: `1000`): Only relevant for delta temporality span metrics. Controls the size of the cache used to keep track of each metric's TimestampUnixNano from the last time it was flushed. When a metric is evicted from the cache, its next data point will indicate a "reset" in the series. Downstream components converting from delta to cumulative, like `prometheusexporter`, may handle these resets by setting cumulative counters back to 0.
 - `exemplars`: Use to configure how to attach exemplars to metrics.
-  - `enabled` (default: `false`): enabling will add spans as Exemplars to all metrics. Exemplars are only kept for one flush interval.
+  - `enabled` (default: `false`): enabling will add spans as Exemplars to all metrics. Exemplars are only kept for one flush interval.
 - `events`: Use to configure the events metric.
   - `enabled`: (default: `false`): enabling will add the events metric.
   - `dimensions`: (mandatory if `enabled`) the list of the span's event attributes to add as dimensions to the events metric, which will be included _on top of_ the common and configured `dimensions` for span and resource attributes.
diff --git a/connector/spanmetricsconnector/config.go b/connector/spanmetricsconnector/config.go
index c379a2c38fd9..2fb5bd9118f6 100644
--- a/connector/spanmetricsconnector/config.go
+++ b/connector/spanmetricsconnector/config.go
@@ -23,6 +23,8 @@ var defaultHistogramBucketsMs = []float64{
 	2, 4, 6, 8, 10, 50, 100, 200, 400, 800, 1000, 1400, 2000, 5000, 10_000, 15_000,
 }
 
+var defaultDeltaTimestampCacheSize = 1000
+
 // Dimension defines the dimension name and optional default value if the Dimension is missing from a span attribute.
 type Dimension struct {
 	Name    string `mapstructure:"name"`
@@ -71,6 +73,9 @@ type Config struct {
 	// Default value (0) means that the metrics will never expire.
 	MetricsExpiration time.Duration `mapstructure:"metrics_expiration"`
 
+	// TimestampCacheSize controls the size of the cache used to keep track of each delta metric's TimestampUnixNano from the last time it was flushed.
+	TimestampCacheSize *int `mapstructure:"metric_timestamp_cache_size"`
+
 	// Namespace is the namespace of the metrics emitted by the connector.
 	Namespace string `mapstructure:"namespace"`
 
@@ -139,6 +144,13 @@ func (c Config) Validate() error {
 		return fmt.Errorf("invalid metrics_expiration: %v, the duration should be positive", c.MetricsExpiration)
 	}
 
+	if c.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta && c.GetDeltaTimestampCacheSize() <= 0 {
+		return fmt.Errorf(
+			"invalid delta timestamp cache size: %v, the maximum number of items in the cache should be positive",
+			c.GetDeltaTimestampCacheSize(),
+		)
+	}
+
 	return nil
 }
 
@@ -151,6 +163,13 @@ func (c Config) GetAggregationTemporality() pmetric.AggregationTemporality {
 	return pmetric.AggregationTemporalityCumulative
 }
 
+func (c Config) GetDeltaTimestampCacheSize() int {
+	if c.TimestampCacheSize != nil {
+		return *c.TimestampCacheSize
+	}
+	return defaultDeltaTimestampCacheSize
+}
+
 // validateDimensions checks duplicates for reserved dimensions and additional dimensions.
 func validateDimensions(dimensions []Dimension) error {
 	labelNames := make(map[string]struct{})
diff --git a/connector/spanmetricsconnector/config_test.go b/connector/spanmetricsconnector/config_test.go
index da235794f3cb..117edb58e16d 100644
--- a/connector/spanmetricsconnector/config_test.go
+++ b/connector/spanmetricsconnector/config_test.go
@@ -27,10 +27,12 @@ func TestLoadConfig(t *testing.T) {
 	defaultMethod := "GET"
 	defaultMaxPerDatapoint := 5
+	customTimestampCacheSize := 123
 	tests := []struct {
-		id           component.ID
-		expected     component.Config
-		errorMessage string
+		id              component.ID
+		expected        component.Config
+		errorMessage    string
+		extraAssertions func(config *Config)
 	}{
 		{
 			id: component.NewIDWithName(metadata.Type, "default"),
@@ -125,6 +127,34 @@ func TestLoadConfig(t *testing.T) {
 				Histogram:                HistogramConfig{Disable: false, Unit: defaultUnit},
 			},
 		},
+		{
+			id: component.NewIDWithName(metadata.Type, "custom_delta_timestamp_cache_size"),
+			expected: &Config{
+				AggregationTemporality:   "AGGREGATION_TEMPORALITY_DELTA",
+				TimestampCacheSize:       &customTimestampCacheSize,
+				DimensionsCacheSize:      defaultDimensionsCacheSize,
+				ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
+				MetricsFlushInterval:     60 * time.Second,
+				Histogram:                HistogramConfig{Disable: false, Unit: defaultUnit},
+			},
+		},
+		{
+			id: component.NewIDWithName(metadata.Type, "default_delta_timestamp_cache_size"),
+			expected: &Config{
+				AggregationTemporality:   "AGGREGATION_TEMPORALITY_DELTA",
+				DimensionsCacheSize:      defaultDimensionsCacheSize,
+				ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
+				MetricsFlushInterval:     60 * time.Second,
+				Histogram:                HistogramConfig{Disable: false, Unit: defaultUnit},
+			},
+			extraAssertions: func(config *Config) {
+				assert.Equal(t, defaultDeltaTimestampCacheSize, config.GetDeltaTimestampCacheSize())
+			},
+		},
+		{
+			id:           component.NewIDWithName(metadata.Type, "invalid_delta_timestamp_cache_size"),
+			errorMessage: "invalid delta timestamp cache size: 0, the maximum number of items in the cache should be positive",
+		},
 	}
 
 	for _, tt := range tests {
@@ -143,6 +173,9 @@
 			}
 			assert.NoError(t, component.ValidateConfig(cfg))
 			assert.Equal(t, tt.expected, cfg)
+			if tt.extraAssertions != nil {
+
tt.extraAssertions(cfg.(*Config)) + } }) } } diff --git a/connector/spanmetricsconnector/connector.go b/connector/spanmetricsconnector/connector.go index 3fab7d119452..cc3f69a7ba9a 100644 --- a/connector/spanmetricsconnector/connector.go +++ b/connector/spanmetricsconnector/connector.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/hashicorp/golang-lru/v2/simplelru" "github.com/lightstep/go-expohisto/structure" "github.com/tilinna/clock" "go.opentelemetry.io/collector/component" @@ -72,6 +73,9 @@ type connectorImp struct { eDimensions []dimension events EventsConfig + + // Tracks the last TimestampUnixNano for delta metrics so that they represent an uninterrupted series. Unused for cumulative span metrics. + lastDeltaTimestamps *simplelru.LRU[metrics.Key, pcommon.Timestamp] } type resourceMetrics struct { @@ -125,6 +129,16 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic resourceMetricsKeyAttributes[attr] = s } + var lastDeltaTimestamps *simplelru.LRU[metrics.Key, pcommon.Timestamp] + if cfg.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta { + lastDeltaTimestamps, err = simplelru.NewLRU[metrics.Key, pcommon.Timestamp](cfg.GetDeltaTimestampCacheSize(), func(k metrics.Key, _ pcommon.Timestamp) { + logger.Info("Evicting cached delta timestamp", zap.String("key", string(k))) + }) + if err != nil { + return nil, err + } + } + return &connectorImp{ logger: logger, config: *cfg, @@ -133,6 +147,7 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic dimensions: newDimensions(cfg.Dimensions), keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)), metricKeyToDimensions: metricKeyToDimensionsCache, + lastDeltaTimestamps: lastDeltaTimestamps, ticker: ticker, done: make(chan struct{}), eDimensions: newDimensions(cfg.Events.Dimensions), @@ -251,6 +266,7 @@ func (p *connectorImp) exportMetrics(ctx context.Context) { // buildMetrics collects the computed raw metrics data and builds OTLP metrics. func (p *connectorImp) buildMetrics() pmetric.Metrics { m := pmetric.NewMetrics() + timestamp := pcommon.NewTimestampFromTime(time.Now()) p.resourceMetrics.ForEach(func(_ resourceKey, rawMetrics *resourceMetrics) { rm := m.ResourceMetrics().AppendEmpty() @@ -259,23 +275,46 @@ func (p *connectorImp) buildMetrics() pmetric.Metrics { sm := rm.ScopeMetrics().AppendEmpty() sm.Scope().SetName("spanmetricsconnector") + /** + * To represent an uninterrupted stream of metrics as per the spec, the (StartTimestamp, Timestamp)'s of successive data points should be: + * - For cumulative metrics: (T1, T2), (T1, T3), (T1, T4) ... + * - For delta metrics: (T1, T2), (T2, T3), (T3, T4) ... + */ + deltaMetricKeys := make(map[metrics.Key]bool) + startTimeGenerator := func(mk metrics.Key) pcommon.Timestamp { + startTime := rawMetrics.startTimestamp + if p.config.GetAggregationTemporality() == pmetric.AggregationTemporalityDelta { + if lastTimestamp, ok := p.lastDeltaTimestamps.Get(mk); ok { + startTime = lastTimestamp + } + // Collect lastDeltaTimestamps keys that need to be updated. Metrics can share the same key, so defer the update. 
+ deltaMetricKeys[mk] = true + } + return startTime + } + sums := rawMetrics.sums metric := sm.Metrics().AppendEmpty() metric.SetName(buildMetricName(p.config.Namespace, metricNameCalls)) - sums.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality()) + sums.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality()) if !p.config.Histogram.Disable { histograms := rawMetrics.histograms metric = sm.Metrics().AppendEmpty() metric.SetName(buildMetricName(p.config.Namespace, metricNameDuration)) metric.SetUnit(p.config.Histogram.Unit.String()) - histograms.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality()) + histograms.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality()) } events := rawMetrics.events if p.events.Enabled { metric = sm.Metrics().AppendEmpty() metric.SetName(buildMetricName(p.config.Namespace, metricNameEvents)) - events.BuildMetrics(metric, rawMetrics.startTimestamp, p.config.GetAggregationTemporality()) + events.BuildMetrics(metric, startTimeGenerator, timestamp, p.config.GetAggregationTemporality()) + } + + for mk := range deltaMetricKeys { + // For delta metrics, cache the current data point's timestamp, which will be the start timestamp for the next data points in the series + p.lastDeltaTimestamps.Add(mk, timestamp) } }) @@ -326,6 +365,7 @@ func (p *connectorImp) resetState() { // and span metadata such as name, kind, status_code and any additional // dimensions the user has configured. func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) { + startTimestamp := pcommon.NewTimestampFromTime(time.Now()) for i := 0; i < traces.ResourceSpans().Len(); i++ { rspans := traces.ResourceSpans().At(i) resourceAttr := rspans.Resource().Attributes() @@ -334,7 +374,7 @@ func (p *connectorImp) aggregateMetrics(traces ptrace.Traces) { continue } - rm := p.getOrCreateResourceMetrics(resourceAttr) + rm := p.getOrCreateResourceMetrics(resourceAttr, startTimestamp) sums := rm.sums histograms := rm.histograms events := rm.events @@ -431,7 +471,7 @@ func (p *connectorImp) createResourceKey(attr pcommon.Map) resourceKey { return pdatautil.MapHash(m) } -func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics { +func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map, startTimestamp pcommon.Timestamp) *resourceMetrics { key := p.createResourceKey(attr) v, ok := p.resourceMetrics.Get(key) if !ok { @@ -440,7 +480,7 @@ func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMet sums: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint), events: metrics.NewSumMetrics(p.config.Exemplars.MaxPerDataPoint), attributes: attr, - startTimestamp: pcommon.NewTimestampFromTime(time.Now()), + startTimestamp: startTimestamp, } p.resourceMetrics.Add(key, v) } diff --git a/connector/spanmetricsconnector/connector_test.go b/connector/spanmetricsconnector/connector_test.go index eae39cda1e0a..341945c4b528 100644 --- a/connector/spanmetricsconnector/connector_test.go +++ b/connector/spanmetricsconnector/connector_test.go @@ -452,7 +452,7 @@ func disabledHistogramsConfig() HistogramConfig { } } -func newConnectorImp(defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, expiration time.Duration, resourceMetricsKeyAttributes []string, excludedDimensions ...string) (*connectorImp, *clock.Mock, error) { +func 
newConnectorImp(defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, expiration time.Duration, resourceMetricsKeyAttributes []string, deltaTimestampCacheSize int, excludedDimensions ...string) (*connectorImp, *clock.Mock, error) { cfg := &Config{ AggregationTemporality: temporality, Histogram: histogramConfig(), @@ -477,8 +477,9 @@ func newConnectorImp(defaultNullValue *string, histogramConfig func() HistogramC // Add a resource attribute to test "process" attributes like IP, host, region, cluster, etc. {regionResourceAttrName, nil}, }, - Events: eventsConfig(), - MetricsExpiration: expiration, + Events: eventsConfig(), + MetricsExpiration: expiration, + TimestampCacheSize: &deltaTimestampCacheSize, } mockClock := clock.NewMock(time.Now()) @@ -640,7 +641,7 @@ func TestConcurrentShutdown(t *testing.T) { core, observedLogs := observer.New(zapcore.InfoLevel) // Test - p, _, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}) + p, _, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000) require.NoError(t, err) // Override the default no-op consumer and logger for testing. p.metricsConsumer = new(consumertest.MetricsSink) @@ -718,7 +719,7 @@ func TestConsumeMetricsErrors(t *testing.T) { logger := zap.New(core) var wg sync.WaitGroup - p, mockClock, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}) + p, mockClock, err := newConnectorImp(nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000) require.NoError(t, err) // Override the default no-op consumer and logger for testing. p.metricsConsumer = &errConsumer{ @@ -883,7 +884,7 @@ func TestConsumeTraces(t *testing.T) { // Prepare mcon := &consumertest.MetricsSink{} - p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, 0, []string{}) + p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, 0, []string{}, 1000) require.NoError(t, err) // Override the default no-op consumer with metrics sink for testing. 
p.metricsConsumer = mcon @@ -910,7 +911,7 @@ func TestConsumeTraces(t *testing.T) { } func TestMetricKeyCache(t *testing.T) { - p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}) + p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000) require.NoError(t, err) traces := buildSampleTrace() @@ -939,7 +940,7 @@ func TestMetricKeyCache(t *testing.T) { } func TestResourceMetricsCache(t *testing.T) { - p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}) + p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000) require.NoError(t, err) // Test @@ -976,7 +977,7 @@ func TestResourceMetricsCache(t *testing.T) { } func TestResourceMetricsExpiration(t *testing.T) { - p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Millisecond, []string{}) + p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Millisecond, []string{}, 1000) require.NoError(t, err) // Test @@ -1001,7 +1002,7 @@ func TestResourceMetricsKeyAttributes(t *testing.T) { "service.name", } - p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, resourceMetricsKeyAttributes) + p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, resourceMetricsKeyAttributes, 1000) require.NoError(t, err) // Test @@ -1039,7 +1040,7 @@ func TestResourceMetricsKeyAttributes(t *testing.T) { func BenchmarkConnectorConsumeTraces(b *testing.B) { // Prepare - conn, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}) + conn, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000) require.NoError(b, err) traces := buildSampleTrace() @@ -1053,7 +1054,7 @@ func BenchmarkConnectorConsumeTraces(b *testing.B) { func TestExcludeDimensionsConsumeTraces(t *testing.T) { excludeDimensions := []string{"span.kind", "span.name", "totallyWrongNameDoesNotAffectAnything"} - p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, excludeDimensions...) + p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000, excludeDimensions...) require.NoError(t, err) traces := buildSampleTrace() @@ -1183,7 +1184,7 @@ func TestConnectorConsumeTracesEvictedCacheKey(t *testing.T) { wg.Add(len(wantDataPointCounts)) // Note: default dimension key cache size is 2. 
- p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}) + p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 0, []string{}, 1000) require.NoError(t, err) // Override the default no-op consumer with metrics sink for testing. p.metricsConsumer = mcon @@ -1268,7 +1269,7 @@ func TestConnectorConsumeTracesExpiredMetrics(t *testing.T) { mcon := &consumertest.MetricsSink{} // Creating a connector with a very short metricsTTL to ensure that the metrics are expired. - p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Nanosecond, []string{}) + p, mockClock, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, 1*time.Nanosecond, []string{}, 1000) require.NoError(t, err) // Override the default no-op consumer with metrics sink for testing. p.metricsConsumer = mcon @@ -1543,7 +1544,7 @@ func TestExemplarsAreDiscardedAfterFlushing(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - p, _, err := newConnectorImp(stringp("defaultNullValue"), tt.histogramConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{}) + p, _, err := newConnectorImp(stringp("defaultNullValue"), tt.histogramConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{}, 1000) p.metricsConsumer = &consumertest.MetricsSink{} require.NoError(t, err) @@ -1642,3 +1643,196 @@ func assertDataPointsHaveExactlyOneExemplarForTrace(t *testing.T, metrics pmetri } } } + +func TestTimestampsForUninterruptedStream(t *testing.T) { + tests := []struct { + temporality string + verifyTimestamps func(startTime1 pcommon.Timestamp, timestamp1 pcommon.Timestamp, startTime2 pcommon.Timestamp, timestamp2 pcommon.Timestamp) + }{ + { + temporality: cumulative, + verifyTimestamps: func(startTime1 pcommon.Timestamp, timestamp1 pcommon.Timestamp, startTime2 pcommon.Timestamp, timestamp2 pcommon.Timestamp) { + // (T1, T2), (T1, T3) ... + assert.Greater(t, timestamp1, startTime1) + assert.Equal(t, startTime1, startTime2) + assert.Greater(t, timestamp2, startTime2) + }, + }, + { + temporality: delta, + verifyTimestamps: func(startTime1 pcommon.Timestamp, timestamp1 pcommon.Timestamp, startTime2 pcommon.Timestamp, timestamp2 pcommon.Timestamp) { + // (T1, T2), (T2, T3) ... 
+ assert.Greater(t, timestamp1, startTime1) + assert.Equal(t, timestamp1, startTime2) + assert.Greater(t, timestamp2, startTime2) + }, + }, + } + + for _, tt := range tests { + t.Run(tt.temporality, func(t *testing.T) { + p, _, err := newConnectorImp(stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, tt.temporality, 0, []string{}, 1000) + require.NoError(t, err) + p.metricsConsumer = &consumertest.MetricsSink{} + + // Test + ctx := metadata.NewIncomingContext(context.Background(), nil) + + // Send first batch of spans + err = p.ConsumeTraces(ctx, buildSampleTrace()) + require.NoError(t, err) + p.exportMetrics(ctx) + metrics1 := p.metricsConsumer.(*consumertest.MetricsSink).AllMetrics()[0] + startTimestamp1, timestamp1 := verifyAndCollectCommonTimestamps(t, metrics1) + + // Send an unrelated batch of spans for a different resource + unrelatedTraces := ptrace.NewTraces() + initServiceSpans( + serviceSpans{ + serviceName: "unrelated-service", + spans: []span{ + { + name: "/ping", + kind: ptrace.SpanKindServer, + statusCode: ptrace.StatusCodeOk, + traceID: [16]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10}, + spanID: [8]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}, + }, + }, + }, unrelatedTraces.ResourceSpans().AppendEmpty()) + err = p.ConsumeTraces(ctx, unrelatedTraces) + require.NoError(t, err) + p.exportMetrics(ctx) + + // Send another set of spans that are the same as the first batch + err = p.ConsumeTraces(ctx, buildSampleTrace()) + require.NoError(t, err) + p.exportMetrics(ctx) + metrics2 := p.metricsConsumer.(*consumertest.MetricsSink).AllMetrics()[2] + startTimestamp2, timestamp2 := verifyAndCollectCommonTimestamps(t, metrics2) + + tt.verifyTimestamps(startTimestamp1, timestamp1, startTimestamp2, timestamp2) + }) + } +} + +func verifyAndCollectCommonTimestamps(t *testing.T, m pmetric.Metrics) (start pcommon.Timestamp, timestamp pcommon.Timestamp) { + // Go through all data points and collect the start timestamp and timestamp. 
They should be the same value for each data point + for i := 0; i < m.ResourceMetrics().Len(); i++ { + rm := m.ResourceMetrics().At(i) + + serviceName, _ := rm.Resource().Attributes().Get("service.name") + if serviceName.Str() == "unrelated-service" { + continue + } + + ism := rm.ScopeMetrics() + for ilmC := 0; ilmC < ism.Len(); ilmC++ { + m := ism.At(ilmC).Metrics() + for mC := 0; mC < m.Len(); mC++ { + metric := m.At(mC) + + switch metric.Type() { + case pmetric.MetricTypeSum: + { + dps := metric.Sum().DataPoints() + for dpi := 0; dpi < dps.Len(); dpi++ { + if int64(start) == 0 { + start = dps.At(dpi).StartTimestamp() + timestamp = dps.At(dpi).Timestamp() + } + assert.Equal(t, dps.At(dpi).StartTimestamp(), start) + assert.Equal(t, dps.At(dpi).Timestamp(), timestamp) + } + } + case pmetric.MetricTypeHistogram: + { + dps := metric.Histogram().DataPoints() + for dpi := 0; dpi < dps.Len(); dpi++ { + if int64(start) == 0 { + start = dps.At(dpi).StartTimestamp() + timestamp = dps.At(dpi).Timestamp() + } + assert.Equal(t, dps.At(dpi).StartTimestamp(), start) + assert.Equal(t, dps.At(dpi).Timestamp(), timestamp) + } + } + default: + t.Fail() + } + } + } + } + + return start, timestamp +} + +func TestDeltaTimestampCacheExpiry(t *testing.T) { + timestampCacheSize := 1 + p, _, err := newConnectorImp(stringp("defaultNullValue"), exponentialHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, delta, 0, []string{}, timestampCacheSize) + require.NoError(t, err) + p.metricsConsumer = &consumertest.MetricsSink{} + + ctx := metadata.NewIncomingContext(context.Background(), nil) + + // Send a span from service A which should fill the cache + serviceATrace1 := ptrace.NewTraces() + initServiceSpans( + serviceSpans{ + serviceName: "service-a", + spans: []span{ + { + name: "/ping", + kind: ptrace.SpanKindServer, + statusCode: ptrace.StatusCodeOk, + traceID: [16]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10}, + spanID: [8]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}, + }, + }, + }, serviceATrace1.ResourceSpans().AppendEmpty()) + err = p.ConsumeTraces(ctx, serviceATrace1) + require.NoError(t, err) + p.exportMetrics(ctx) + + // Send a span from service B which should evict service A's entries + serviceBTrace1 := ptrace.NewTraces() + initServiceSpans( + serviceSpans{ + serviceName: "service-b", + spans: []span{ + { + name: "/ping", + kind: ptrace.SpanKindServer, + statusCode: ptrace.StatusCodeOk, + traceID: [16]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10}, + spanID: [8]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}, + }, + }, + }, serviceBTrace1.ResourceSpans().AppendEmpty()) + err = p.ConsumeTraces(ctx, serviceBTrace1) + require.NoError(t, err) + p.exportMetrics(ctx) + + // Send another span from Service A, then verify no error + cache was actually evicted + serviceATrace2 := ptrace.NewTraces() + initServiceSpans( + serviceSpans{ + serviceName: "service-a", + spans: []span{ + { + name: "/ping", + kind: ptrace.SpanKindServer, + statusCode: ptrace.StatusCodeOk, + traceID: [16]byte{0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0A, 0x0B, 0x0C, 0x0D, 0x0E, 0x0F, 0x10}, + spanID: [8]byte{0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18}, + }, + }, + }, serviceATrace2.ResourceSpans().AppendEmpty()) + err = p.ConsumeTraces(ctx, serviceATrace2) + require.NoError(t, err) + p.exportMetrics(ctx) + + serviceATimestamp1 := 
p.metricsConsumer.(*consumertest.MetricsSink).AllMetrics()[0].ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).Timestamp() + serviceAStartTimestamp2 := p.metricsConsumer.(*consumertest.MetricsSink).AllMetrics()[2].ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics().At(0).Sum().DataPoints().At(0).StartTimestamp() + assert.Greater(t, serviceAStartTimestamp2, serviceATimestamp1) // These would be the same if nothing was evicted from the cache +} diff --git a/connector/spanmetricsconnector/go.mod b/connector/spanmetricsconnector/go.mod index 0399f49392da..485016e68172 100644 --- a/connector/spanmetricsconnector/go.mod +++ b/connector/spanmetricsconnector/go.mod @@ -4,6 +4,7 @@ go 1.21.0 require ( github.com/hashicorp/golang-lru v1.0.2 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/lightstep/go-expohisto v1.0.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.102.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.102.0 diff --git a/connector/spanmetricsconnector/go.sum b/connector/spanmetricsconnector/go.sum index 96c22916c4cc..17568f1f7cfc 100644 --- a/connector/spanmetricsconnector/go.sum +++ b/connector/spanmetricsconnector/go.sum @@ -21,6 +21,8 @@ github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= diff --git a/connector/spanmetricsconnector/internal/metrics/metrics.go b/connector/spanmetricsconnector/internal/metrics/metrics.go index b9ec433c6e13..7b6cc937f637 100644 --- a/connector/spanmetricsconnector/internal/metrics/metrics.go +++ b/connector/spanmetricsconnector/internal/metrics/metrics.go @@ -5,7 +5,6 @@ package metrics // import "github.com/open-telemetry/opentelemetry-collector-con import ( "sort" - "time" "github.com/lightstep/go-expohisto/structure" "go.opentelemetry.io/collector/pdata/pcommon" @@ -16,7 +15,7 @@ type Key string type HistogramMetrics interface { GetOrCreate(key Key, attributes pcommon.Map) Histogram - BuildMetrics(pmetric.Metric, pcommon.Timestamp, pmetric.AggregationTemporality) + BuildMetrics(pmetric.Metric, generateStartTimestamp, pcommon.Timestamp, pmetric.AggregationTemporality) ClearExemplars() } @@ -59,6 +58,8 @@ type exponentialHistogram struct { maxExemplarCount *int } +type generateStartTimestamp = func(Key) pcommon.Timestamp + func NewExponentialHistogramMetrics(maxSize int32, maxExemplarCount *int) HistogramMetrics { return &exponentialHistogramMetrics{ metrics: make(map[Key]*exponentialHistogram), @@ -93,16 +94,16 @@ func (m *explicitHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Map) func (m *explicitHistogramMetrics) BuildMetrics( metric pmetric.Metric, - start pcommon.Timestamp, + startTimestamp generateStartTimestamp, + timestamp pcommon.Timestamp, temporality pmetric.AggregationTemporality, ) { 
metric.SetEmptyHistogram().SetAggregationTemporality(temporality) dps := metric.Histogram().DataPoints() dps.EnsureCapacity(len(m.metrics)) - timestamp := pcommon.NewTimestampFromTime(time.Now()) - for _, h := range m.metrics { + for k, h := range m.metrics { dp := dps.AppendEmpty() - dp.SetStartTimestamp(start) + dp.SetStartTimestamp(startTimestamp(k)) dp.SetTimestamp(timestamp) dp.ExplicitBounds().FromRaw(h.bounds) dp.BucketCounts().FromRaw(h.bucketCounts) @@ -146,16 +147,16 @@ func (m *exponentialHistogramMetrics) GetOrCreate(key Key, attributes pcommon.Ma func (m *exponentialHistogramMetrics) BuildMetrics( metric pmetric.Metric, - start pcommon.Timestamp, + startTimestamp generateStartTimestamp, + timestamp pcommon.Timestamp, temporality pmetric.AggregationTemporality, ) { metric.SetEmptyExponentialHistogram().SetAggregationTemporality(temporality) dps := metric.ExponentialHistogram().DataPoints() dps.EnsureCapacity(len(m.metrics)) - timestamp := pcommon.NewTimestampFromTime(time.Now()) - for _, m := range m.metrics { + for k, m := range m.metrics { dp := dps.AppendEmpty() - dp.SetStartTimestamp(start) + dp.SetStartTimestamp(startTimestamp(k)) dp.SetTimestamp(timestamp) expoHistToExponentialDataPoint(m.histogram, dp) for i := 0; i < m.exemplars.Len(); i++ { @@ -284,7 +285,8 @@ func (s *Sum) AddExemplar(traceID pcommon.TraceID, spanID pcommon.SpanID, value func (m *SumMetrics) BuildMetrics( metric pmetric.Metric, - start pcommon.Timestamp, + startTimestamp generateStartTimestamp, + timestamp pcommon.Timestamp, temporality pmetric.AggregationTemporality, ) { metric.SetEmptySum().SetIsMonotonic(true) @@ -292,10 +294,9 @@ func (m *SumMetrics) BuildMetrics( dps := metric.Sum().DataPoints() dps.EnsureCapacity(len(m.metrics)) - timestamp := pcommon.NewTimestampFromTime(time.Now()) - for _, s := range m.metrics { + for k, s := range m.metrics { dp := dps.AppendEmpty() - dp.SetStartTimestamp(start) + dp.SetStartTimestamp(startTimestamp(k)) dp.SetTimestamp(timestamp) dp.SetIntValue(int64(s.count)) for i := 0; i < s.exemplars.Len(); i++ { diff --git a/connector/spanmetricsconnector/testdata/config.yaml b/connector/spanmetricsconnector/testdata/config.yaml index 7358cd000549..2b69653e4a21 100644 --- a/connector/spanmetricsconnector/testdata/config.yaml +++ b/connector/spanmetricsconnector/testdata/config.yaml @@ -78,3 +78,14 @@ spanmetrics/resource_metrics_key_attributes: - service.name - telemetry.sdk.language - telemetry.sdk.name + +spanmetrics/custom_delta_timestamp_cache_size: + aggregation_temporality: "AGGREGATION_TEMPORALITY_DELTA" + metric_timestamp_cache_size: 123 + +spanmetrics/invalid_delta_timestamp_cache_size: + aggregation_temporality: "AGGREGATION_TEMPORALITY_DELTA" + metric_timestamp_cache_size: 0 + +spanmetrics/default_delta_timestamp_cache_size: + aggregation_temporality: "AGGREGATION_TEMPORALITY_DELTA" diff --git a/exporter/elasticsearchexporter/integrationtest/go.mod b/exporter/elasticsearchexporter/integrationtest/go.mod index b529c1d84cb0..48b26de61ffa 100644 --- a/exporter/elasticsearchexporter/integrationtest/go.mod +++ b/exporter/elasticsearchexporter/integrationtest/go.mod @@ -61,6 +61,7 @@ require ( github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/hashicorp/golang-lru v1.0.2 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/iancoleman/strcase v0.3.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jaegertracing/jaeger v1.57.0 // indirect 
diff --git a/exporter/elasticsearchexporter/integrationtest/go.sum b/exporter/elasticsearchexporter/integrationtest/go.sum index b77869ae5ed8..ec71908d4028 100644 --- a/exporter/elasticsearchexporter/integrationtest/go.sum +++ b/exporter/elasticsearchexporter/integrationtest/go.sum @@ -116,6 +116,8 @@ github.com/hashicorp/go-version v1.7.0 h1:5tqGy27NaOTB8yJKUZELlFAS/LTKJkrmONwQKe github.com/hashicorp/go-version v1.7.0/go.mod h1:fltr4n8CU8Ke44wwGCBoEymUuxUHl09ZGVZPK5anwXA= github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/hexops/gotextdiff v1.0.3 h1:gitA9+qJrrTCsiCl7+kh75nPqQt1cx4ZkudSTLoUqJM=
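
As an aside for reviewers, here is a minimal, self-contained sketch of the timestamp chaining this patch implements; it is illustrative only and not part of the change. An LRU cache maps each metric key to the TimeUnixNano of its last flushed data point, and that cached value becomes the StartTimeUnixNano of the key's next data point. The plain string key, the `flush` helper, and the fixed fallback start time are simplifications; the connector itself uses `metrics.Key` and each resource's start timestamp.

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/golang-lru/v2/simplelru"
	"go.opentelemetry.io/collector/pdata/pcommon"
)

func main() {
	// Maps a metric key to the TimeUnixNano of its last flushed data point,
	// mirroring the connector's lastDeltaTimestamps cache.
	lastDeltaTimestamps, err := simplelru.NewLRU[string, pcommon.Timestamp](1000, nil)
	if err != nil {
		panic(err)
	}

	// Fallback start time for a key's first data point. (Simplification: the
	// connector uses the owning resource's start timestamp instead.)
	seriesStart := pcommon.NewTimestampFromTime(time.Now())

	// flush (hypothetical helper) emits one delta data point for a key and
	// returns its (StartTimeUnixNano, TimeUnixNano) pair.
	flush := func(key string) (pcommon.Timestamp, pcommon.Timestamp) {
		now := pcommon.NewTimestampFromTime(time.Now())
		start := seriesStart
		if last, ok := lastDeltaTimestamps.Get(key); ok {
			start = last // chain onto the previous data point
		}
		// Cache the current timestamp; it becomes the next point's start time.
		lastDeltaTimestamps.Add(key, now)
		return start, now
	}

	s1, t1 := flush("calls|service-a|/ping")
	time.Sleep(10 * time.Millisecond)
	s2, t2 := flush("calls|service-a|/ping")

	fmt.Printf("(%v, %v)\n", s1, t1) // (T1, T2)
	fmt.Printf("(%v, %v)\n", s2, t2) // (T2, T3): s2 == t1, an uninterrupted series
}
```

If a key's cache entry is evicted between two flushes, the next data point falls back to the resource start time, which downstream delta-to-cumulative conversion may treat as a counter reset; that is the behavior `TestDeltaTimestampCacheExpiry` exercises.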