Fix broken spanmetrics counters after span producing service restart #29711

Merged · 18 commits · Dec 21, 2023
Changes from 14 commits
27 changes: 27 additions & 0 deletions .chloggen/spanmetrics-fix-resource-metrics-key.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: connector/spanmetrics

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add configurable resource metrics key attributes, which filter the resource attributes used to create the resource metrics key.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [29711]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: This enhancement can be used to fix broken spanmetrics counters after a span-producing service restart, when resource attributes contain dynamic/ephemeral values (e.g. process id).

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
7 changes: 7 additions & 0 deletions connector/spanmetricsconnector/config.go
@@ -51,6 +51,13 @@ type Config struct {
// Optional. See defaultResourceMetricsCacheSize in connector.go for the default value.
ResourceMetricsCacheSize int `mapstructure:"resource_metrics_cache_size"`

// ResourceMetricsKeyAttributes filters the resource attributes used to create the resource metrics key hash.
// This can be used to avoid situations where resource attributes may change across service restarts, causing
// metric counters to break (and duplicate).
// e.g. ["service.name", "telemetry.sdk.language", "telemetry.sdk.name"]
// See https://opentelemetry.io/docs/specs/semconv/resource/ for possible attributes.
ResourceMetricsKeyAttributes []string `mapstructure:"resource_metrics_key_attributes"`

AggregationTemporality string `mapstructure:"aggregation_temporality"`

Histogram HistogramConfig `mapstructure:"histogram"`
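For reference, a minimal sketch of how the new `mapstructure` tag binds the YAML key to this field. The trimmed `Config` type below is hypothetical (the real one lives in `connector/spanmetricsconnector/config.go`), and only the collector's `confmap` package is assumed; this is an illustration, not part of the diff:

```go
package main

import (
	"fmt"

	"go.opentelemetry.io/collector/confmap"
)

// Trimmed, hypothetical copy of the connector's Config, kept only to show
// how the mapstructure tag drives unmarshaling.
type Config struct {
	ResourceMetricsKeyAttributes []string `mapstructure:"resource_metrics_key_attributes"`
}

func main() {
	// Equivalent of the YAML:
	//   resource_metrics_key_attributes:
	//     - service.name
	//     - telemetry.sdk.language
	//     - telemetry.sdk.name
	conf := confmap.NewFromStringMap(map[string]any{
		"resource_metrics_key_attributes": []any{
			"service.name", "telemetry.sdk.language", "telemetry.sdk.name",
		},
	})

	var cfg Config
	if err := conf.Unmarshal(&cfg); err != nil {
		panic(err)
	}
	fmt.Println(cfg.ResourceMetricsKeyAttributes)
	// [service.name telemetry.sdk.language telemetry.sdk.name]
}
```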
11 changes: 11 additions & 0 deletions connector/spanmetricsconnector/config_test.go
@@ -110,6 +110,17 @@ func TestLoadConfig(t *testing.T) {
Exemplars: ExemplarsConfig{Enabled: true, MaxPerDataPoint: &defaultMaxPerDatapoint},
},
},
{
id: component.NewIDWithName(metadata.Type, "resource_metrics_key_attributes"),
expected: &Config{
AggregationTemporality: "AGGREGATION_TEMPORALITY_CUMULATIVE",
DimensionsCacheSize: defaultDimensionsCacheSize,
ResourceMetricsCacheSize: defaultResourceMetricsCacheSize,
ResourceMetricsKeyAttributes: []string{"service.name", "telemetry.sdk.language", "telemetry.sdk.name"},
MetricsFlushInterval: 15 * time.Second,
Histogram: HistogramConfig{Disable: false, Unit: defaultUnit},
},
},
}

for _, tt := range tests {
42 changes: 31 additions & 11 deletions connector/spanmetricsconnector/connector.go
@@ -54,6 +54,8 @@ type connectorImp struct {

resourceMetrics *cache.Cache[resourceKey, *resourceMetrics]

resourceMetricsKeyAttributes map[string]bool

keyBuf *bytes.Buffer

// An LRU cache of dimension key-value maps keyed by a unique identifier formed by a concatenation of its values:
@@ -115,17 +117,23 @@ func newConnector(logger *zap.Logger, config component.Config, ticker *clock.Tic
return nil, err
}

resourceMetricsKeyAttributes := make(map[string]bool, len(cfg.ResourceMetricsKeyAttributes))
for _, attr := range cfg.ResourceMetricsKeyAttributes {
resourceMetricsKeyAttributes[attr] = true
}

return &connectorImp{
logger: logger,
config: *cfg,
resourceMetrics: resourceMetricsCache,
dimensions: newDimensions(cfg.Dimensions),
keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
metricKeyToDimensions: metricKeyToDimensionsCache,
ticker: ticker,
done: make(chan struct{}),
eDimensions: newDimensions(cfg.Events.Dimensions),
events: cfg.Events,
logger: logger,
config: *cfg,
resourceMetrics: resourceMetricsCache,
resourceMetricsKeyAttributes: resourceMetricsKeyAttributes,
dimensions: newDimensions(cfg.Dimensions),
keyBuf: bytes.NewBuffer(make([]byte, 0, 1024)),
metricKeyToDimensions: metricKeyToDimensionsCache,
ticker: ticker,
done: make(chan struct{}),
eDimensions: newDimensions(cfg.Events.Dimensions),
events: cfg.Events,
}, nil
}

@@ -390,8 +398,20 @@ func (p *connectorImp) addExemplar(span ptrace.Span, duration float64, h metrics

type resourceKey [16]byte

func (p *connectorImp) createResourceKey(attr pcommon.Map) resourceKey {
if len(p.resourceMetricsKeyAttributes) == 0 {
return pdatautil.MapHash(attr)
}
m := pcommon.NewMap()
attr.CopyTo(m)
m.RemoveIf(func(k string, _ pcommon.Value) bool {
return !p.resourceMetricsKeyAttributes[k]
})
return pdatautil.MapHash(m)
}

func (p *connectorImp) getOrCreateResourceMetrics(attr pcommon.Map) *resourceMetrics {
key := resourceKey(pdatautil.MapHash(attr))
key := p.createResourceKey(attr)
v, ok := p.resourceMetrics.Get(key)
if !ok {
v = &resourceMetrics{
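To make the fix concrete, here is a minimal sketch of why hashing a filtered copy of the resource attributes (as `createResourceKey` does above) keeps the resource metrics key stable across restarts. The attribute values are invented, and the `pdatautil` import path is internal to the contrib repo, so treat this as an illustration rather than a drop-in program:

```go
package main

import (
	"fmt"

	"github.com/open-telemetry/opentelemetry-collector-contrib/internal/pdatautil"
	"go.opentelemetry.io/collector/pdata/pcommon"
)

// filteredHash mirrors the approach in createResourceKey: copy the map, drop
// attributes outside the configured allow-list, then hash what remains.
func filteredHash(attr pcommon.Map, keep map[string]bool) [16]byte {
	m := pcommon.NewMap()
	attr.CopyTo(m)
	m.RemoveIf(func(k string, _ pcommon.Value) bool { return !keep[k] })
	return pdatautil.MapHash(m)
}

func main() {
	keep := map[string]bool{"service.name": true}

	beforeRestart := pcommon.NewMap()
	beforeRestart.PutStr("service.name", "checkout")
	beforeRestart.PutInt("process.pid", 1234) // ephemeral: changes on restart

	afterRestart := pcommon.NewMap()
	afterRestart.PutStr("service.name", "checkout")
	afterRestart.PutInt("process.pid", 5678)

	// Unfiltered hashes differ, so cumulative counters would reset and
	// duplicate time series; filtered hashes match, so the same
	// resourceMetrics cache entry is reused after the restart.
	fmt.Println(pdatautil.MapHash(beforeRestart) == pdatautil.MapHash(afterRestart))   // false
	fmt.Println(filteredHash(beforeRestart, keep) == filteredHash(afterRestart, keep)) // true
}
```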
75 changes: 59 additions & 16 deletions connector/spanmetricsconnector/connector_test.go
@@ -600,7 +600,7 @@ func TestConcurrentShutdown(t *testing.T) {
ticker := mockClock.NewTicker(time.Nanosecond)

// Test
p := newConnectorImp(t, new(consumertest.MetricsSink), nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, logger, ticker)
p := newConnectorImp(t, new(consumertest.MetricsSink), nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, logger, ticker)
err := p.Start(ctx, componenttest.NewNopHost())
require.NoError(t, err)

@@ -680,7 +680,7 @@ func TestConsumeMetricsErrors(t *testing.T) {
}
mockClock := clock.NewMock(time.Now())
ticker := mockClock.NewTicker(time.Nanosecond)
p := newConnectorImp(t, mcon, nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, logger, ticker)
p := newConnectorImp(t, mcon, nil, explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, logger, ticker)

ctx := metadata.NewIncomingContext(context.Background(), nil)
err := p.Start(ctx, componenttest.NewNopHost())
@@ -842,7 +842,7 @@ func TestConsumeTraces(t *testing.T) {
mockClock := clock.NewMock(time.Now())
ticker := mockClock.NewTicker(time.Nanosecond)

p := newConnectorImp(t, mcon, stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, zaptest.NewLogger(t), ticker)
p := newConnectorImp(t, mcon, stringp("defaultNullValue"), tc.histogramConfig, tc.exemplarConfig, disabledEventsConfig, tc.aggregationTemporality, []string{}, zaptest.NewLogger(t), ticker)

ctx := metadata.NewIncomingContext(context.Background(), nil)
err := p.Start(ctx, componenttest.NewNopHost())
@@ -868,7 +868,7 @@ func TestMetricKeyCache(t *testing.T) {
func TestMetricKeyCache(t *testing.T) {
mcon := consumertest.NewNop()

p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil)
p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil)
traces := buildSampleTrace()

// Test
@@ -898,7 +898,7 @@ func TestMetricKeyCache(t *testing.T) {
func TestResourceMetricsCache(t *testing.T) {
mcon := consumertest.NewNop()

p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil)
p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil)

// Test
ctx := metadata.NewIncomingContext(context.Background(), nil)
@@ -933,11 +933,53 @@ func TestResourceMetricsCache(t *testing.T) {
assert.Equal(t, resourceMetricsCacheSize, p.resourceMetrics.Len())
}

func TestResourceMetricsKeyAttributes(t *testing.T) {
mcon := consumertest.NewNop()

resourceMetricsKeyAttributes := []string{
"service.name",
}

p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, resourceMetricsKeyAttributes, zaptest.NewLogger(t), nil)

// Test
ctx := metadata.NewIncomingContext(context.Background(), nil)

// 0 resources in the beginning
assert.Zero(t, p.resourceMetrics.Len())

err := p.ConsumeTraces(ctx, buildSampleTrace())
// Validate
require.NoError(t, err)
assert.Equal(t, 2, p.resourceMetrics.Len())

// consume another batch of traces for the same resources
err = p.ConsumeTraces(ctx, buildSampleTrace())
require.NoError(t, err)
assert.Equal(t, 2, p.resourceMetrics.Len())

// consume more batches with an extra, varying resource attribute that is not part of the key; the cache should not grow
for i := 0; i < resourceMetricsCacheSize; i++ {
traces := buildSampleTrace()

// add resource attributes to simulate additional resources providing data
for j := 0; j < traces.ResourceSpans().Len(); j++ {
traces.ResourceSpans().At(j).Resource().Attributes().PutStr("not included in resource key attributes", fmt.Sprintf("%d", i))
}

err = p.ConsumeTraces(ctx, traces)
require.NoError(t, err)
}

// validate that the additional resources providing data did not result in additional resource metrics
assert.Equal(t, 2, p.resourceMetrics.Len())
}

func BenchmarkConnectorConsumeTraces(b *testing.B) {
// Prepare
mcon := consumertest.NewNop()

conn := newConnectorImp(nil, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(b), nil)
conn := newConnectorImp(nil, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(b), nil)

traces := buildSampleTrace()

@@ -951,7 +993,7 @@ func BenchmarkConnectorConsumeTraces(b *testing.B) {
func TestExcludeDimensionsConsumeTraces(t *testing.T) {
mcon := consumertest.NewNop()
excludeDimensions := []string{"span.kind", "span.name", "totallyWrongNameDoesNotAffectAnything"}
p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), nil, excludeDimensions...)
p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil, excludeDimensions...)
traces := buildSampleTrace()

// Test
@@ -1000,15 +1042,16 @@ func TestExcludeDimensionsConsumeTraces(t *testing.T) {

}

func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, logger *zap.Logger, ticker *clock.Ticker, excludedDimensions ...string) *connectorImp {
func newConnectorImp(t *testing.T, mcon consumer.Metrics, defaultNullValue *string, histogramConfig func() HistogramConfig, exemplarsConfig func() ExemplarsConfig, eventsConfig func() EventsConfig, temporality string, resourceMetricsKeyAttributes []string, logger *zap.Logger, ticker *clock.Ticker, excludedDimensions ...string) *connectorImp {

cfg := &Config{
AggregationTemporality: temporality,
Histogram: histogramConfig(),
Exemplars: exemplarsConfig(),
ExcludeDimensions: excludedDimensions,
DimensionsCacheSize: dimensionsCacheSize,
ResourceMetricsCacheSize: resourceMetricsCacheSize,
AggregationTemporality: temporality,
Histogram: histogramConfig(),
Exemplars: exemplarsConfig(),
ExcludeDimensions: excludedDimensions,
DimensionsCacheSize: dimensionsCacheSize,
ResourceMetricsCacheSize: resourceMetricsCacheSize,
ResourceMetricsKeyAttributes: resourceMetricsKeyAttributes,
Dimensions: []Dimension{
// Set nil defaults to force a lookup for the attribute in the span.
{stringAttrName, nil},
@@ -1120,7 +1163,7 @@ func TestConnectorConsumeTracesEvictedCacheKey(t *testing.T) {
ticker := mockClock.NewTicker(time.Nanosecond)

// Note: default dimension key cache size is 2.
p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, zaptest.NewLogger(t), ticker)
p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, disabledExemplarsConfig, disabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), ticker)

ctx := metadata.NewIncomingContext(context.Background(), nil)
err := p.Start(ctx, componenttest.NewNopHost())
@@ -1374,7 +1417,7 @@ func TestSpanMetrics_Events(t *testing.T) {
}
func TestExemplarsForSumMetrics(t *testing.T) {
mcon := consumertest.NewNop()
p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, cumulative, zaptest.NewLogger(t), nil)
p := newConnectorImp(t, mcon, stringp("defaultNullValue"), explicitHistogramsConfig, enabledExemplarsConfig, enabledEventsConfig, cumulative, []string{}, zaptest.NewLogger(t), nil)
traces := buildSampleTrace()

// Test
7 changes: 7 additions & 0 deletions connector/spanmetricsconnector/testdata/config.yaml
@@ -68,3 +68,10 @@ spanmetrics/exemplars_enabled_with_max_per_datapoint:
exemplars:
enabled: true
max_per_data_point: 5

# resource metrics key attributes filter
spanmetrics/resource_metrics_key_attributes:
resource_metrics_key_attributes:
- service.name
- telemetry.sdk.language
- telemetry.sdk.name